001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.jdbc; 018 019import java.io.File; 020import java.io.IOException; 021import java.sql.Connection; 022import java.sql.SQLException; 023import java.util.Collections; 024import java.util.Locale; 025import java.util.Set; 026import java.util.concurrent.ScheduledFuture; 027import java.util.concurrent.ScheduledThreadPoolExecutor; 028import java.util.concurrent.ThreadFactory; 029import java.util.concurrent.TimeUnit; 030 031import javax.sql.DataSource; 032 033import org.apache.activemq.ActiveMQMessageAudit; 034import org.apache.activemq.broker.BrokerService; 035import org.apache.activemq.broker.ConnectionContext; 036import org.apache.activemq.broker.Locker; 037import org.apache.activemq.broker.scheduler.JobSchedulerStore; 038import org.apache.activemq.command.ActiveMQDestination; 039import org.apache.activemq.command.ActiveMQQueue; 040import org.apache.activemq.command.ActiveMQTopic; 041import org.apache.activemq.command.Message; 042import org.apache.activemq.command.MessageAck; 043import org.apache.activemq.command.MessageId; 044import org.apache.activemq.command.ProducerId; 045import org.apache.activemq.openwire.OpenWireFormat; 046import org.apache.activemq.store.MessageStore; 047import org.apache.activemq.store.PersistenceAdapter; 048import org.apache.activemq.store.TopicMessageStore; 049import org.apache.activemq.store.TransactionStore; 050import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter; 051import org.apache.activemq.store.memory.MemoryTransactionStore; 052import org.apache.activemq.usage.SystemUsage; 053import org.apache.activemq.util.ByteSequence; 054import org.apache.activemq.util.FactoryFinder; 055import org.apache.activemq.util.IOExceptionSupport; 056import org.apache.activemq.util.LongSequenceGenerator; 057import org.apache.activemq.util.ServiceStopper; 058import org.apache.activemq.util.ThreadPoolUtils; 059import org.apache.activemq.wireformat.WireFormat; 060import org.slf4j.Logger; 061import org.slf4j.LoggerFactory; 062 063/** 064 * A {@link PersistenceAdapter} implementation using JDBC for persistence 065 * storage. 066 * 067 * This persistence adapter will correctly remember prepared XA transactions, 068 * but it will not keep track of local transaction commits so that operations 069 * performed against the Message store are done as a single uow. 070 * 071 * @org.apache.xbean.XBean element="jdbcPersistenceAdapter" 072 * 073 */ 074public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements PersistenceAdapter { 075 076 private static final Logger LOG = LoggerFactory.getLogger(JDBCPersistenceAdapter.class); 077 private static FactoryFinder adapterFactoryFinder = new FactoryFinder( 078 "META-INF/services/org/apache/activemq/store/jdbc/"); 079 private static FactoryFinder lockFactoryFinder = new FactoryFinder( 080 "META-INF/services/org/apache/activemq/store/jdbc/lock/"); 081 082 public static final long DEFAULT_LOCK_KEEP_ALIVE_PERIOD = 30 * 1000; 083 084 private WireFormat wireFormat = new OpenWireFormat(); 085 private Statements statements; 086 private JDBCAdapter adapter; 087 private MemoryTransactionStore transactionStore; 088 private ScheduledFuture<?> cleanupTicket; 089 private int cleanupPeriod = 1000 * 60 * 5; 090 private boolean useExternalMessageReferences; 091 private boolean createTablesOnStartup = true; 092 private DataSource lockDataSource; 093 private int transactionIsolation; 094 private File directory; 095 private boolean changeAutoCommitAllowed = true; 096 097 protected int maxProducersToAudit=1024; 098 protected int maxAuditDepth=1000; 099 protected boolean enableAudit=false; 100 protected int auditRecoveryDepth = 1024; 101 protected ActiveMQMessageAudit audit; 102 103 protected LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator(); 104 protected int maxRows = DefaultJDBCAdapter.MAX_ROWS; 105 106 { 107 setLockKeepAlivePeriod(DEFAULT_LOCK_KEEP_ALIVE_PERIOD); 108 } 109 110 public JDBCPersistenceAdapter() { 111 } 112 113 public JDBCPersistenceAdapter(DataSource ds, WireFormat wireFormat) { 114 super(ds); 115 this.wireFormat = wireFormat; 116 } 117 118 @Override 119 public Set<ActiveMQDestination> getDestinations() { 120 TransactionContext c = null; 121 try { 122 c = getTransactionContext(); 123 return getAdapter().doGetDestinations(c); 124 } catch (IOException e) { 125 return emptyDestinationSet(); 126 } catch (SQLException e) { 127 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 128 return emptyDestinationSet(); 129 } finally { 130 if (c != null) { 131 try { 132 c.close(); 133 } catch (Throwable e) { 134 } 135 } 136 } 137 } 138 139 @SuppressWarnings("unchecked") 140 private Set<ActiveMQDestination> emptyDestinationSet() { 141 return Collections.EMPTY_SET; 142 } 143 144 protected void createMessageAudit() { 145 if (enableAudit && audit == null) { 146 audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit); 147 TransactionContext c = null; 148 149 try { 150 c = getTransactionContext(); 151 getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() { 152 @Override 153 public void messageId(MessageId id) { 154 audit.isDuplicate(id); 155 } 156 }); 157 } catch (Exception e) { 158 LOG.error("Failed to reload store message audit for JDBC persistence adapter", e); 159 } finally { 160 if (c != null) { 161 try { 162 c.close(); 163 } catch (Throwable e) { 164 } 165 } 166 } 167 } 168 } 169 170 public void initSequenceIdGenerator() { 171 TransactionContext c = null; 172 try { 173 c = getTransactionContext(); 174 getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() { 175 @Override 176 public void messageId(MessageId id) { 177 audit.isDuplicate(id); 178 } 179 }); 180 } catch (Exception e) { 181 LOG.error("Failed to reload store message audit for JDBC persistence adapter", e); 182 } finally { 183 if (c != null) { 184 try { 185 c.close(); 186 } catch (Throwable e) { 187 } 188 } 189 } 190 } 191 192 @Override 193 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { 194 MessageStore rc = new JDBCMessageStore(this, getAdapter(), wireFormat, destination, audit); 195 if (transactionStore != null) { 196 rc = transactionStore.proxy(rc); 197 } 198 return rc; 199 } 200 201 @Override 202 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { 203 TopicMessageStore rc = new JDBCTopicMessageStore(this, getAdapter(), wireFormat, destination, audit); 204 if (transactionStore != null) { 205 rc = transactionStore.proxy(rc); 206 } 207 return rc; 208 } 209 210 /** 211 * Cleanup method to remove any state associated with the given destination 212 * @param destination Destination to forget 213 */ 214 @Override 215 public void removeQueueMessageStore(ActiveMQQueue destination) { 216 if (destination.isQueue() && getBrokerService().shouldRecordVirtualDestination(destination)) { 217 try { 218 removeConsumerDestination(destination); 219 } catch (IOException ioe) { 220 LOG.error("Failed to remove consumer destination: " + destination, ioe); 221 } 222 } 223 } 224 225 private void removeConsumerDestination(ActiveMQQueue destination) throws IOException { 226 TransactionContext c = getTransactionContext(); 227 try { 228 String id = destination.getQualifiedName(); 229 getAdapter().doDeleteSubscription(c, destination, id, id); 230 } catch (SQLException e) { 231 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 232 throw IOExceptionSupport.create("Failed to remove consumer destination: " + destination, e); 233 } finally { 234 c.close(); 235 } 236 } 237 238 /** 239 * Cleanup method to remove any state associated with the given destination 240 * No state retained.... nothing to do 241 * 242 * @param destination Destination to forget 243 */ 244 @Override 245 public void removeTopicMessageStore(ActiveMQTopic destination) { 246 } 247 248 @Override 249 public TransactionStore createTransactionStore() throws IOException { 250 if (transactionStore == null) { 251 transactionStore = new JdbcMemoryTransactionStore(this); 252 } 253 return this.transactionStore; 254 } 255 256 @Override 257 public long getLastMessageBrokerSequenceId() throws IOException { 258 TransactionContext c = getTransactionContext(); 259 try { 260 long seq = getAdapter().doGetLastMessageStoreSequenceId(c); 261 sequenceGenerator.setLastSequenceId(seq); 262 long brokerSeq = 0; 263 if (seq != 0) { 264 byte[] msg = getAdapter().doGetMessageById(c, seq); 265 if (msg != null) { 266 Message last = (Message)wireFormat.unmarshal(new ByteSequence(msg)); 267 brokerSeq = last.getMessageId().getBrokerSequenceId(); 268 } else { 269 LOG.warn("Broker sequence id wasn't recovered properly, possible duplicates!"); 270 } 271 } 272 return brokerSeq; 273 } catch (SQLException e) { 274 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 275 throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e); 276 } finally { 277 c.close(); 278 } 279 } 280 281 @Override 282 public long getLastProducerSequenceId(ProducerId id) throws IOException { 283 TransactionContext c = getTransactionContext(); 284 try { 285 return getAdapter().doGetLastProducerSequenceId(c, id); 286 } catch (SQLException e) { 287 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 288 throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e); 289 } finally { 290 c.close(); 291 } 292 } 293 294 @Override 295 public void init() throws Exception { 296 getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences()); 297 298 if (isCreateTablesOnStartup()) { 299 TransactionContext transactionContext = getTransactionContext(); 300 transactionContext.begin(); 301 try { 302 try { 303 getAdapter().doCreateTables(transactionContext); 304 } catch (SQLException e) { 305 LOG.warn("Cannot create tables due to: " + e); 306 JDBCPersistenceAdapter.log("Failure Details: ", e); 307 } 308 } finally { 309 transactionContext.commit(); 310 } 311 } 312 } 313 314 @Override 315 public void doStart() throws Exception { 316 317 if( brokerService!=null ) { 318 wireFormat.setVersion(brokerService.getStoreOpenWireVersion()); 319 } 320 321 // Cleanup the db periodically. 322 if (cleanupPeriod > 0) { 323 cleanupTicket = getScheduledThreadPoolExecutor().scheduleWithFixedDelay(new Runnable() { 324 @Override 325 public void run() { 326 cleanup(); 327 } 328 }, 0, cleanupPeriod, TimeUnit.MILLISECONDS); 329 } 330 createMessageAudit(); 331 } 332 333 @Override 334 public synchronized void doStop(ServiceStopper stopper) throws Exception { 335 if (cleanupTicket != null) { 336 cleanupTicket.cancel(true); 337 cleanupTicket = null; 338 } 339 closeDataSource(getDataSource()); 340 } 341 342 public void cleanup() { 343 TransactionContext c = null; 344 try { 345 LOG.debug("Cleaning up old messages."); 346 c = getTransactionContext(); 347 getAdapter().doDeleteOldMessages(c); 348 } catch (IOException e) { 349 LOG.warn("Old message cleanup failed due to: " + e, e); 350 } catch (SQLException e) { 351 LOG.warn("Old message cleanup failed due to: " + e); 352 JDBCPersistenceAdapter.log("Failure Details: ", e); 353 } finally { 354 if (c != null) { 355 try { 356 c.close(); 357 } catch (Throwable e) { 358 } 359 } 360 LOG.debug("Cleanup done."); 361 } 362 } 363 364 @Override 365 public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() { 366 if (clockDaemon == null) { 367 clockDaemon = new ScheduledThreadPoolExecutor(5, new ThreadFactory() { 368 @Override 369 public Thread newThread(Runnable runnable) { 370 Thread thread = new Thread(runnable, "ActiveMQ JDBC PA Scheduled Task"); 371 thread.setDaemon(true); 372 return thread; 373 } 374 }); 375 } 376 return clockDaemon; 377 } 378 379 public JDBCAdapter getAdapter() throws IOException { 380 if (adapter == null) { 381 setAdapter(createAdapter()); 382 } 383 return adapter; 384 } 385 386 /** 387 * @deprecated as of 5.7.0, replaced by {@link #getLocker()} 388 */ 389 @Deprecated 390 public Locker getDatabaseLocker() throws IOException { 391 return getLocker(); 392 } 393 394 /** 395 * Sets the database locker strategy to use to lock the database on startup 396 * @throws IOException 397 * 398 * @deprecated as of 5.7.0, replaced by {@link #setLocker(org.apache.activemq.broker.Locker)} 399 */ 400 @Deprecated 401 public void setDatabaseLocker(Locker locker) throws IOException { 402 setLocker(locker); 403 } 404 405 public DataSource getLockDataSource() throws IOException { 406 if (lockDataSource == null) { 407 lockDataSource = getDataSource(); 408 if (lockDataSource == null) { 409 throw new IllegalArgumentException( 410 "No dataSource property has been configured"); 411 } 412 } 413 return lockDataSource; 414 } 415 416 public void setLockDataSource(DataSource dataSource) { 417 this.lockDataSource = dataSource; 418 LOG.info("Using a separate dataSource for locking: " 419 + lockDataSource); 420 } 421 422 @Override 423 public BrokerService getBrokerService() { 424 return brokerService; 425 } 426 427 /** 428 * @throws IOException 429 */ 430 protected JDBCAdapter createAdapter() throws IOException { 431 432 adapter = (JDBCAdapter) loadAdapter(adapterFactoryFinder, "adapter"); 433 434 // Use the default JDBC adapter if the 435 // Database type is not recognized. 436 if (adapter == null) { 437 adapter = new DefaultJDBCAdapter(); 438 LOG.debug("Using default JDBC Adapter: " + adapter); 439 } 440 return adapter; 441 } 442 443 private Object loadAdapter(FactoryFinder finder, String kind) throws IOException { 444 Object adapter = null; 445 TransactionContext c = getTransactionContext(); 446 try { 447 try { 448 // Make the filename file system safe. 449 String dirverName = c.getConnection().getMetaData().getDriverName(); 450 dirverName = dirverName.replaceAll("[^a-zA-Z0-9\\-]", "_").toLowerCase(Locale.ENGLISH); 451 452 try { 453 adapter = finder.newInstance(dirverName); 454 LOG.info("Database " + kind + " driver override recognized for : [" + dirverName + "] - adapter: " + adapter.getClass()); 455 } catch (Throwable e) { 456 LOG.info("Database " + kind + " driver override not found for : [" + dirverName 457 + "]. Will use default implementation."); 458 } 459 } catch (SQLException e) { 460 LOG.warn("JDBC error occurred while trying to detect database type for overrides. Will use default implementations: " 461 + e.getMessage()); 462 JDBCPersistenceAdapter.log("Failure Details: ", e); 463 } 464 } finally { 465 c.close(); 466 } 467 return adapter; 468 } 469 470 public void setAdapter(JDBCAdapter adapter) { 471 this.adapter = adapter; 472 this.adapter.setStatements(getStatements()); 473 this.adapter.setMaxRows(getMaxRows()); 474 } 475 476 public WireFormat getWireFormat() { 477 return wireFormat; 478 } 479 480 public void setWireFormat(WireFormat wireFormat) { 481 this.wireFormat = wireFormat; 482 } 483 484 public TransactionContext getTransactionContext(ConnectionContext context) throws IOException { 485 if (context == null) { 486 return getTransactionContext(); 487 } else { 488 TransactionContext answer = (TransactionContext)context.getLongTermStoreContext(); 489 if (answer == null) { 490 answer = getTransactionContext(); 491 context.setLongTermStoreContext(answer); 492 } 493 return answer; 494 } 495 } 496 497 public TransactionContext getTransactionContext() throws IOException { 498 TransactionContext answer = new TransactionContext(this); 499 if (transactionIsolation > 0) { 500 answer.setTransactionIsolation(transactionIsolation); 501 } 502 return answer; 503 } 504 505 @Override 506 public void beginTransaction(ConnectionContext context) throws IOException { 507 TransactionContext transactionContext = getTransactionContext(context); 508 transactionContext.begin(); 509 } 510 511 @Override 512 public void commitTransaction(ConnectionContext context) throws IOException { 513 TransactionContext transactionContext = getTransactionContext(context); 514 transactionContext.commit(); 515 } 516 517 @Override 518 public void rollbackTransaction(ConnectionContext context) throws IOException { 519 TransactionContext transactionContext = getTransactionContext(context); 520 transactionContext.rollback(); 521 } 522 523 public int getCleanupPeriod() { 524 return cleanupPeriod; 525 } 526 527 /** 528 * Sets the number of milliseconds until the database is attempted to be 529 * cleaned up for durable topics 530 */ 531 public void setCleanupPeriod(int cleanupPeriod) { 532 this.cleanupPeriod = cleanupPeriod; 533 } 534 535 public boolean isChangeAutoCommitAllowed() { 536 return changeAutoCommitAllowed; 537 } 538 539 /** 540 * Whether the JDBC driver allows to set the auto commit. 541 * Some drivers does not allow changing the auto commit. The default value is true. 542 * 543 * @param changeAutoCommitAllowed true to change, false to not change. 544 */ 545 public void setChangeAutoCommitAllowed(boolean changeAutoCommitAllowed) { 546 this.changeAutoCommitAllowed = changeAutoCommitAllowed; 547 } 548 549 @Override 550 public void deleteAllMessages() throws IOException { 551 TransactionContext c = getTransactionContext(); 552 try { 553 getAdapter().doDropTables(c); 554 getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences()); 555 getAdapter().doCreateTables(c); 556 LOG.info("Persistence store purged."); 557 } catch (SQLException e) { 558 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 559 throw IOExceptionSupport.create(e); 560 } finally { 561 c.close(); 562 } 563 } 564 565 public boolean isUseExternalMessageReferences() { 566 return useExternalMessageReferences; 567 } 568 569 public void setUseExternalMessageReferences(boolean useExternalMessageReferences) { 570 this.useExternalMessageReferences = useExternalMessageReferences; 571 } 572 573 public boolean isCreateTablesOnStartup() { 574 return createTablesOnStartup; 575 } 576 577 /** 578 * Sets whether or not tables are created on startup 579 */ 580 public void setCreateTablesOnStartup(boolean createTablesOnStartup) { 581 this.createTablesOnStartup = createTablesOnStartup; 582 } 583 584 /** 585 * @deprecated use {@link #setUseLock(boolean)} instead 586 * 587 * Sets whether or not an exclusive database lock should be used to enable 588 * JDBC Master/Slave. Enabled by default. 589 */ 590 @Deprecated 591 public void setUseDatabaseLock(boolean useDatabaseLock) { 592 setUseLock(useDatabaseLock); 593 } 594 595 public static void log(String msg, SQLException e) { 596 String s = msg + e.getMessage(); 597 while (e.getNextException() != null) { 598 e = e.getNextException(); 599 s += ", due to: " + e.getMessage(); 600 } 601 LOG.warn(s, e); 602 } 603 604 public Statements getStatements() { 605 if (statements == null) { 606 statements = new Statements(); 607 } 608 return statements; 609 } 610 611 public void setStatements(Statements statements) { 612 this.statements = statements; 613 if (adapter != null) { 614 this.adapter.setStatements(getStatements()); 615 } 616 } 617 618 /** 619 * @param usageManager The UsageManager that is controlling the 620 * destination's memory usage. 621 */ 622 @Override 623 public void setUsageManager(SystemUsage usageManager) { 624 } 625 626 @Override 627 public Locker createDefaultLocker() throws IOException { 628 Locker locker = (Locker) loadAdapter(lockFactoryFinder, "lock"); 629 if (locker == null) { 630 locker = new DefaultDatabaseLocker(); 631 LOG.debug("Using default JDBC Locker: " + locker); 632 } 633 locker.configure(this); 634 return locker; 635 } 636 637 @Override 638 public void setBrokerName(String brokerName) { 639 } 640 641 @Override 642 public String toString() { 643 return "JDBCPersistenceAdapter(" + super.toString() + ")"; 644 } 645 646 @Override 647 public void setDirectory(File dir) { 648 this.directory=dir; 649 } 650 651 @Override 652 public File getDirectory(){ 653 if (this.directory==null && brokerService != null){ 654 this.directory=brokerService.getBrokerDataDirectory(); 655 } 656 return this.directory; 657 } 658 659 // interesting bit here is proof that DB is ok 660 @Override 661 public void checkpoint(boolean sync) throws IOException { 662 // by pass TransactionContext to avoid IO Exception handler 663 Connection connection = null; 664 try { 665 connection = getDataSource().getConnection(); 666 if (!connection.isValid(10)) { 667 throw new IOException("isValid(10) failed for: " + connection); 668 } 669 } catch (SQLException e) { 670 LOG.debug("Could not get JDBC connection for checkpoint: " + e); 671 throw IOExceptionSupport.create(e); 672 } finally { 673 if (connection != null) { 674 try { 675 connection.close(); 676 } catch (Throwable ignored) { 677 } 678 } 679 } 680 } 681 682 @Override 683 public long size(){ 684 return 0; 685 } 686 687 /** 688 * @deprecated use {@link Locker#setLockAcquireSleepInterval(long)} instead 689 * 690 * millisecond interval between lock acquire attempts, applied to newly created DefaultDatabaseLocker 691 * not applied if DataBaseLocker is injected. 692 * 693 */ 694 @Deprecated 695 public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) throws IOException { 696 getLocker().setLockAcquireSleepInterval(lockAcquireSleepInterval); 697 } 698 699 /** 700 * set the Transaction isolation level to something other that TRANSACTION_READ_UNCOMMITTED 701 * This allowable dirty isolation level may not be achievable in clustered DB environments 702 * so a more restrictive and expensive option may be needed like TRANSACTION_REPEATABLE_READ 703 * see isolation level constants in {@link java.sql.Connection} 704 * @param transactionIsolation the isolation level to use 705 */ 706 public void setTransactionIsolation(int transactionIsolation) { 707 this.transactionIsolation = transactionIsolation; 708 } 709 710 public int getMaxProducersToAudit() { 711 return maxProducersToAudit; 712 } 713 714 public void setMaxProducersToAudit(int maxProducersToAudit) { 715 this.maxProducersToAudit = maxProducersToAudit; 716 } 717 718 public int getMaxAuditDepth() { 719 return maxAuditDepth; 720 } 721 722 public void setMaxAuditDepth(int maxAuditDepth) { 723 this.maxAuditDepth = maxAuditDepth; 724 } 725 726 public boolean isEnableAudit() { 727 return enableAudit; 728 } 729 730 public void setEnableAudit(boolean enableAudit) { 731 this.enableAudit = enableAudit; 732 } 733 734 public int getAuditRecoveryDepth() { 735 return auditRecoveryDepth; 736 } 737 738 public void setAuditRecoveryDepth(int auditRecoveryDepth) { 739 this.auditRecoveryDepth = auditRecoveryDepth; 740 } 741 742 public long getNextSequenceId() { 743 return sequenceGenerator.getNextSequenceId(); 744 } 745 746 public int getMaxRows() { 747 return maxRows; 748 } 749 750 /* 751 * the max rows return from queries, with sparse selectors this may need to be increased 752 */ 753 public void setMaxRows(int maxRows) { 754 this.maxRows = maxRows; 755 } 756 757 public void recover(JdbcMemoryTransactionStore jdbcMemoryTransactionStore) throws IOException { 758 TransactionContext c = getTransactionContext(); 759 try { 760 getAdapter().doRecoverPreparedOps(c, jdbcMemoryTransactionStore); 761 } catch (SQLException e) { 762 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 763 throw IOExceptionSupport.create("Failed to recover from: " + jdbcMemoryTransactionStore + ". Reason: " + e,e); 764 } finally { 765 c.close(); 766 } 767 } 768 769 public void commitAdd(ConnectionContext context, MessageId messageId, long preparedSequenceId) throws IOException { 770 TransactionContext c = getTransactionContext(context); 771 try { 772 long sequence = (Long)messageId.getEntryLocator(); 773 getAdapter().doCommitAddOp(c, preparedSequenceId, sequence); 774 } catch (SQLException e) { 775 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 776 throw IOExceptionSupport.create("Failed to commit add: " + messageId + ". Reason: " + e, e); 777 } finally { 778 c.close(); 779 } 780 } 781 782 public void commitRemove(ConnectionContext context, MessageAck ack) throws IOException { 783 TransactionContext c = getTransactionContext(context); 784 try { 785 getAdapter().doRemoveMessage(c, (Long)ack.getLastMessageId().getFutureOrSequenceLong(), null); 786 } catch (SQLException e) { 787 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 788 throw IOExceptionSupport.create("Failed to commit last ack: " + ack + ". Reason: " + e,e); 789 } finally { 790 c.close(); 791 } 792 } 793 794 public void commitLastAck(ConnectionContext context, long xidLastAck, long priority, ActiveMQDestination destination, String subName, String clientId) throws IOException { 795 TransactionContext c = getTransactionContext(context); 796 try { 797 getAdapter().doSetLastAck(c, destination, null, clientId, subName, xidLastAck, priority); 798 } catch (SQLException e) { 799 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 800 throw IOExceptionSupport.create("Failed to commit last ack with priority: " + priority + " on " + destination + " for " + subName + ":" + clientId + ". Reason: " + e,e); 801 } finally { 802 c.close(); 803 } 804 } 805 806 public void rollbackLastAck(ConnectionContext context, JDBCTopicMessageStore store, MessageAck ack, String subName, String clientId) throws IOException { 807 TransactionContext c = getTransactionContext(context); 808 try { 809 byte priority = (byte) store.getCachedStoreSequenceId(c, store.getDestination(), ack.getLastMessageId())[1]; 810 getAdapter().doClearLastAck(c, store.getDestination(), priority, clientId, subName); 811 } catch (SQLException e) { 812 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 813 throw IOExceptionSupport.create("Failed to rollback last ack: " + ack + " on " + store.getDestination() + " for " + subName + ":" + clientId + ". Reason: " + e,e); 814 } finally { 815 c.close(); 816 } 817 } 818 819 // after recovery there is no record of the original messageId for the ack 820 public void rollbackLastAck(ConnectionContext context, byte priority, ActiveMQDestination destination, String subName, String clientId) throws IOException { 821 TransactionContext c = getTransactionContext(context); 822 try { 823 getAdapter().doClearLastAck(c, destination, priority, clientId, subName); 824 } catch (SQLException e) { 825 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 826 throw IOExceptionSupport.create("Failed to rollback last ack with priority: " + priority + " on " + destination + " for " + subName + ":" + clientId + ". Reason: " + e, e); 827 } finally { 828 c.close(); 829 } 830 } 831 832 long[] getStoreSequenceIdForMessageId(ConnectionContext context, MessageId messageId, ActiveMQDestination destination) throws IOException { 833 long[] result = new long[]{-1, Byte.MAX_VALUE -1}; 834 TransactionContext c = getTransactionContext(context); 835 try { 836 result = adapter.getStoreSequenceId(c, destination, messageId); 837 } catch (SQLException e) { 838 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 839 throw IOExceptionSupport.create("Failed to get store sequenceId for messageId: " + messageId +", on: " + destination + ". Reason: " + e, e); 840 } finally { 841 c.close(); 842 } 843 return result; 844 } 845 846 @Override 847 public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException { 848 throw new UnsupportedOperationException(); 849 } 850}