001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.jdbc;
018
019import java.io.File;
020import java.io.IOException;
021import java.sql.Connection;
022import java.sql.SQLException;
023import java.util.Collections;
024import java.util.Locale;
025import java.util.Set;
026import java.util.concurrent.ScheduledFuture;
027import java.util.concurrent.ScheduledThreadPoolExecutor;
028import java.util.concurrent.ThreadFactory;
029import java.util.concurrent.TimeUnit;
030
031import javax.sql.DataSource;
032
033import org.apache.activemq.ActiveMQMessageAudit;
034import org.apache.activemq.broker.BrokerService;
035import org.apache.activemq.broker.ConnectionContext;
036import org.apache.activemq.broker.Locker;
037import org.apache.activemq.broker.scheduler.JobSchedulerStore;
038import org.apache.activemq.command.ActiveMQDestination;
039import org.apache.activemq.command.ActiveMQQueue;
040import org.apache.activemq.command.ActiveMQTopic;
041import org.apache.activemq.command.Message;
042import org.apache.activemq.command.MessageAck;
043import org.apache.activemq.command.MessageId;
044import org.apache.activemq.command.ProducerId;
045import org.apache.activemq.openwire.OpenWireFormat;
046import org.apache.activemq.store.MessageStore;
047import org.apache.activemq.store.PersistenceAdapter;
048import org.apache.activemq.store.TopicMessageStore;
049import org.apache.activemq.store.TransactionStore;
050import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
051import org.apache.activemq.store.memory.MemoryTransactionStore;
052import org.apache.activemq.usage.SystemUsage;
053import org.apache.activemq.util.ByteSequence;
054import org.apache.activemq.util.FactoryFinder;
055import org.apache.activemq.util.IOExceptionSupport;
056import org.apache.activemq.util.LongSequenceGenerator;
057import org.apache.activemq.util.ServiceStopper;
058import org.apache.activemq.util.ThreadPoolUtils;
059import org.apache.activemq.wireformat.WireFormat;
060import org.slf4j.Logger;
061import org.slf4j.LoggerFactory;
062
063/**
064 * A {@link PersistenceAdapter} implementation using JDBC for persistence
065 * storage.
066 *
067 * This persistence adapter will correctly remember prepared XA transactions,
068 * but it will not keep track of local transaction commits so that operations
069 * performed against the Message store are done as a single uow.
070 *
071 * @org.apache.xbean.XBean element="jdbcPersistenceAdapter"
072 *
073 */
074public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements PersistenceAdapter {
075
076    private static final Logger LOG = LoggerFactory.getLogger(JDBCPersistenceAdapter.class);
077    private static FactoryFinder adapterFactoryFinder = new FactoryFinder(
078        "META-INF/services/org/apache/activemq/store/jdbc/");
079    private static FactoryFinder lockFactoryFinder = new FactoryFinder(
080        "META-INF/services/org/apache/activemq/store/jdbc/lock/");
081
082    public static final long DEFAULT_LOCK_KEEP_ALIVE_PERIOD = 30 * 1000;
083
084    private WireFormat wireFormat = new OpenWireFormat();
085    private Statements statements;
086    private JDBCAdapter adapter;
087    private MemoryTransactionStore transactionStore;
088    private ScheduledFuture<?> cleanupTicket;
089    private int cleanupPeriod = 1000 * 60 * 5;
090    private boolean useExternalMessageReferences;
091    private boolean createTablesOnStartup = true;
092    private DataSource lockDataSource;
093    private int transactionIsolation;
094    private File directory;
095    private boolean changeAutoCommitAllowed = true;
096
097    protected int maxProducersToAudit=1024;
098    protected int maxAuditDepth=1000;
099    protected boolean enableAudit=false;
100    protected int auditRecoveryDepth = 1024;
101    protected ActiveMQMessageAudit audit;
102
103    protected LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
104    protected int maxRows = DefaultJDBCAdapter.MAX_ROWS;
105
106    {
107        setLockKeepAlivePeriod(DEFAULT_LOCK_KEEP_ALIVE_PERIOD);
108    }
109
110    public JDBCPersistenceAdapter() {
111    }
112
113    public JDBCPersistenceAdapter(DataSource ds, WireFormat wireFormat) {
114        super(ds);
115        this.wireFormat = wireFormat;
116    }
117
118    @Override
119    public Set<ActiveMQDestination> getDestinations() {
120        TransactionContext c = null;
121        try {
122            c = getTransactionContext();
123            return getAdapter().doGetDestinations(c);
124        } catch (IOException e) {
125            return emptyDestinationSet();
126        } catch (SQLException e) {
127            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
128            return emptyDestinationSet();
129        } finally {
130            if (c != null) {
131                try {
132                    c.close();
133                } catch (Throwable e) {
134                }
135            }
136        }
137    }
138
139    @SuppressWarnings("unchecked")
140    private Set<ActiveMQDestination> emptyDestinationSet() {
141        return Collections.EMPTY_SET;
142    }
143
144    protected void createMessageAudit() {
145        if (enableAudit && audit == null) {
146            audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
147            TransactionContext c = null;
148
149            try {
150                c = getTransactionContext();
151                getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() {
152                    @Override
153                    public void messageId(MessageId id) {
154                        audit.isDuplicate(id);
155                    }
156                });
157            } catch (Exception e) {
158                LOG.error("Failed to reload store message audit for JDBC persistence adapter", e);
159            } finally {
160                if (c != null) {
161                    try {
162                        c.close();
163                    } catch (Throwable e) {
164                    }
165                }
166            }
167        }
168    }
169
170    public void initSequenceIdGenerator() {
171        TransactionContext c = null;
172        try {
173            c = getTransactionContext();
174            getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() {
175                @Override
176                public void messageId(MessageId id) {
177                    audit.isDuplicate(id);
178                }
179            });
180        } catch (Exception e) {
181            LOG.error("Failed to reload store message audit for JDBC persistence adapter", e);
182        } finally {
183            if (c != null) {
184                try {
185                    c.close();
186                } catch (Throwable e) {
187                }
188            }
189        }
190    }
191
192    @Override
193    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
194        MessageStore rc = new JDBCMessageStore(this, getAdapter(), wireFormat, destination, audit);
195        if (transactionStore != null) {
196            rc = transactionStore.proxy(rc);
197        }
198        return rc;
199    }
200
201    @Override
202    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
203        TopicMessageStore rc = new JDBCTopicMessageStore(this, getAdapter(), wireFormat, destination, audit);
204        if (transactionStore != null) {
205            rc = transactionStore.proxy(rc);
206        }
207        return rc;
208    }
209
210    /**
211     * Cleanup method to remove any state associated with the given destination
212     * @param destination Destination to forget
213     */
214    @Override
215    public void removeQueueMessageStore(ActiveMQQueue destination) {
216        if (destination.isQueue() && getBrokerService().shouldRecordVirtualDestination(destination)) {
217            try {
218                removeConsumerDestination(destination);
219            } catch (IOException ioe) {
220                LOG.error("Failed to remove consumer destination: " + destination, ioe);
221            }
222        }
223    }
224
225    private void removeConsumerDestination(ActiveMQQueue destination) throws IOException {
226        TransactionContext c = getTransactionContext();
227        try {
228            String id = destination.getQualifiedName();
229            getAdapter().doDeleteSubscription(c, destination, id, id);
230        } catch (SQLException e) {
231            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
232            throw IOExceptionSupport.create("Failed to remove consumer destination: " + destination, e);
233        } finally {
234            c.close();
235        }
236    }
237
238    /**
239     * Cleanup method to remove any state associated with the given destination
240     * No state retained.... nothing to do
241     *
242     * @param destination Destination to forget
243     */
244    @Override
245    public void removeTopicMessageStore(ActiveMQTopic destination) {
246    }
247
248    @Override
249    public TransactionStore createTransactionStore() throws IOException {
250        if (transactionStore == null) {
251            transactionStore = new JdbcMemoryTransactionStore(this);
252        }
253        return this.transactionStore;
254    }
255
256    @Override
257    public long getLastMessageBrokerSequenceId() throws IOException {
258        TransactionContext c = getTransactionContext();
259        try {
260            long seq =  getAdapter().doGetLastMessageStoreSequenceId(c);
261            sequenceGenerator.setLastSequenceId(seq);
262            long brokerSeq = 0;
263            if (seq != 0) {
264                byte[] msg = getAdapter().doGetMessageById(c, seq);
265                if (msg != null) {
266                    Message last = (Message)wireFormat.unmarshal(new ByteSequence(msg));
267                    brokerSeq = last.getMessageId().getBrokerSequenceId();
268                } else {
269                   LOG.warn("Broker sequence id wasn't recovered properly, possible duplicates!");
270                }
271            }
272            return brokerSeq;
273        } catch (SQLException e) {
274            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
275            throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e);
276        } finally {
277            c.close();
278        }
279    }
280
281    @Override
282    public long getLastProducerSequenceId(ProducerId id) throws IOException {
283        TransactionContext c = getTransactionContext();
284        try {
285            return getAdapter().doGetLastProducerSequenceId(c, id);
286        } catch (SQLException e) {
287            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
288            throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e);
289        } finally {
290            c.close();
291        }
292    }
293
294    @Override
295    public void init() throws Exception {
296        getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences());
297
298        if (isCreateTablesOnStartup()) {
299            TransactionContext transactionContext = getTransactionContext();
300            transactionContext.begin();
301            try {
302                try {
303                    getAdapter().doCreateTables(transactionContext);
304                } catch (SQLException e) {
305                    LOG.warn("Cannot create tables due to: " + e);
306                    JDBCPersistenceAdapter.log("Failure Details: ", e);
307                }
308            } finally {
309                transactionContext.commit();
310            }
311        }
312    }
313
314    @Override
315    public void doStart() throws Exception {
316
317        if( brokerService!=null ) {
318          wireFormat.setVersion(brokerService.getStoreOpenWireVersion());
319        }
320
321        // Cleanup the db periodically.
322        if (cleanupPeriod > 0) {
323            cleanupTicket = getScheduledThreadPoolExecutor().scheduleWithFixedDelay(new Runnable() {
324                @Override
325                public void run() {
326                    cleanup();
327                }
328            }, 0, cleanupPeriod, TimeUnit.MILLISECONDS);
329        }
330        createMessageAudit();
331    }
332
333    @Override
334    public synchronized void doStop(ServiceStopper stopper) throws Exception {
335        if (cleanupTicket != null) {
336            cleanupTicket.cancel(true);
337            cleanupTicket = null;
338        }
339        closeDataSource(getDataSource());
340    }
341
342    public void cleanup() {
343        TransactionContext c = null;
344        try {
345            LOG.debug("Cleaning up old messages.");
346            c = getTransactionContext();
347            getAdapter().doDeleteOldMessages(c);
348        } catch (IOException e) {
349            LOG.warn("Old message cleanup failed due to: " + e, e);
350        } catch (SQLException e) {
351            LOG.warn("Old message cleanup failed due to: " + e);
352            JDBCPersistenceAdapter.log("Failure Details: ", e);
353        } finally {
354            if (c != null) {
355                try {
356                    c.close();
357                } catch (Throwable e) {
358                }
359            }
360            LOG.debug("Cleanup done.");
361        }
362    }
363
364    @Override
365    public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() {
366        if (clockDaemon == null) {
367            clockDaemon = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
368                @Override
369                public Thread newThread(Runnable runnable) {
370                    Thread thread = new Thread(runnable, "ActiveMQ JDBC PA Scheduled Task");
371                    thread.setDaemon(true);
372                    return thread;
373                }
374            });
375        }
376        return clockDaemon;
377    }
378
379    public JDBCAdapter getAdapter() throws IOException {
380        if (adapter == null) {
381            setAdapter(createAdapter());
382        }
383        return adapter;
384    }
385
386    /**
387     * @deprecated as of 5.7.0, replaced by {@link #getLocker()}
388     */
389    @Deprecated
390    public Locker getDatabaseLocker() throws IOException {
391        return getLocker();
392    }
393
394    /**
395     * Sets the database locker strategy to use to lock the database on startup
396     * @throws IOException
397     *
398     * @deprecated as of 5.7.0, replaced by {@link #setLocker(org.apache.activemq.broker.Locker)}
399     */
400    @Deprecated
401    public void setDatabaseLocker(Locker locker) throws IOException {
402        setLocker(locker);
403    }
404
405    public DataSource getLockDataSource() throws IOException {
406        if (lockDataSource == null) {
407            lockDataSource = getDataSource();
408            if (lockDataSource == null) {
409                throw new IllegalArgumentException(
410                        "No dataSource property has been configured");
411            }
412        }
413        return lockDataSource;
414    }
415
416    public void setLockDataSource(DataSource dataSource) {
417        this.lockDataSource = dataSource;
418        LOG.info("Using a separate dataSource for locking: "
419                            + lockDataSource);
420    }
421
422    @Override
423    public BrokerService getBrokerService() {
424        return brokerService;
425    }
426
427    /**
428     * @throws IOException
429     */
430    protected JDBCAdapter createAdapter() throws IOException {
431
432        adapter = (JDBCAdapter) loadAdapter(adapterFactoryFinder, "adapter");
433
434        // Use the default JDBC adapter if the
435        // Database type is not recognized.
436        if (adapter == null) {
437            adapter = new DefaultJDBCAdapter();
438            LOG.debug("Using default JDBC Adapter: " + adapter);
439        }
440        return adapter;
441    }
442
443    private Object loadAdapter(FactoryFinder finder, String kind) throws IOException {
444        Object adapter = null;
445        TransactionContext c = getTransactionContext();
446        try {
447            try {
448                // Make the filename file system safe.
449                String dirverName = c.getConnection().getMetaData().getDriverName();
450                dirverName = dirverName.replaceAll("[^a-zA-Z0-9\\-]", "_").toLowerCase(Locale.ENGLISH);
451
452                try {
453                    adapter = finder.newInstance(dirverName);
454                    LOG.info("Database " + kind + " driver override recognized for : [" + dirverName + "] - adapter: " + adapter.getClass());
455                } catch (Throwable e) {
456                    LOG.info("Database " + kind + " driver override not found for : [" + dirverName
457                             + "].  Will use default implementation.");
458                }
459            } catch (SQLException e) {
460                LOG.warn("JDBC error occurred while trying to detect database type for overrides. Will use default implementations: "
461                          + e.getMessage());
462                JDBCPersistenceAdapter.log("Failure Details: ", e);
463            }
464        } finally {
465            c.close();
466        }
467        return adapter;
468    }
469
470    public void setAdapter(JDBCAdapter adapter) {
471        this.adapter = adapter;
472        this.adapter.setStatements(getStatements());
473        this.adapter.setMaxRows(getMaxRows());
474    }
475
476    public WireFormat getWireFormat() {
477        return wireFormat;
478    }
479
480    public void setWireFormat(WireFormat wireFormat) {
481        this.wireFormat = wireFormat;
482    }
483
484    public TransactionContext getTransactionContext(ConnectionContext context) throws IOException {
485        if (context == null) {
486            return getTransactionContext();
487        } else {
488            TransactionContext answer = (TransactionContext)context.getLongTermStoreContext();
489            if (answer == null) {
490                answer = getTransactionContext();
491                context.setLongTermStoreContext(answer);
492            }
493            return answer;
494        }
495    }
496
497    public TransactionContext getTransactionContext() throws IOException {
498        TransactionContext answer = new TransactionContext(this);
499        if (transactionIsolation > 0) {
500            answer.setTransactionIsolation(transactionIsolation);
501        }
502        return answer;
503    }
504
505    @Override
506    public void beginTransaction(ConnectionContext context) throws IOException {
507        TransactionContext transactionContext = getTransactionContext(context);
508        transactionContext.begin();
509    }
510
511    @Override
512    public void commitTransaction(ConnectionContext context) throws IOException {
513        TransactionContext transactionContext = getTransactionContext(context);
514        transactionContext.commit();
515    }
516
517    @Override
518    public void rollbackTransaction(ConnectionContext context) throws IOException {
519        TransactionContext transactionContext = getTransactionContext(context);
520        transactionContext.rollback();
521    }
522
523    public int getCleanupPeriod() {
524        return cleanupPeriod;
525    }
526
527    /**
528     * Sets the number of milliseconds until the database is attempted to be
529     * cleaned up for durable topics
530     */
531    public void setCleanupPeriod(int cleanupPeriod) {
532        this.cleanupPeriod = cleanupPeriod;
533    }
534
535    public boolean isChangeAutoCommitAllowed() {
536        return changeAutoCommitAllowed;
537    }
538
539    /**
540     * Whether the JDBC driver allows to set the auto commit.
541     * Some drivers does not allow changing the auto commit. The default value is true.
542     *
543     * @param changeAutoCommitAllowed true to change, false to not change.
544     */
545    public void setChangeAutoCommitAllowed(boolean changeAutoCommitAllowed) {
546        this.changeAutoCommitAllowed = changeAutoCommitAllowed;
547    }
548
549    @Override
550    public void deleteAllMessages() throws IOException {
551        TransactionContext c = getTransactionContext();
552        try {
553            getAdapter().doDropTables(c);
554            getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences());
555            getAdapter().doCreateTables(c);
556            LOG.info("Persistence store purged.");
557        } catch (SQLException e) {
558            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
559            throw IOExceptionSupport.create(e);
560        } finally {
561            c.close();
562        }
563    }
564
565    public boolean isUseExternalMessageReferences() {
566        return useExternalMessageReferences;
567    }
568
569    public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
570        this.useExternalMessageReferences = useExternalMessageReferences;
571    }
572
573    public boolean isCreateTablesOnStartup() {
574        return createTablesOnStartup;
575    }
576
577    /**
578     * Sets whether or not tables are created on startup
579     */
580    public void setCreateTablesOnStartup(boolean createTablesOnStartup) {
581        this.createTablesOnStartup = createTablesOnStartup;
582    }
583
584    /**
585     * @deprecated use {@link #setUseLock(boolean)} instead
586     *
587     * Sets whether or not an exclusive database lock should be used to enable
588     * JDBC Master/Slave. Enabled by default.
589     */
590    @Deprecated
591    public void setUseDatabaseLock(boolean useDatabaseLock) {
592        setUseLock(useDatabaseLock);
593    }
594
595    public static void log(String msg, SQLException e) {
596        String s = msg + e.getMessage();
597        while (e.getNextException() != null) {
598            e = e.getNextException();
599            s += ", due to: " + e.getMessage();
600        }
601        LOG.warn(s, e);
602    }
603
604    public Statements getStatements() {
605        if (statements == null) {
606            statements = new Statements();
607        }
608        return statements;
609    }
610
611    public void setStatements(Statements statements) {
612        this.statements = statements;
613        if (adapter != null) {
614            this.adapter.setStatements(getStatements());
615        }
616    }
617
618    /**
619     * @param usageManager The UsageManager that is controlling the
620     *                destination's memory usage.
621     */
622    @Override
623    public void setUsageManager(SystemUsage usageManager) {
624    }
625
626    @Override
627    public Locker createDefaultLocker() throws IOException {
628        Locker locker = (Locker) loadAdapter(lockFactoryFinder, "lock");
629        if (locker == null) {
630            locker = new DefaultDatabaseLocker();
631            LOG.debug("Using default JDBC Locker: " + locker);
632        }
633        locker.configure(this);
634        return locker;
635    }
636
637    @Override
638    public void setBrokerName(String brokerName) {
639    }
640
641    @Override
642    public String toString() {
643        return "JDBCPersistenceAdapter(" + super.toString() + ")";
644    }
645
646    @Override
647    public void setDirectory(File dir) {
648        this.directory=dir;
649    }
650
651    @Override
652    public File getDirectory(){
653        if (this.directory==null && brokerService != null){
654            this.directory=brokerService.getBrokerDataDirectory();
655        }
656        return this.directory;
657    }
658
659    // interesting bit here is proof that DB is ok
660    @Override
661    public void checkpoint(boolean sync) throws IOException {
662        // by pass TransactionContext to avoid IO Exception handler
663        Connection connection = null;
664        try {
665            connection = getDataSource().getConnection();
666            if (!connection.isValid(10)) {
667                throw new IOException("isValid(10) failed for: " + connection);
668            }
669        } catch (SQLException e) {
670            LOG.debug("Could not get JDBC connection for checkpoint: " + e);
671            throw IOExceptionSupport.create(e);
672        } finally {
673            if (connection != null) {
674                try {
675                    connection.close();
676                } catch (Throwable ignored) {
677                }
678            }
679        }
680    }
681
682    @Override
683    public long size(){
684        return 0;
685    }
686
687    /**
688     * @deprecated use {@link Locker#setLockAcquireSleepInterval(long)} instead
689     *
690     * millisecond interval between lock acquire attempts, applied to newly created DefaultDatabaseLocker
691     * not applied if DataBaseLocker is injected.
692     *
693     */
694    @Deprecated
695    public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) throws IOException {
696        getLocker().setLockAcquireSleepInterval(lockAcquireSleepInterval);
697    }
698
699    /**
700     * set the Transaction isolation level to something other that TRANSACTION_READ_UNCOMMITTED
701     * This allowable dirty isolation level may not be achievable in clustered DB environments
702     * so a more restrictive and expensive option may be needed like TRANSACTION_REPEATABLE_READ
703     * see isolation level constants in {@link java.sql.Connection}
704     * @param transactionIsolation the isolation level to use
705     */
706    public void setTransactionIsolation(int transactionIsolation) {
707        this.transactionIsolation = transactionIsolation;
708    }
709
710    public int getMaxProducersToAudit() {
711        return maxProducersToAudit;
712    }
713
714    public void setMaxProducersToAudit(int maxProducersToAudit) {
715        this.maxProducersToAudit = maxProducersToAudit;
716    }
717
718    public int getMaxAuditDepth() {
719        return maxAuditDepth;
720    }
721
722    public void setMaxAuditDepth(int maxAuditDepth) {
723        this.maxAuditDepth = maxAuditDepth;
724    }
725
726    public boolean isEnableAudit() {
727        return enableAudit;
728    }
729
730    public void setEnableAudit(boolean enableAudit) {
731        this.enableAudit = enableAudit;
732    }
733
734    public int getAuditRecoveryDepth() {
735        return auditRecoveryDepth;
736    }
737
738    public void setAuditRecoveryDepth(int auditRecoveryDepth) {
739        this.auditRecoveryDepth = auditRecoveryDepth;
740    }
741
742    public long getNextSequenceId() {
743        return sequenceGenerator.getNextSequenceId();
744    }
745
746    public int getMaxRows() {
747        return maxRows;
748    }
749
750    /*
751     * the max rows return from queries, with sparse selectors this may need to be increased
752     */
753    public void setMaxRows(int maxRows) {
754        this.maxRows = maxRows;
755    }
756
757    public void recover(JdbcMemoryTransactionStore jdbcMemoryTransactionStore) throws IOException {
758        TransactionContext c = getTransactionContext();
759        try {
760            getAdapter().doRecoverPreparedOps(c, jdbcMemoryTransactionStore);
761        } catch (SQLException e) {
762            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
763            throw IOExceptionSupport.create("Failed to recover from: " + jdbcMemoryTransactionStore + ". Reason: " + e,e);
764        } finally {
765            c.close();
766        }
767    }
768
769    public void commitAdd(ConnectionContext context, MessageId messageId, long preparedSequenceId) throws IOException {
770        TransactionContext c = getTransactionContext(context);
771        try {
772            long sequence = (Long)messageId.getEntryLocator();
773            getAdapter().doCommitAddOp(c, preparedSequenceId, sequence);
774        } catch (SQLException e) {
775            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
776            throw IOExceptionSupport.create("Failed to commit add: " + messageId + ". Reason: " + e, e);
777        } finally {
778            c.close();
779        }
780    }
781
782    public void commitRemove(ConnectionContext context, MessageAck ack) throws IOException {
783        TransactionContext c = getTransactionContext(context);
784        try {
785            getAdapter().doRemoveMessage(c, (Long)ack.getLastMessageId().getFutureOrSequenceLong(), null);
786        } catch (SQLException e) {
787            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
788            throw IOExceptionSupport.create("Failed to commit last ack: " + ack + ". Reason: " + e,e);
789        } finally {
790            c.close();
791        }
792    }
793
794    public void commitLastAck(ConnectionContext context, long xidLastAck, long priority, ActiveMQDestination destination, String subName, String clientId) throws IOException {
795        TransactionContext c = getTransactionContext(context);
796        try {
797            getAdapter().doSetLastAck(c, destination, null, clientId, subName, xidLastAck, priority);
798        } catch (SQLException e) {
799            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
800            throw IOExceptionSupport.create("Failed to commit last ack with priority: " + priority + " on " + destination + " for " + subName + ":" + clientId + ". Reason: " + e,e);
801        } finally {
802            c.close();
803        }
804    }
805
806    public void rollbackLastAck(ConnectionContext context, JDBCTopicMessageStore store, MessageAck ack, String subName, String clientId) throws IOException {
807        TransactionContext c = getTransactionContext(context);
808        try {
809            byte priority = (byte) store.getCachedStoreSequenceId(c, store.getDestination(), ack.getLastMessageId())[1];
810            getAdapter().doClearLastAck(c, store.getDestination(), priority, clientId, subName);
811        } catch (SQLException e) {
812            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
813            throw IOExceptionSupport.create("Failed to rollback last ack: " + ack + " on " +  store.getDestination() + " for " + subName + ":" + clientId + ". Reason: " + e,e);
814        } finally {
815            c.close();
816        }
817    }
818
819    // after recovery there is no record of the original messageId for the ack
820    public void rollbackLastAck(ConnectionContext context, byte priority, ActiveMQDestination destination, String subName, String clientId) throws IOException {
821        TransactionContext c = getTransactionContext(context);
822        try {
823            getAdapter().doClearLastAck(c, destination, priority, clientId, subName);
824        } catch (SQLException e) {
825            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
826            throw IOExceptionSupport.create("Failed to rollback last ack with priority: " + priority + " on " + destination + " for " + subName + ":" + clientId + ". Reason: " + e, e);
827        } finally {
828            c.close();
829        }
830    }
831
832    long[] getStoreSequenceIdForMessageId(ConnectionContext context, MessageId messageId, ActiveMQDestination destination) throws IOException {
833        long[] result = new long[]{-1, Byte.MAX_VALUE -1};
834        TransactionContext c = getTransactionContext(context);
835        try {
836            result = adapter.getStoreSequenceId(c, destination, messageId);
837        } catch (SQLException e) {
838            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
839            throw IOExceptionSupport.create("Failed to get store sequenceId for messageId: " + messageId +", on: " + destination + ". Reason: " + e, e);
840        } finally {
841            c.close();
842        }
843        return result;
844    }
845
846    @Override
847    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
848        throw new UnsupportedOperationException();
849    }
850}