001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.jdbc.adapter;
018
019import java.io.IOException;
020import java.sql.Connection;
021import java.sql.PreparedStatement;
022import java.sql.ResultSet;
023import java.sql.SQLException;
024import java.sql.Statement;
025import java.util.ArrayList;
026import java.util.HashSet;
027import java.util.LinkedList;
028import java.util.Set;
029import java.util.concurrent.locks.ReadWriteLock;
030import java.util.concurrent.locks.ReentrantReadWriteLock;
031
032import org.apache.activemq.command.ActiveMQDestination;
033import org.apache.activemq.command.MessageId;
034import org.apache.activemq.command.ProducerId;
035import org.apache.activemq.command.SubscriptionInfo;
036import org.apache.activemq.command.XATransactionId;
037import org.apache.activemq.store.jdbc.JDBCAdapter;
038import org.apache.activemq.store.jdbc.JDBCMessageIdScanListener;
039import org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener;
040import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
041import org.apache.activemq.store.jdbc.JdbcMemoryTransactionStore;
042import org.apache.activemq.store.jdbc.Statements;
043import org.apache.activemq.store.jdbc.TransactionContext;
044import org.apache.activemq.util.DataByteArrayOutputStream;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048import static javax.xml.bind.DatatypeConverter.parseBase64Binary;
049import static javax.xml.bind.DatatypeConverter.printBase64Binary;
050
051/**
052 * Implements all the default JDBC operations that are used by the JDBCPersistenceAdapter. <p/> sub-classing is
053 * encouraged to override the default implementation of methods to account for differences in JDBC Driver
054 * implementations. <p/> The JDBCAdapter inserts and extracts BLOB data using the getBytes()/setBytes() operations. <p/>
055 * The databases/JDBC drivers that use this adapter are:
056 * <ul>
057 * <li></li>
058 * </ul>
059 * 
060 * @org.apache.xbean.XBean element="defaultJDBCAdapter"
061 * 
062 * 
063 */
064public class DefaultJDBCAdapter implements JDBCAdapter {
065    private static final Logger LOG = LoggerFactory.getLogger(DefaultJDBCAdapter.class);
066    public static final int MAX_ROWS = org.apache.activemq.ActiveMQPrefetchPolicy.MAX_PREFETCH_SIZE;
067    protected Statements statements;
068    private boolean batchStatements = true;
069    //This is deprecated and should be removed in a future release
070    protected boolean batchStatments = true;
071    protected boolean prioritizedMessages;
072    protected ReadWriteLock cleanupExclusiveLock = new ReentrantReadWriteLock();
073    protected int maxRows = MAX_ROWS;
074
075    protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException {
076        s.setBytes(index, data);
077    }
078
079    protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException {
080        return rs.getBytes(index);
081    }
082
083    public void doCreateTables(TransactionContext c) throws SQLException, IOException {
084        Statement s = null;
085        cleanupExclusiveLock.writeLock().lock();
086        try {
087            // Check to see if the table already exists. If it does, then don't
088            // log warnings during startup.
089            // Need to run the scripts anyways since they may contain ALTER
090            // statements that upgrade a previous version
091            // of the table
092            boolean alreadyExists = false;
093            ResultSet rs = null;
094            try {
095                rs = c.getConnection().getMetaData().getTables(null, null, this.statements.getFullMessageTableName(),
096                        new String[] { "TABLE" });
097                alreadyExists = rs.next();
098            } catch (Throwable ignore) {
099            } finally {
100                close(rs);
101            }
102            s = c.getConnection().createStatement();
103            String[] createStatments = this.statements.getCreateSchemaStatements();
104            for (int i = 0; i < createStatments.length; i++) {
105                // This will fail usually since the tables will be
106                // created already.
107                try {
108                    LOG.debug("Executing SQL: " + createStatments[i]);
109                    s.execute(createStatments[i]);
110                } catch (SQLException e) {
111                    if (alreadyExists) {
112                        LOG.debug("Could not create JDBC tables; The message table already existed." + " Failure was: "
113                                + createStatments[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState()
114                                + " Vendor code: " + e.getErrorCode());
115                    } else {
116                        LOG.warn("Could not create JDBC tables; they could already exist." + " Failure was: "
117                                + createStatments[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState()
118                                + " Vendor code: " + e.getErrorCode());
119                        JDBCPersistenceAdapter.log("Failure details: ", e);
120                    }
121                }
122            }
123
124            // if autoCommit used do not call commit
125            if(!c.getConnection().getAutoCommit()){
126                c.getConnection().commit();
127            }
128
129        } finally {
130            cleanupExclusiveLock.writeLock().unlock();
131            try {
132                s.close();
133            } catch (Throwable e) {
134            }
135        }
136    }
137
138    public void doDropTables(TransactionContext c) throws SQLException, IOException {
139        Statement s = null;
140        cleanupExclusiveLock.writeLock().lock();
141        try {
142            s = c.getConnection().createStatement();
143            String[] dropStatments = this.statements.getDropSchemaStatements();
144            for (int i = 0; i < dropStatments.length; i++) {
145                // This will fail usually since the tables will be
146                // created already.
147                try {
148                    LOG.debug("Executing SQL: " + dropStatments[i]);
149                    s.execute(dropStatments[i]);
150                } catch (SQLException e) {
151                    LOG.warn("Could not drop JDBC tables; they may not exist." + " Failure was: " + dropStatments[i]
152                            + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState() + " Vendor code: "
153                            + e.getErrorCode());
154                    JDBCPersistenceAdapter.log("Failure details: ", e);
155                }
156            }
157            // if autoCommit used do not call commit
158            if(!c.getConnection().getAutoCommit()){
159               c.getConnection().commit();
160            }
161        } finally {
162            cleanupExclusiveLock.writeLock().unlock();
163            try {
164                s.close();
165            } catch (Throwable e) {
166            }
167        }
168    }
169
170    public long doGetLastMessageStoreSequenceId(TransactionContext c) throws SQLException, IOException {
171        PreparedStatement s = null;
172        ResultSet rs = null;
173        cleanupExclusiveLock.readLock().lock();
174        try {
175            s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement());
176            rs = s.executeQuery();
177            long seq1 = 0;
178            if (rs.next()) {
179                seq1 = rs.getLong(1);
180            }
181            rs.close();
182            s.close();
183            s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInAcksStatement());
184            rs = s.executeQuery();
185            long seq2 = 0;
186            if (rs.next()) {
187                seq2 = rs.getLong(1);
188            }
189            long seq = Math.max(seq1, seq2);
190            return seq;
191        } finally {
192            cleanupExclusiveLock.readLock().unlock();
193            close(rs);
194            close(s);
195        }
196    }
197    
198    public byte[] doGetMessageById(TransactionContext c, long storeSequenceId) throws SQLException, IOException {
199        PreparedStatement s = null;
200        ResultSet rs = null;
201        cleanupExclusiveLock.readLock().lock();
202        try {
203            s = c.getConnection().prepareStatement(
204                    this.statements.getFindMessageByIdStatement());
205            s.setLong(1, storeSequenceId);
206            rs = s.executeQuery();
207            if (!rs.next()) {
208                return null;
209            }
210            return getBinaryData(rs, 1);
211        } finally {
212            cleanupExclusiveLock.readLock().unlock();
213            close(rs);
214            close(s);
215        }
216    }
217
218
219    /**
220     * A non null xid indicated the op is part of 2pc prepare, so ops are flagged pending outcome
221     */
222    public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data,
223                             long expiration, byte priority, XATransactionId xid) throws SQLException, IOException {
224        PreparedStatement s = c.getAddMessageStatement();
225        cleanupExclusiveLock.readLock().lock();
226        try {
227            if (s == null) {
228                s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement());
229                if (this.batchStatements) {
230                    c.setAddMessageStatement(s);
231                }
232            }
233            s.setLong(1, sequence);
234            s.setString(2, messageID.getProducerId().toString());
235            s.setLong(3, messageID.getProducerSequenceId());
236            s.setString(4, destination.getQualifiedName());
237            s.setLong(5, expiration);
238            s.setLong(6, priority);
239            setBinaryData(s, 7, data);
240            if (xid != null) {
241                byte[] xidVal = xid.getEncodedXidBytes();
242                xidVal[0] = '+';
243                String xidString = printBase64Binary(xidVal);
244                s.setString(8, xidString);
245            } else {
246                s.setString(8, null);
247            }
248            if (this.batchStatements) {
249                s.addBatch();
250            } else if (s.executeUpdate() != 1) {
251                throw new SQLException("Failed add a message");
252            }
253        } finally {
254            cleanupExclusiveLock.readLock().unlock();
255            if (!this.batchStatements) {
256                if (s != null) {
257                    s.close();
258                }
259            }
260        }
261    }
262
263    @Override
264    public void doUpdateMessage(TransactionContext c, ActiveMQDestination destination, MessageId id, byte[] data) throws SQLException, IOException {
265        PreparedStatement s = null;
266        cleanupExclusiveLock.readLock().lock();
267        try {
268            s = c.getConnection().prepareStatement(this.statements.getUpdateMessageStatement());
269            setBinaryData(s, 1, data);
270            s.setString(2, id.getProducerId().toString());
271            s.setLong(3, id.getProducerSequenceId());
272            s.setString(4, destination.getQualifiedName());
273            if (s.executeUpdate() != 1) {
274                throw new IOException("Could not update message: " + id + " in " + destination);
275            }
276        } finally {
277            cleanupExclusiveLock.readLock().unlock();
278            close(s);
279        }
280    }
281
282
283    public void doAddMessageReference(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination,
284            long expirationTime, String messageRef) throws SQLException, IOException {
285        PreparedStatement s = c.getAddMessageStatement();
286        cleanupExclusiveLock.readLock().lock();
287        try {
288            if (s == null) {
289                s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement());
290                if (this.batchStatements) {
291                    c.setAddMessageStatement(s);
292                }
293            }
294            s.setLong(1, messageID.getBrokerSequenceId());
295            s.setString(2, messageID.getProducerId().toString());
296            s.setLong(3, messageID.getProducerSequenceId());
297            s.setString(4, destination.getQualifiedName());
298            s.setLong(5, expirationTime);
299            s.setString(6, messageRef);
300            if (this.batchStatements) {
301                s.addBatch();
302            } else if (s.executeUpdate() != 1) {
303                throw new SQLException("Failed add a message");
304            }
305        } finally {
306            cleanupExclusiveLock.readLock().unlock();
307            if (!this.batchStatements) {
308                s.close();
309            }
310        }
311    }
312
313    public long[] getStoreSequenceId(TransactionContext c, ActiveMQDestination destination, MessageId messageID) throws SQLException, IOException {
314        PreparedStatement s = null;
315        ResultSet rs = null;
316        cleanupExclusiveLock.readLock().lock();
317        try {
318            s = c.getConnection().prepareStatement(this.statements.getFindMessageSequenceIdStatement());
319            s.setString(1, messageID.getProducerId().toString());
320            s.setLong(2, messageID.getProducerSequenceId());
321            s.setString(3, destination.getQualifiedName());
322            rs = s.executeQuery();
323            if (!rs.next()) {
324                return new long[]{0,0};
325            }
326            return new long[]{rs.getLong(1), rs.getLong(2)};
327        } finally {
328            cleanupExclusiveLock.readLock().unlock();
329            close(rs);
330            close(s);
331        }
332    }
333
334    public byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException {
335        PreparedStatement s = null;
336        ResultSet rs = null;
337        cleanupExclusiveLock.readLock().lock();
338        try {
339            s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
340            s.setString(1, id.getProducerId().toString());
341            s.setLong(2, id.getProducerSequenceId());
342            rs = s.executeQuery();
343            if (!rs.next()) {
344                return null;
345            }
346            return getBinaryData(rs, 1);
347        } finally {
348            cleanupExclusiveLock.readLock().unlock();
349            close(rs);
350            close(s);
351        }
352    }
353
354    public String doGetMessageReference(TransactionContext c, long seq) throws SQLException, IOException {
355        PreparedStatement s = null;
356        ResultSet rs = null;
357        cleanupExclusiveLock.readLock().lock();
358        try {
359            s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
360            s.setLong(1, seq);
361            rs = s.executeQuery();
362            if (!rs.next()) {
363                return null;
364            }
365            return rs.getString(1);
366        } finally {
367            cleanupExclusiveLock.readLock().unlock();
368            close(rs);
369            close(s);
370        }
371    }
372
373    /**
374     * A non null xid indicated the op is part of 2pc prepare, so ops are flagged pending outcome
375     */
376    public void doRemoveMessage(TransactionContext c, long seq, XATransactionId xid) throws SQLException, IOException {
377        PreparedStatement s = c.getRemovedMessageStatement();
378        cleanupExclusiveLock.readLock().lock();
379        try {
380            if (s == null) {
381                s = c.getConnection().prepareStatement(xid == null ?
382                        this.statements.getRemoveMessageStatement() : this.statements.getUpdateXidFlagStatement());
383                if (this.batchStatements) {
384                    c.setRemovedMessageStatement(s);
385                }
386            }
387            if (xid == null) {
388                s.setLong(1, seq);
389            } else {
390                byte[] xidVal = xid.getEncodedXidBytes();
391                xidVal[0] = '-';
392                String xidString = printBase64Binary(xidVal);
393                s.setString(1, xidString);
394                s.setLong(2, seq);
395            }
396            if (this.batchStatements) {
397                s.addBatch();
398            } else if (s.executeUpdate() != 1) {
399                throw new SQLException("Failed to remove message seq: " + seq);
400            }
401        } finally {
402            cleanupExclusiveLock.readLock().unlock();
403            if (!this.batchStatements && s != null) {
404                s.close();
405            }
406        }
407    }
408
409    public void doRecover(TransactionContext c, ActiveMQDestination destination, JDBCMessageRecoveryListener listener)
410            throws Exception {
411        PreparedStatement s = null;
412        ResultSet rs = null;
413        cleanupExclusiveLock.readLock().lock();
414        try {
415            s = c.getConnection().prepareStatement(this.statements.getFindAllMessagesStatement());
416            s.setString(1, destination.getQualifiedName());
417            rs = s.executeQuery();
418            if (this.statements.isUseExternalMessageReferences()) {
419                while (rs.next()) {
420                    if (!listener.recoverMessageReference(rs.getString(2))) {
421                        break;
422                    }
423                }
424            } else {
425                while (rs.next()) {
426                    if (!listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
427                        break;
428                    }
429                }
430            }
431        } finally {
432            cleanupExclusiveLock.readLock().unlock();
433            close(rs);
434            close(s);
435        }
436    }
437
438    public void doMessageIdScan(TransactionContext c, int limit, 
439            JDBCMessageIdScanListener listener) throws SQLException, IOException {
440        PreparedStatement s = null;
441        ResultSet rs = null;
442        cleanupExclusiveLock.readLock().lock();
443        try {
444            s = c.getConnection().prepareStatement(this.statements.getFindAllMessageIdsStatement());
445            s.setMaxRows(limit);
446            rs = s.executeQuery();
447            // jdbc scrollable cursor requires jdbc ver > 1.0 and is often implemented locally so avoid
448            LinkedList<MessageId> reverseOrderIds = new LinkedList<MessageId>();
449            while (rs.next()) {
450                reverseOrderIds.addFirst(new MessageId(rs.getString(2), rs.getLong(3)));
451            }
452            if (LOG.isDebugEnabled()) {
453                LOG.debug("messageIdScan with limit (" + limit + "), resulted in: " + reverseOrderIds.size() + " ids");
454            }
455            for (MessageId id : reverseOrderIds) {
456                listener.messageId(id);
457            }
458        } finally {
459            cleanupExclusiveLock.readLock().unlock();
460            close(rs);
461            close(s);
462        }
463    }
464    
465    public void doSetLastAckWithPriority(TransactionContext c, ActiveMQDestination destination, XATransactionId xid, String clientId,
466                                         String subscriptionName, long seq, long priority) throws SQLException, IOException {
467        PreparedStatement s = c.getUpdateLastAckStatement();
468        cleanupExclusiveLock.readLock().lock();
469        try {
470            if (s == null) {
471                s = c.getConnection().prepareStatement(xid == null ?
472                        this.statements.getUpdateDurableLastAckWithPriorityStatement() :
473                        this.statements.getUpdateDurableLastAckWithPriorityInTxStatement());
474                if (this.batchStatements) {
475                    c.setUpdateLastAckStatement(s);
476                }
477            }
478            if (xid != null) {
479                byte[] xidVal = encodeXid(xid, seq, priority);
480                String xidString = printBase64Binary(xidVal);
481                s.setString(1, xidString);
482            } else {
483                s.setLong(1, seq);
484            }
485            s.setString(2, destination.getQualifiedName());
486            s.setString(3, clientId);
487            s.setString(4, subscriptionName);
488            s.setLong(5, priority);
489            if (this.batchStatements) {
490                s.addBatch();
491            } else if (s.executeUpdate() != 1) {
492                throw new SQLException("Failed update last ack with priority: " + priority + ", for sub: " + subscriptionName);
493            }
494        } finally {
495            cleanupExclusiveLock.readLock().unlock();
496            if (!this.batchStatements) {
497                close(s);
498            }
499        }
500    }
501
502
503    public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, XATransactionId xid, String clientId,
504                             String subscriptionName, long seq, long priority) throws SQLException, IOException {
505        PreparedStatement s = c.getUpdateLastAckStatement();
506        cleanupExclusiveLock.readLock().lock();
507        try {
508            if (s == null) {
509                s = c.getConnection().prepareStatement(xid == null ?
510                        this.statements.getUpdateDurableLastAckStatement() :
511                        this.statements.getUpdateDurableLastAckInTxStatement());
512                if (this.batchStatements) {
513                    c.setUpdateLastAckStatement(s);
514                }
515            }
516            if (xid != null) {
517                byte[] xidVal = encodeXid(xid, seq, priority);
518                String xidString = printBase64Binary(xidVal);
519                s.setString(1, xidString);
520            } else {
521                s.setLong(1, seq);
522            }
523            s.setString(2, destination.getQualifiedName());
524            s.setString(3, clientId);
525            s.setString(4, subscriptionName);
526
527            if (this.batchStatements) {
528                s.addBatch();
529            } else if (s.executeUpdate() != 1) {
530                throw new IOException("Could not update last ack seq : "
531                            + seq + ", for sub: " + subscriptionName);
532            }
533        } finally {
534            cleanupExclusiveLock.readLock().unlock();
535            if (!this.batchStatements) {
536                close(s);
537            }            
538        }
539    }
540
541    private byte[] encodeXid(XATransactionId xid, long seq, long priority) {
542        byte[] xidVal = xid.getEncodedXidBytes();
543        // encode the update
544        DataByteArrayOutputStream outputStream = xid.internalOutputStream();
545        outputStream.position(1);
546        outputStream.writeLong(seq);
547        outputStream.writeByte(Long.valueOf(priority).byteValue());
548        return xidVal;
549    }
550
551    @Override
552    public void doClearLastAck(TransactionContext c, ActiveMQDestination destination, byte priority, String clientId, String subName) throws SQLException, IOException {
553        PreparedStatement s = null;
554        cleanupExclusiveLock.readLock().lock();
555        try {
556            s = c.getConnection().prepareStatement(this.statements.getClearDurableLastAckInTxStatement());
557            s.setString(1, destination.getQualifiedName());
558            s.setString(2, clientId);
559            s.setString(3, subName);
560            s.setLong(4, priority);
561            if (s.executeUpdate() != 1) {
562                throw new IOException("Could not remove prepared transaction state from message ack for: " + clientId + ":" + subName);
563            }
564        } finally {
565            cleanupExclusiveLock.readLock().unlock();
566            close(s);
567        }
568    }
569
570    public void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
571            String subscriptionName, JDBCMessageRecoveryListener listener) throws Exception {
572        // dumpTables(c,
573        // destination.getQualifiedName(),clientId,subscriptionName);
574        PreparedStatement s = null;
575        ResultSet rs = null;
576        cleanupExclusiveLock.readLock().lock();
577        try {
578            s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubMessagesStatement());
579            s.setString(1, destination.getQualifiedName());
580            s.setString(2, clientId);
581            s.setString(3, subscriptionName);
582            rs = s.executeQuery();
583            if (this.statements.isUseExternalMessageReferences()) {
584                while (rs.next()) {
585                    if (!listener.recoverMessageReference(rs.getString(2))) {
586                        break;
587                    }
588                }
589            } else {
590                while (rs.next()) {
591                    if (!listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
592                        break;
593                    }
594                }
595            }
596        } finally {
597            cleanupExclusiveLock.readLock().unlock();
598            close(rs);
599            close(s);
600        }
601    }
602
603    public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, String clientId,
604            String subscriptionName, long seq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
605        
606        PreparedStatement s = null;
607        ResultSet rs = null;
608        cleanupExclusiveLock.readLock().lock();
609        try {
610            s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesStatement());
611            s.setMaxRows(Math.min(maxReturned * 2, maxRows));
612            s.setString(1, destination.getQualifiedName());
613            s.setString(2, clientId);
614            s.setString(3, subscriptionName);
615            s.setLong(4, seq);
616            rs = s.executeQuery();
617            int count = 0;
618            if (this.statements.isUseExternalMessageReferences()) {
619                while (rs.next() && count < maxReturned) {
620                    if (listener.recoverMessageReference(rs.getString(1))) {
621                        count++;
622                    }
623                }
624            } else {
625                while (rs.next() && count < maxReturned) {
626                    if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
627                        count++;
628                    }
629                }
630            }
631        } finally {
632            cleanupExclusiveLock.readLock().unlock();
633            close(rs);
634            close(s);
635        }
636    }
637
638    public void doRecoverNextMessagesWithPriority(TransactionContext c, ActiveMQDestination destination, String clientId,
639            String subscriptionName, long seq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
640
641        PreparedStatement s = null;
642        ResultSet rs = null;
643        cleanupExclusiveLock.readLock().lock();
644        try {
645            s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesByPriorityStatement());
646            s.setMaxRows(Math.min(maxReturned * 2, maxRows));
647            s.setString(1, destination.getQualifiedName());
648            s.setString(2, clientId);
649            s.setString(3, subscriptionName);
650            s.setLong(4, seq);
651            s.setLong(5, priority);
652            rs = s.executeQuery();
653            int count = 0;
654            if (this.statements.isUseExternalMessageReferences()) {
655                while (rs.next() && count < maxReturned) {
656                    if (listener.recoverMessageReference(rs.getString(1))) {
657                        count++;
658                    }
659                }
660            } else {
661                while (rs.next() && count < maxReturned) {
662                    if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
663                        count++;
664                    }
665                }
666            }
667        } finally {
668            cleanupExclusiveLock.readLock().unlock();
669            close(rs);
670            close(s);
671        }
672    }
673
674    public int doGetDurableSubscriberMessageCount(TransactionContext c, ActiveMQDestination destination,
675            String clientId, String subscriptionName, boolean isPrioritizedMessages) throws SQLException, IOException {
676        PreparedStatement s = null;
677        ResultSet rs = null;
678        int result = 0;
679        cleanupExclusiveLock.readLock().lock();
680        try {
681            if (isPrioritizedMessages) {
682                s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatementWithPriority());
683            } else {
684                s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatement());    
685            }
686            s.setString(1, destination.getQualifiedName());
687            s.setString(2, clientId);
688            s.setString(3, subscriptionName);
689            rs = s.executeQuery();
690            if (rs.next()) {
691                result = rs.getInt(1);
692            }
693        } finally {
694            cleanupExclusiveLock.readLock().unlock();
695            close(rs);
696            close(s);
697        }
698        return result;
699    }
700
701    /**
702     * @param c 
703     * @param info 
704     * @param retroactive 
705     * @throws SQLException 
706     * @throws IOException 
707     */
708    public void doSetSubscriberEntry(TransactionContext c, SubscriptionInfo info, boolean retroactive, boolean isPrioritizedMessages)
709            throws SQLException, IOException {
710        // dumpTables(c, destination.getQualifiedName(), clientId,
711        // subscriptionName);
712        PreparedStatement s = null;
713        cleanupExclusiveLock.readLock().lock();
714        try {
715            long lastMessageId = -1;
716            if (!retroactive) {
717                s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement());
718                ResultSet rs = null;
719                try {
720                    rs = s.executeQuery();
721                    if (rs.next()) {
722                        lastMessageId = rs.getLong(1);
723                    }
724                } finally {
725                    close(rs);
726                    close(s);
727                }
728            }
729            s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement());
730            int maxPriority = 1;
731            if (isPrioritizedMessages) {
732                maxPriority = 10;
733            }
734
735            for (int priority = 0; priority < maxPriority; priority++) {
736                s.setString(1, info.getDestination().getQualifiedName());
737                s.setString(2, info.getClientId());
738                s.setString(3, info.getSubscriptionName());
739                s.setString(4, info.getSelector());
740                s.setLong(5, lastMessageId);
741                s.setString(6, info.getSubscribedDestination().getQualifiedName());
742                s.setLong(7, priority);
743
744                if (s.executeUpdate() != 1) {
745                    throw new IOException("Could not create durable subscription for: " + info.getClientId());
746                }
747            }
748
749        } finally {
750            cleanupExclusiveLock.readLock().unlock();
751            close(s);
752        }
753    }
754
755    public SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination,
756            String clientId, String subscriptionName) throws SQLException, IOException {
757        PreparedStatement s = null;
758        ResultSet rs = null;
759        cleanupExclusiveLock.readLock().lock();
760        try {
761            s = c.getConnection().prepareStatement(this.statements.getFindDurableSubStatement());
762            s.setString(1, destination.getQualifiedName());
763            s.setString(2, clientId);
764            s.setString(3, subscriptionName);
765            rs = s.executeQuery();
766            if (!rs.next()) {
767                return null;
768            }
769            SubscriptionInfo subscription = new SubscriptionInfo();
770            subscription.setDestination(destination);
771            subscription.setClientId(clientId);
772            subscription.setSubscriptionName(subscriptionName);
773            subscription.setSelector(rs.getString(1));
774            subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(2),
775                    ActiveMQDestination.QUEUE_TYPE));
776            return subscription;
777        } finally {
778            cleanupExclusiveLock.readLock().unlock();
779            close(rs);
780            close(s);
781        }
782    }
783
784    public SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c, ActiveMQDestination destination)
785            throws SQLException, IOException {
786        PreparedStatement s = null;
787        ResultSet rs = null;
788        cleanupExclusiveLock.readLock().lock();
789        try {
790            s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubsStatement());
791            s.setString(1, destination.getQualifiedName());
792            rs = s.executeQuery();
793            ArrayList<SubscriptionInfo> rc = new ArrayList<SubscriptionInfo>();
794            while (rs.next()) {
795                SubscriptionInfo subscription = new SubscriptionInfo();
796                subscription.setDestination(destination);
797                subscription.setSelector(rs.getString(1));
798                subscription.setSubscriptionName(rs.getString(2));
799                subscription.setClientId(rs.getString(3));
800                subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(4),
801                        ActiveMQDestination.QUEUE_TYPE));
802                rc.add(subscription);
803            }
804            return rc.toArray(new SubscriptionInfo[rc.size()]);
805        } finally {
806            cleanupExclusiveLock.readLock().unlock();
807            close(rs);
808            close(s);
809        }
810    }
811
812    public void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException,
813            IOException {
814        PreparedStatement s = null;
815        cleanupExclusiveLock.readLock().lock();
816        try {
817            s = c.getConnection().prepareStatement(this.statements.getRemoveAllMessagesStatement());
818            s.setString(1, destinationName.getQualifiedName());
819            s.executeUpdate();
820            s.close();
821            s = c.getConnection().prepareStatement(this.statements.getRemoveAllSubscriptionsStatement());
822            s.setString(1, destinationName.getQualifiedName());
823            s.executeUpdate();
824        } finally {
825            cleanupExclusiveLock.readLock().unlock();
826            close(s);
827        }
828    }
829
830    public void doDeleteSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
831            String subscriptionName) throws SQLException, IOException {
832        PreparedStatement s = null;
833        cleanupExclusiveLock.readLock().lock();
834        try {
835            s = c.getConnection().prepareStatement(this.statements.getDeleteSubscriptionStatement());
836            s.setString(1, destination.getQualifiedName());
837            s.setString(2, clientId);
838            s.setString(3, subscriptionName);
839            s.executeUpdate();
840        } finally {
841            cleanupExclusiveLock.readLock().unlock();
842            close(s);
843        }
844    }
845
846    char priorityIterator = 0; // unsigned
847    public void doDeleteOldMessages(TransactionContext c) throws SQLException, IOException {
848        PreparedStatement s = null;
849        cleanupExclusiveLock.writeLock().lock();
850        try {
851            LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatementWithPriority());
852            s = c.getConnection().prepareStatement(this.statements.getDeleteOldMessagesStatementWithPriority());
853            int priority = priorityIterator++%10;
854            s.setInt(1, priority);
855            s.setInt(2, priority);
856            int i = s.executeUpdate();
857            LOG.debug("Deleted " + i + " old message(s) at priority: " + priority);
858        } finally {
859            cleanupExclusiveLock.writeLock().unlock();
860            close(s);
861        }
862    }
863
864    public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination,
865            String clientId, String subscriberName) throws SQLException, IOException {
866        PreparedStatement s = null;
867        ResultSet rs = null;
868        long result = -1;
869        cleanupExclusiveLock.readLock().lock();
870        try {
871            s = c.getConnection().prepareStatement(this.statements.getLastAckedDurableSubscriberMessageStatement());
872            s.setString(1, destination.getQualifiedName());
873            s.setString(2, clientId);
874            s.setString(3, subscriberName);
875            rs = s.executeQuery();
876            if (rs.next()) {
877                result = rs.getLong(1);
878                if (result == 0 && rs.wasNull()) {
879                    result = -1;
880                }
881            }
882        } finally {
883            cleanupExclusiveLock.readLock().unlock();
884            close(rs);
885            close(s);
886        }
887        return result;
888    }
889
890    protected static void close(PreparedStatement s) {
891        try {
892            s.close();
893        } catch (Throwable e) {
894        }
895    }
896
897    protected static void close(ResultSet rs) {
898        try {
899            rs.close();
900        } catch (Throwable e) {
901        }
902    }
903
904    public Set<ActiveMQDestination> doGetDestinations(TransactionContext c) throws SQLException, IOException {
905        HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
906        PreparedStatement s = null;
907        ResultSet rs = null;
908        cleanupExclusiveLock.readLock().lock();
909        try {
910            s = c.getConnection().prepareStatement(this.statements.getFindAllDestinationsStatement());
911            rs = s.executeQuery();
912            while (rs.next()) {
913                rc.add(ActiveMQDestination.createDestination(rs.getString(1), ActiveMQDestination.QUEUE_TYPE));
914            }
915        } finally {
916            cleanupExclusiveLock.readLock().unlock();
917            close(rs);
918            close(s);
919        }
920        return rc;
921    }
922
923    /**
924     * @return true if batchStatements
925     */
926    public boolean isBatchStatements() {
927        return batchStatements;
928    }
929
930    /**
931     * Set the number of statements to process as a single batch DB update
932     * @param batchStatements
933     */
934    public void setBatchStatements(boolean batchStatements) {
935        this.batchStatements = batchStatements;
936        // The next lines are deprecated and should be removed in a future release
937        // and is here in case someone created their own
938       // this.batchStatments = batchStatements;
939    }
940
941    // Note - remove batchStatment in future distributions.  Here for backward compatibility
942    /**
943     * @return true if batchStements
944     */
945    public boolean isBatchStatments() {
946        return this.batchStatements;
947    }
948
949    /**
950     * This value batchStatments is deprecated and will be removed in a future release.  Use batchStatements instead (Note the 'e' in Statement)"
951     * @deprecated
952     * @param batchStatments
953     */
954    public void setBatchStatments(boolean batchStatments) {
955        LOG.warn("batchStatments is deprecated and will be removed in a future release.  Use batchStatements instead (Note the 'e' in Statement)");
956        this.batchStatements = batchStatments;
957        this.batchStatments = batchStatments;
958    }
959
960    public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
961        this.statements.setUseExternalMessageReferences(useExternalMessageReferences);
962    }
963
964    /**
965     * @return the statements
966     */
967    public Statements getStatements() {
968        return this.statements;
969    }
970
971    public void setStatements(Statements statements) {
972        this.statements = statements;
973    }
974
975    public int getMaxRows() {
976        return maxRows;
977    }
978
979    /**
980     * the max value for statement maxRows, used to limit jdbc queries
981     */
982    public void setMaxRows(int maxRows) {
983        this.maxRows = maxRows;
984    }
985
986    @Override
987    public void doRecordDestination(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException {
988        PreparedStatement s = null;
989        cleanupExclusiveLock.readLock().lock();
990        try {
991            s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement());
992            s.setString(1, destination.getQualifiedName());
993            s.setString(2, destination.getQualifiedName());
994            s.setString(3, destination.getQualifiedName());
995            s.setString(4, null);
996            s.setLong(5, 0);
997            s.setString(6, destination.getQualifiedName());
998            s.setLong(7, 11);  // entry out of priority range
999
1000            if (s.executeUpdate() != 1) {
1001                throw new IOException("Could not create ack record for destination: " + destination);
1002            }
1003        } finally {
1004            cleanupExclusiveLock.readLock().unlock();
1005            close(s);
1006        }
1007    }
1008
1009    @Override
1010    public void doRecoverPreparedOps(TransactionContext c, JdbcMemoryTransactionStore jdbcMemoryTransactionStore) throws SQLException, IOException {
1011        PreparedStatement s = null;
1012        ResultSet rs = null;
1013        cleanupExclusiveLock.readLock().lock();
1014        try {
1015            s = c.getConnection().prepareStatement(this.statements.getFindOpsPendingOutcomeStatement());
1016            rs = s.executeQuery();
1017            while (rs.next()) {
1018                long id = rs.getLong(1);
1019                String encodedString = rs.getString(2);
1020                byte[] encodedXid = parseBase64Binary(encodedString);
1021                if (encodedXid[0] == '+') {
1022                    jdbcMemoryTransactionStore.recoverAdd(id, getBinaryData(rs, 3));
1023                } else {
1024                    jdbcMemoryTransactionStore.recoverAck(id, encodedXid, getBinaryData(rs, 3));
1025                }
1026            }
1027
1028            close(rs);
1029            close(s);
1030
1031            s = c.getConnection().prepareStatement(this.statements.getFindAcksPendingOutcomeStatement());
1032            rs = s.executeQuery();
1033            while (rs.next()) {
1034                String encodedString = rs.getString(1);
1035                byte[] encodedXid = parseBase64Binary(encodedString);
1036                String destination = rs.getString(2);
1037                String subName = rs.getString(3);
1038                String subId = rs.getString(4);
1039                jdbcMemoryTransactionStore.recoverLastAck(encodedXid,
1040                        ActiveMQDestination.createDestination(destination, ActiveMQDestination.TOPIC_TYPE),
1041                        subName, subId);
1042            }
1043        } finally {
1044            close(rs);
1045            cleanupExclusiveLock.readLock().unlock();
1046            close(s);
1047        }
1048    }
1049
1050    @Override
1051    public void doCommitAddOp(TransactionContext c, long preparedSequence, long sequence) throws SQLException, IOException {
1052        PreparedStatement s = null;
1053        cleanupExclusiveLock.readLock().lock();
1054        try {
1055            s = c.getConnection().prepareStatement(this.statements.getClearXidFlagStatement());
1056            s.setLong(1, sequence);
1057            s.setLong(2, preparedSequence);
1058            if (s.executeUpdate() != 1) {
1059                throw new IOException("Could not remove prepared transaction state from message add for sequenceId: " + sequence);
1060            }
1061        } finally {
1062            cleanupExclusiveLock.readLock().unlock();
1063            close(s);
1064        }
1065    }
1066
1067
1068    public int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException,
1069            IOException {
1070        PreparedStatement s = null;
1071        ResultSet rs = null;
1072        int result = 0;
1073        cleanupExclusiveLock.readLock().lock();
1074        try {
1075            s = c.getConnection().prepareStatement(this.statements.getDestinationMessageCountStatement());
1076            s.setString(1, destination.getQualifiedName());
1077            rs = s.executeQuery();
1078            if (rs.next()) {
1079                result = rs.getInt(1);
1080            }
1081        } finally {
1082            cleanupExclusiveLock.readLock().unlock();
1083            close(rs);
1084            close(s);
1085        }
1086        return result;
1087    }
1088
1089    public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long maxSeq, long lastRecoveredSeq,
1090            long priority, int maxReturned, boolean isPrioritizedMessages, JDBCMessageRecoveryListener listener) throws Exception {
1091        PreparedStatement s = null;
1092        ResultSet rs = null;
1093        cleanupExclusiveLock.readLock().lock();
1094        try {
1095            if (isPrioritizedMessages) {
1096                s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesByPriorityStatement());
1097            } else {
1098                s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesStatement());
1099            }
1100            s.setMaxRows(Math.min(maxReturned * 2, maxRows));
1101            s.setString(1, destination.getQualifiedName());
1102            s.setLong(2, lastRecoveredSeq);
1103            s.setLong(3, maxSeq);
1104            if (isPrioritizedMessages) {
1105                s.setLong(4, priority);
1106                s.setLong(5, priority);
1107            }
1108            rs = s.executeQuery();
1109            int count = 0;
1110            if (this.statements.isUseExternalMessageReferences()) {
1111                while (rs.next() && count < maxReturned) {
1112                    if (listener.recoverMessageReference(rs.getString(1))) {
1113                        count++;
1114                    } else {
1115                        LOG.debug("Stopped recover next messages");
1116                        break;
1117                    }
1118                }
1119            } else {
1120                while (rs.next() && count < maxReturned) {
1121                    if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
1122                        count++;
1123                    } else {
1124                        LOG.debug("Stopped recover next messages");
1125                        break;
1126                    }
1127                }
1128            }
1129        } catch (Exception e) {
1130            e.printStackTrace();
1131        } finally {
1132            cleanupExclusiveLock.readLock().unlock();
1133            close(rs);
1134            close(s);
1135        }
1136    }
1137
1138    public long doGetLastProducerSequenceId(TransactionContext c, ProducerId id)
1139            throws SQLException, IOException {
1140        PreparedStatement s = null;
1141        ResultSet rs = null;
1142        cleanupExclusiveLock.readLock().lock();
1143        try {
1144            s = c.getConnection().prepareStatement(this.statements.getLastProducerSequenceIdStatement());
1145            s.setString(1, id.toString());
1146            rs = s.executeQuery();
1147            long seq = -1;
1148            if (rs.next()) {
1149                seq = rs.getLong(1);
1150            }
1151            return seq;
1152        } finally {
1153            cleanupExclusiveLock.readLock().unlock();
1154            close(rs);
1155            close(s);
1156        }
1157    }
1158
1159    public static void dumpTables(Connection c, String destinationName, String clientId, String
1160      subscriptionName) throws SQLException { 
1161        printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out); 
1162        printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out); 
1163        PreparedStatement s = c.prepareStatement("SELECT M.ID, D.LAST_ACKED_ID FROM " 
1164                + "ACTIVEMQ_MSGS M, " +"ACTIVEMQ_ACKS D " 
1165                + "WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" 
1166                + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID" 
1167                + " ORDER BY M.ID");
1168      s.setString(1,destinationName); s.setString(2,clientId); s.setString(3,subscriptionName);
1169      printQuery(s,System.out); }
1170
1171    public static void dumpTables(java.sql.Connection c) throws SQLException {
1172        printQuery(c, "SELECT COUNT(*) from ACTIVEMQ_MSGS", System.out);
1173
1174        //printQuery(c, "SELECT COUNT(*) from ACTIVEMQ_ACKS", System.out);
1175
1176        //printQuery(c, "Select * from ACTIVEMQ_MSGS ORDER BY ID", System.out);
1177        //printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out);
1178    }
1179
1180    public static void printQuery(java.sql.Connection c, String query, java.io.PrintStream out)
1181            throws SQLException {
1182        printQuery(c.prepareStatement(query), out);
1183    }
1184
1185    public static void printQuery(java.sql.PreparedStatement s, java.io.PrintStream out)
1186            throws SQLException {
1187
1188        ResultSet set = null;
1189        try {
1190            set = s.executeQuery();
1191            java.sql.ResultSetMetaData metaData = set.getMetaData();
1192            for (int i = 1; i <= metaData.getColumnCount(); i++) {
1193                if (i == 1)
1194                    out.print("||");
1195                out.print(metaData.getColumnName(i) + "||");
1196            }
1197            out.println();
1198            while (set.next()) {
1199                for (int i = 1; i <= metaData.getColumnCount(); i++) {
1200                    if (i == 1)
1201                        out.print("|");
1202                    out.print(set.getString(i) + "|");
1203                }
1204                out.println();
1205            }
1206        } finally {
1207            try {
1208                set.close();
1209            } catch (Throwable ignore) {
1210            }
1211            try {
1212                s.close();
1213            } catch (Throwable ignore) {
1214            }
1215        }
1216    }
1217
1218}