001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.jdbc.adapter; 018 019import java.io.IOException; 020import java.io.InputStream; 021import java.sql.Blob; 022import java.sql.Connection; 023import java.sql.PreparedStatement; 024import java.sql.ResultSet; 025import java.sql.SQLException; 026 027import org.apache.activemq.command.ActiveMQDestination; 028import org.apache.activemq.command.MessageId; 029import org.apache.activemq.command.XATransactionId; 030import org.apache.activemq.store.jdbc.Statements; 031import org.apache.activemq.store.jdbc.TransactionContext; 032import org.apache.activemq.util.ByteArrayOutputStream; 033 034/** 035 * This JDBCAdapter inserts and extracts BLOB data using the getBlob()/setBlob() 036 * operations. This is a little more involved since to insert a blob you have 037 * to: 038 * 039 * 1: insert empty blob. 2: select the blob 3: finally update the blob with data 040 * value. 041 * 042 * The databases/JDBC drivers that use this adapter are: 043 * <ul> 044 * <li></li> 045 * </ul> 046 * 047 * @org.apache.xbean.XBean element="blobJDBCAdapter" 048 * 049 * 050 */ 051public class BlobJDBCAdapter extends DefaultJDBCAdapter { 052 053 @Override 054 public void setStatements(Statements statements) { 055 056 String addMessageStatement = "INSERT INTO " 057 + statements.getFullMessageTableName() 058 + "(ID, MSGID_PROD, MSGID_SEQ, CONTAINER, EXPIRATION, PRIORITY, MSG, XID) VALUES (?, ?, ?, ?, ?, ?, empty_blob(), empty_blob())"; 059 statements.setAddMessageStatement(addMessageStatement); 060 061 String findMessageByIdStatement = "SELECT MSG FROM " + 062 statements.getFullMessageTableName() + " WHERE ID=? FOR UPDATE"; 063 statements.setFindMessageByIdStatement(findMessageByIdStatement); 064 065 super.setStatements(statements); 066 } 067 068 @Override 069 public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data, 070 long expiration, byte priority, XATransactionId xid) throws SQLException, IOException { 071 PreparedStatement s = null; 072 cleanupExclusiveLock.readLock().lock(); 073 try { 074 // Add the Blob record. 075 s = c.getConnection().prepareStatement(statements.getAddMessageStatement()); 076 s.setLong(1, sequence); 077 s.setString(2, messageID.getProducerId().toString()); 078 s.setLong(3, messageID.getProducerSequenceId()); 079 s.setString(4, destination.getQualifiedName()); 080 s.setLong(5, expiration); 081 s.setLong(6, priority); 082 083 if (s.executeUpdate() != 1) { 084 throw new IOException("Failed to add broker message: " + messageID + " in container."); 085 } 086 s.close(); 087 088 // Select the blob record so that we can update it. 089 updateBlob(c.getConnection(), statements.getFindMessageByIdStatement(), sequence, data); 090 if (xid != null) { 091 byte[] xidVal = xid.getEncodedXidBytes(); 092 xidVal[0] = '+'; 093 updateBlob(c.getConnection(), statements.getFindXidByIdStatement(), sequence, xidVal); 094 } 095 096 } finally { 097 cleanupExclusiveLock.readLock().unlock(); 098 close(s); 099 } 100 } 101 102 private void updateBlob(Connection connection, String findMessageByIdStatement, long sequence, byte[] data) throws SQLException, IOException { 103 PreparedStatement s = null; 104 ResultSet rs = null; 105 try { 106 s = connection.prepareStatement(statements.getFindMessageByIdStatement(), 107 ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE); 108 s.setLong(1, sequence); 109 rs = s.executeQuery(); 110 if (!rs.next()) { 111 throw new IOException("Failed select blob for message: " + sequence + " in container."); 112 } 113 114 // Update the blob 115 Blob blob = rs.getBlob(1); 116 blob.truncate(0); 117 blob.setBytes(1, data); 118 rs.updateBlob(1, blob); 119 rs.updateRow(); // Update the row with the updated blob 120 } finally { 121 close(rs); 122 close(s); 123 } 124 } 125 126 @Override 127 public byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException { 128 PreparedStatement s = null; 129 ResultSet rs = null; 130 cleanupExclusiveLock.readLock().lock(); 131 try { 132 133 s = c.getConnection().prepareStatement(statements.getFindMessageStatement()); 134 s.setString(1, id.getProducerId().toString()); 135 s.setLong(2, id.getProducerSequenceId()); 136 rs = s.executeQuery(); 137 138 if (!rs.next()) { 139 return null; 140 } 141 Blob blob = rs.getBlob(1); 142 InputStream is = blob.getBinaryStream(); 143 144 ByteArrayOutputStream os = new ByteArrayOutputStream((int)blob.length()); 145 int ch; 146 while ((ch = is.read()) >= 0) { 147 os.write(ch); 148 } 149 is.close(); 150 os.close(); 151 152 return os.toByteArray(); 153 154 } finally { 155 cleanupExclusiveLock.readLock().unlock(); 156 close(rs); 157 close(s); 158 } 159 } 160 161}