001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.jdbc.adapter;
018
019import java.io.IOException;
020import java.io.InputStream;
021import java.sql.Blob;
022import java.sql.Connection;
023import java.sql.PreparedStatement;
024import java.sql.ResultSet;
025import java.sql.SQLException;
026
027import org.apache.activemq.command.ActiveMQDestination;
028import org.apache.activemq.command.MessageId;
029import org.apache.activemq.command.XATransactionId;
030import org.apache.activemq.store.jdbc.Statements;
031import org.apache.activemq.store.jdbc.TransactionContext;
032import org.apache.activemq.util.ByteArrayOutputStream;
033
034/**
035 * This JDBCAdapter inserts and extracts BLOB data using the getBlob()/setBlob()
036 * operations. This is a little more involved since to insert a blob you have
037 * to:
038 * 
039 * 1: insert empty blob. 2: select the blob 3: finally update the blob with data
040 * value.
041 * 
042 * The databases/JDBC drivers that use this adapter are:
043 * <ul>
044 * <li></li>
045 * </ul>
046 * 
047 * @org.apache.xbean.XBean element="blobJDBCAdapter"
048 * 
049 * 
050 */
051public class BlobJDBCAdapter extends DefaultJDBCAdapter {
052
053    @Override
054    public void setStatements(Statements statements) {
055
056        String addMessageStatement = "INSERT INTO "
057            + statements.getFullMessageTableName()
058            + "(ID, MSGID_PROD, MSGID_SEQ, CONTAINER, EXPIRATION, PRIORITY, MSG, XID) VALUES (?, ?, ?, ?, ?, ?, empty_blob(), empty_blob())";
059        statements.setAddMessageStatement(addMessageStatement);
060
061        String findMessageByIdStatement = "SELECT MSG FROM " +
062                statements.getFullMessageTableName() + " WHERE ID=? FOR UPDATE";
063        statements.setFindMessageByIdStatement(findMessageByIdStatement);
064
065        super.setStatements(statements);
066    }
067
068    @Override
069    public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data,
070                             long expiration, byte priority, XATransactionId xid) throws SQLException, IOException {
071        PreparedStatement s = null;
072        cleanupExclusiveLock.readLock().lock();
073        try {
074            // Add the Blob record.
075            s = c.getConnection().prepareStatement(statements.getAddMessageStatement());
076            s.setLong(1, sequence);
077            s.setString(2, messageID.getProducerId().toString());
078            s.setLong(3, messageID.getProducerSequenceId());
079            s.setString(4, destination.getQualifiedName());
080            s.setLong(5, expiration);
081            s.setLong(6, priority);
082
083            if (s.executeUpdate() != 1) {
084                throw new IOException("Failed to add broker message: " + messageID + " in container.");
085            }
086            s.close();
087
088            // Select the blob record so that we can update it.
089            updateBlob(c.getConnection(), statements.getFindMessageByIdStatement(), sequence, data);
090            if (xid != null) {
091                byte[] xidVal = xid.getEncodedXidBytes();
092                xidVal[0] = '+';
093                updateBlob(c.getConnection(), statements.getFindXidByIdStatement(), sequence, xidVal);
094            }
095
096        } finally {
097            cleanupExclusiveLock.readLock().unlock();
098            close(s);
099        }
100    }
101
102    private void updateBlob(Connection connection, String findMessageByIdStatement, long sequence, byte[] data) throws SQLException, IOException {
103        PreparedStatement s = null;
104        ResultSet rs = null;
105        try {
106            s = connection.prepareStatement(statements.getFindMessageByIdStatement(),
107                ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE);
108            s.setLong(1, sequence);
109            rs = s.executeQuery();
110            if (!rs.next()) {
111                throw new IOException("Failed select blob for message: " + sequence + " in container.");
112            }
113
114            // Update the blob
115            Blob blob = rs.getBlob(1);
116            blob.truncate(0);
117            blob.setBytes(1, data);
118            rs.updateBlob(1, blob);
119            rs.updateRow();             // Update the row with the updated blob
120        } finally {
121            close(rs);
122            close(s);
123        }
124    }
125
126    @Override
127    public byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException {
128        PreparedStatement s = null;
129        ResultSet rs = null;
130        cleanupExclusiveLock.readLock().lock();
131        try {
132
133            s = c.getConnection().prepareStatement(statements.getFindMessageStatement());
134            s.setString(1, id.getProducerId().toString());
135            s.setLong(2, id.getProducerSequenceId());
136            rs = s.executeQuery();
137
138            if (!rs.next()) {
139                return null;
140            }
141            Blob blob = rs.getBlob(1);
142            InputStream is = blob.getBinaryStream();
143
144            ByteArrayOutputStream os = new ByteArrayOutputStream((int)blob.length());
145            int ch;
146            while ((ch = is.read()) >= 0) {
147                os.write(ch);
148            }
149            is.close();
150            os.close();
151
152            return os.toByteArray();
153
154        } finally {
155            cleanupExclusiveLock.readLock().unlock();
156            close(rs);
157            close(s);
158        }
159    }
160
161}