001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.jdbc.adapter; 018 019import java.io.IOException; 020import java.sql.Connection; 021import java.sql.PreparedStatement; 022import java.sql.ResultSet; 023import java.sql.SQLException; 024import java.sql.Statement; 025import java.util.ArrayList; 026import java.util.HashSet; 027import java.util.LinkedList; 028import java.util.Set; 029import java.util.concurrent.locks.ReadWriteLock; 030import java.util.concurrent.locks.ReentrantReadWriteLock; 031 032import org.apache.activemq.command.ActiveMQDestination; 033import org.apache.activemq.command.MessageId; 034import org.apache.activemq.command.ProducerId; 035import org.apache.activemq.command.SubscriptionInfo; 036import org.apache.activemq.command.XATransactionId; 037import org.apache.activemq.store.jdbc.JDBCAdapter; 038import org.apache.activemq.store.jdbc.JDBCMessageIdScanListener; 039import org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener; 040import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; 041import org.apache.activemq.store.jdbc.JdbcMemoryTransactionStore; 042import org.apache.activemq.store.jdbc.Statements; 043import org.apache.activemq.store.jdbc.TransactionContext; 044import org.apache.activemq.util.DataByteArrayOutputStream; 045import org.slf4j.Logger; 046import org.slf4j.LoggerFactory; 047 048import static javax.xml.bind.DatatypeConverter.parseBase64Binary; 049import static javax.xml.bind.DatatypeConverter.printBase64Binary; 050 051/** 052 * Implements all the default JDBC operations that are used by the JDBCPersistenceAdapter. <p/> sub-classing is 053 * encouraged to override the default implementation of methods to account for differences in JDBC Driver 054 * implementations. <p/> The JDBCAdapter inserts and extracts BLOB data using the getBytes()/setBytes() operations. <p/> 055 * The databases/JDBC drivers that use this adapter are: 056 * <ul> 057 * <li></li> 058 * </ul> 059 * 060 * @org.apache.xbean.XBean element="defaultJDBCAdapter" 061 * 062 * 063 */ 064public class DefaultJDBCAdapter implements JDBCAdapter { 065 private static final Logger LOG = LoggerFactory.getLogger(DefaultJDBCAdapter.class); 066 public static final int MAX_ROWS = org.apache.activemq.ActiveMQPrefetchPolicy.MAX_PREFETCH_SIZE; 067 protected Statements statements; 068 private boolean batchStatements = true; 069 //This is deprecated and should be removed in a future release 070 protected boolean batchStatments = true; 071 protected boolean prioritizedMessages; 072 protected ReadWriteLock cleanupExclusiveLock = new ReentrantReadWriteLock(); 073 protected int maxRows = MAX_ROWS; 074 075 protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException { 076 s.setBytes(index, data); 077 } 078 079 protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException { 080 return rs.getBytes(index); 081 } 082 083 public void doCreateTables(TransactionContext c) throws SQLException, IOException { 084 Statement s = null; 085 cleanupExclusiveLock.writeLock().lock(); 086 try { 087 // Check to see if the table already exists. If it does, then don't 088 // log warnings during startup. 089 // Need to run the scripts anyways since they may contain ALTER 090 // statements that upgrade a previous version 091 // of the table 092 boolean alreadyExists = false; 093 ResultSet rs = null; 094 try { 095 rs = c.getConnection().getMetaData().getTables(null, null, this.statements.getFullMessageTableName(), 096 new String[] { "TABLE" }); 097 alreadyExists = rs.next(); 098 } catch (Throwable ignore) { 099 } finally { 100 close(rs); 101 } 102 s = c.getConnection().createStatement(); 103 String[] createStatments = this.statements.getCreateSchemaStatements(); 104 for (int i = 0; i < createStatments.length; i++) { 105 // This will fail usually since the tables will be 106 // created already. 107 try { 108 LOG.debug("Executing SQL: " + createStatments[i]); 109 s.execute(createStatments[i]); 110 } catch (SQLException e) { 111 if (alreadyExists) { 112 LOG.debug("Could not create JDBC tables; The message table already existed." + " Failure was: " 113 + createStatments[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState() 114 + " Vendor code: " + e.getErrorCode()); 115 } else { 116 LOG.warn("Could not create JDBC tables; they could already exist." + " Failure was: " 117 + createStatments[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState() 118 + " Vendor code: " + e.getErrorCode()); 119 JDBCPersistenceAdapter.log("Failure details: ", e); 120 } 121 } 122 } 123 124 // if autoCommit used do not call commit 125 if(!c.getConnection().getAutoCommit()){ 126 c.getConnection().commit(); 127 } 128 129 } finally { 130 cleanupExclusiveLock.writeLock().unlock(); 131 try { 132 s.close(); 133 } catch (Throwable e) { 134 } 135 } 136 } 137 138 public void doDropTables(TransactionContext c) throws SQLException, IOException { 139 Statement s = null; 140 cleanupExclusiveLock.writeLock().lock(); 141 try { 142 s = c.getConnection().createStatement(); 143 String[] dropStatments = this.statements.getDropSchemaStatements(); 144 for (int i = 0; i < dropStatments.length; i++) { 145 // This will fail usually since the tables will be 146 // created already. 147 try { 148 LOG.debug("Executing SQL: " + dropStatments[i]); 149 s.execute(dropStatments[i]); 150 } catch (SQLException e) { 151 LOG.warn("Could not drop JDBC tables; they may not exist." + " Failure was: " + dropStatments[i] 152 + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState() + " Vendor code: " 153 + e.getErrorCode()); 154 JDBCPersistenceAdapter.log("Failure details: ", e); 155 } 156 } 157 // if autoCommit used do not call commit 158 if(!c.getConnection().getAutoCommit()){ 159 c.getConnection().commit(); 160 } 161 } finally { 162 cleanupExclusiveLock.writeLock().unlock(); 163 try { 164 s.close(); 165 } catch (Throwable e) { 166 } 167 } 168 } 169 170 public long doGetLastMessageStoreSequenceId(TransactionContext c) throws SQLException, IOException { 171 PreparedStatement s = null; 172 ResultSet rs = null; 173 cleanupExclusiveLock.readLock().lock(); 174 try { 175 s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement()); 176 rs = s.executeQuery(); 177 long seq1 = 0; 178 if (rs.next()) { 179 seq1 = rs.getLong(1); 180 } 181 rs.close(); 182 s.close(); 183 s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInAcksStatement()); 184 rs = s.executeQuery(); 185 long seq2 = 0; 186 if (rs.next()) { 187 seq2 = rs.getLong(1); 188 } 189 long seq = Math.max(seq1, seq2); 190 return seq; 191 } finally { 192 cleanupExclusiveLock.readLock().unlock(); 193 close(rs); 194 close(s); 195 } 196 } 197 198 public byte[] doGetMessageById(TransactionContext c, long storeSequenceId) throws SQLException, IOException { 199 PreparedStatement s = null; 200 ResultSet rs = null; 201 cleanupExclusiveLock.readLock().lock(); 202 try { 203 s = c.getConnection().prepareStatement( 204 this.statements.getFindMessageByIdStatement()); 205 s.setLong(1, storeSequenceId); 206 rs = s.executeQuery(); 207 if (!rs.next()) { 208 return null; 209 } 210 return getBinaryData(rs, 1); 211 } finally { 212 cleanupExclusiveLock.readLock().unlock(); 213 close(rs); 214 close(s); 215 } 216 } 217 218 219 /** 220 * A non null xid indicated the op is part of 2pc prepare, so ops are flagged pending outcome 221 */ 222 public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data, 223 long expiration, byte priority, XATransactionId xid) throws SQLException, IOException { 224 PreparedStatement s = c.getAddMessageStatement(); 225 cleanupExclusiveLock.readLock().lock(); 226 try { 227 if (s == null) { 228 s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement()); 229 if (this.batchStatements) { 230 c.setAddMessageStatement(s); 231 } 232 } 233 s.setLong(1, sequence); 234 s.setString(2, messageID.getProducerId().toString()); 235 s.setLong(3, messageID.getProducerSequenceId()); 236 s.setString(4, destination.getQualifiedName()); 237 s.setLong(5, expiration); 238 s.setLong(6, priority); 239 setBinaryData(s, 7, data); 240 if (xid != null) { 241 byte[] xidVal = xid.getEncodedXidBytes(); 242 xidVal[0] = '+'; 243 String xidString = printBase64Binary(xidVal); 244 s.setString(8, xidString); 245 } else { 246 s.setString(8, null); 247 } 248 if (this.batchStatements) { 249 s.addBatch(); 250 } else if (s.executeUpdate() != 1) { 251 throw new SQLException("Failed add a message"); 252 } 253 } finally { 254 cleanupExclusiveLock.readLock().unlock(); 255 if (!this.batchStatements) { 256 if (s != null) { 257 s.close(); 258 } 259 } 260 } 261 } 262 263 @Override 264 public void doUpdateMessage(TransactionContext c, ActiveMQDestination destination, MessageId id, byte[] data) throws SQLException, IOException { 265 PreparedStatement s = null; 266 cleanupExclusiveLock.readLock().lock(); 267 try { 268 s = c.getConnection().prepareStatement(this.statements.getUpdateMessageStatement()); 269 setBinaryData(s, 1, data); 270 s.setString(2, id.getProducerId().toString()); 271 s.setLong(3, id.getProducerSequenceId()); 272 s.setString(4, destination.getQualifiedName()); 273 if (s.executeUpdate() != 1) { 274 throw new IOException("Could not update message: " + id + " in " + destination); 275 } 276 } finally { 277 cleanupExclusiveLock.readLock().unlock(); 278 close(s); 279 } 280 } 281 282 283 public void doAddMessageReference(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, 284 long expirationTime, String messageRef) throws SQLException, IOException { 285 PreparedStatement s = c.getAddMessageStatement(); 286 cleanupExclusiveLock.readLock().lock(); 287 try { 288 if (s == null) { 289 s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement()); 290 if (this.batchStatements) { 291 c.setAddMessageStatement(s); 292 } 293 } 294 s.setLong(1, messageID.getBrokerSequenceId()); 295 s.setString(2, messageID.getProducerId().toString()); 296 s.setLong(3, messageID.getProducerSequenceId()); 297 s.setString(4, destination.getQualifiedName()); 298 s.setLong(5, expirationTime); 299 s.setString(6, messageRef); 300 if (this.batchStatements) { 301 s.addBatch(); 302 } else if (s.executeUpdate() != 1) { 303 throw new SQLException("Failed add a message"); 304 } 305 } finally { 306 cleanupExclusiveLock.readLock().unlock(); 307 if (!this.batchStatements) { 308 s.close(); 309 } 310 } 311 } 312 313 public long[] getStoreSequenceId(TransactionContext c, ActiveMQDestination destination, MessageId messageID) throws SQLException, IOException { 314 PreparedStatement s = null; 315 ResultSet rs = null; 316 cleanupExclusiveLock.readLock().lock(); 317 try { 318 s = c.getConnection().prepareStatement(this.statements.getFindMessageSequenceIdStatement()); 319 s.setString(1, messageID.getProducerId().toString()); 320 s.setLong(2, messageID.getProducerSequenceId()); 321 s.setString(3, destination.getQualifiedName()); 322 rs = s.executeQuery(); 323 if (!rs.next()) { 324 return new long[]{0,0}; 325 } 326 return new long[]{rs.getLong(1), rs.getLong(2)}; 327 } finally { 328 cleanupExclusiveLock.readLock().unlock(); 329 close(rs); 330 close(s); 331 } 332 } 333 334 public byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException { 335 PreparedStatement s = null; 336 ResultSet rs = null; 337 cleanupExclusiveLock.readLock().lock(); 338 try { 339 s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement()); 340 s.setString(1, id.getProducerId().toString()); 341 s.setLong(2, id.getProducerSequenceId()); 342 rs = s.executeQuery(); 343 if (!rs.next()) { 344 return null; 345 } 346 return getBinaryData(rs, 1); 347 } finally { 348 cleanupExclusiveLock.readLock().unlock(); 349 close(rs); 350 close(s); 351 } 352 } 353 354 public String doGetMessageReference(TransactionContext c, long seq) throws SQLException, IOException { 355 PreparedStatement s = null; 356 ResultSet rs = null; 357 cleanupExclusiveLock.readLock().lock(); 358 try { 359 s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement()); 360 s.setLong(1, seq); 361 rs = s.executeQuery(); 362 if (!rs.next()) { 363 return null; 364 } 365 return rs.getString(1); 366 } finally { 367 cleanupExclusiveLock.readLock().unlock(); 368 close(rs); 369 close(s); 370 } 371 } 372 373 /** 374 * A non null xid indicated the op is part of 2pc prepare, so ops are flagged pending outcome 375 */ 376 public void doRemoveMessage(TransactionContext c, long seq, XATransactionId xid) throws SQLException, IOException { 377 PreparedStatement s = c.getRemovedMessageStatement(); 378 cleanupExclusiveLock.readLock().lock(); 379 try { 380 if (s == null) { 381 s = c.getConnection().prepareStatement(xid == null ? 382 this.statements.getRemoveMessageStatement() : this.statements.getUpdateXidFlagStatement()); 383 if (this.batchStatements) { 384 c.setRemovedMessageStatement(s); 385 } 386 } 387 if (xid == null) { 388 s.setLong(1, seq); 389 } else { 390 byte[] xidVal = xid.getEncodedXidBytes(); 391 xidVal[0] = '-'; 392 String xidString = printBase64Binary(xidVal); 393 s.setString(1, xidString); 394 s.setLong(2, seq); 395 } 396 if (this.batchStatements) { 397 s.addBatch(); 398 } else if (s.executeUpdate() != 1) { 399 throw new SQLException("Failed to remove message seq: " + seq); 400 } 401 } finally { 402 cleanupExclusiveLock.readLock().unlock(); 403 if (!this.batchStatements && s != null) { 404 s.close(); 405 } 406 } 407 } 408 409 public void doRecover(TransactionContext c, ActiveMQDestination destination, JDBCMessageRecoveryListener listener) 410 throws Exception { 411 PreparedStatement s = null; 412 ResultSet rs = null; 413 cleanupExclusiveLock.readLock().lock(); 414 try { 415 s = c.getConnection().prepareStatement(this.statements.getFindAllMessagesStatement()); 416 s.setString(1, destination.getQualifiedName()); 417 rs = s.executeQuery(); 418 if (this.statements.isUseExternalMessageReferences()) { 419 while (rs.next()) { 420 if (!listener.recoverMessageReference(rs.getString(2))) { 421 break; 422 } 423 } 424 } else { 425 while (rs.next()) { 426 if (!listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) { 427 break; 428 } 429 } 430 } 431 } finally { 432 cleanupExclusiveLock.readLock().unlock(); 433 close(rs); 434 close(s); 435 } 436 } 437 438 public void doMessageIdScan(TransactionContext c, int limit, 439 JDBCMessageIdScanListener listener) throws SQLException, IOException { 440 PreparedStatement s = null; 441 ResultSet rs = null; 442 cleanupExclusiveLock.readLock().lock(); 443 try { 444 s = c.getConnection().prepareStatement(this.statements.getFindAllMessageIdsStatement()); 445 s.setMaxRows(limit); 446 rs = s.executeQuery(); 447 // jdbc scrollable cursor requires jdbc ver > 1.0 and is often implemented locally so avoid 448 LinkedList<MessageId> reverseOrderIds = new LinkedList<MessageId>(); 449 while (rs.next()) { 450 reverseOrderIds.addFirst(new MessageId(rs.getString(2), rs.getLong(3))); 451 } 452 if (LOG.isDebugEnabled()) { 453 LOG.debug("messageIdScan with limit (" + limit + "), resulted in: " + reverseOrderIds.size() + " ids"); 454 } 455 for (MessageId id : reverseOrderIds) { 456 listener.messageId(id); 457 } 458 } finally { 459 cleanupExclusiveLock.readLock().unlock(); 460 close(rs); 461 close(s); 462 } 463 } 464 465 public void doSetLastAckWithPriority(TransactionContext c, ActiveMQDestination destination, XATransactionId xid, String clientId, 466 String subscriptionName, long seq, long priority) throws SQLException, IOException { 467 PreparedStatement s = c.getUpdateLastAckStatement(); 468 cleanupExclusiveLock.readLock().lock(); 469 try { 470 if (s == null) { 471 s = c.getConnection().prepareStatement(xid == null ? 472 this.statements.getUpdateDurableLastAckWithPriorityStatement() : 473 this.statements.getUpdateDurableLastAckWithPriorityInTxStatement()); 474 if (this.batchStatements) { 475 c.setUpdateLastAckStatement(s); 476 } 477 } 478 if (xid != null) { 479 byte[] xidVal = encodeXid(xid, seq, priority); 480 String xidString = printBase64Binary(xidVal); 481 s.setString(1, xidString); 482 } else { 483 s.setLong(1, seq); 484 } 485 s.setString(2, destination.getQualifiedName()); 486 s.setString(3, clientId); 487 s.setString(4, subscriptionName); 488 s.setLong(5, priority); 489 if (this.batchStatements) { 490 s.addBatch(); 491 } else if (s.executeUpdate() != 1) { 492 throw new SQLException("Failed update last ack with priority: " + priority + ", for sub: " + subscriptionName); 493 } 494 } finally { 495 cleanupExclusiveLock.readLock().unlock(); 496 if (!this.batchStatements) { 497 close(s); 498 } 499 } 500 } 501 502 503 public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, XATransactionId xid, String clientId, 504 String subscriptionName, long seq, long priority) throws SQLException, IOException { 505 PreparedStatement s = c.getUpdateLastAckStatement(); 506 cleanupExclusiveLock.readLock().lock(); 507 try { 508 if (s == null) { 509 s = c.getConnection().prepareStatement(xid == null ? 510 this.statements.getUpdateDurableLastAckStatement() : 511 this.statements.getUpdateDurableLastAckInTxStatement()); 512 if (this.batchStatements) { 513 c.setUpdateLastAckStatement(s); 514 } 515 } 516 if (xid != null) { 517 byte[] xidVal = encodeXid(xid, seq, priority); 518 String xidString = printBase64Binary(xidVal); 519 s.setString(1, xidString); 520 } else { 521 s.setLong(1, seq); 522 } 523 s.setString(2, destination.getQualifiedName()); 524 s.setString(3, clientId); 525 s.setString(4, subscriptionName); 526 527 if (this.batchStatements) { 528 s.addBatch(); 529 } else if (s.executeUpdate() != 1) { 530 throw new IOException("Could not update last ack seq : " 531 + seq + ", for sub: " + subscriptionName); 532 } 533 } finally { 534 cleanupExclusiveLock.readLock().unlock(); 535 if (!this.batchStatements) { 536 close(s); 537 } 538 } 539 } 540 541 private byte[] encodeXid(XATransactionId xid, long seq, long priority) { 542 byte[] xidVal = xid.getEncodedXidBytes(); 543 // encode the update 544 DataByteArrayOutputStream outputStream = xid.internalOutputStream(); 545 outputStream.position(1); 546 outputStream.writeLong(seq); 547 outputStream.writeByte(Long.valueOf(priority).byteValue()); 548 return xidVal; 549 } 550 551 @Override 552 public void doClearLastAck(TransactionContext c, ActiveMQDestination destination, byte priority, String clientId, String subName) throws SQLException, IOException { 553 PreparedStatement s = null; 554 cleanupExclusiveLock.readLock().lock(); 555 try { 556 s = c.getConnection().prepareStatement(this.statements.getClearDurableLastAckInTxStatement()); 557 s.setString(1, destination.getQualifiedName()); 558 s.setString(2, clientId); 559 s.setString(3, subName); 560 s.setLong(4, priority); 561 if (s.executeUpdate() != 1) { 562 throw new IOException("Could not remove prepared transaction state from message ack for: " + clientId + ":" + subName); 563 } 564 } finally { 565 cleanupExclusiveLock.readLock().unlock(); 566 close(s); 567 } 568 } 569 570 public void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination, String clientId, 571 String subscriptionName, JDBCMessageRecoveryListener listener) throws Exception { 572 // dumpTables(c, 573 // destination.getQualifiedName(),clientId,subscriptionName); 574 PreparedStatement s = null; 575 ResultSet rs = null; 576 cleanupExclusiveLock.readLock().lock(); 577 try { 578 s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubMessagesStatement()); 579 s.setString(1, destination.getQualifiedName()); 580 s.setString(2, clientId); 581 s.setString(3, subscriptionName); 582 rs = s.executeQuery(); 583 if (this.statements.isUseExternalMessageReferences()) { 584 while (rs.next()) { 585 if (!listener.recoverMessageReference(rs.getString(2))) { 586 break; 587 } 588 } 589 } else { 590 while (rs.next()) { 591 if (!listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) { 592 break; 593 } 594 } 595 } 596 } finally { 597 cleanupExclusiveLock.readLock().unlock(); 598 close(rs); 599 close(s); 600 } 601 } 602 603 public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, String clientId, 604 String subscriptionName, long seq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception { 605 606 PreparedStatement s = null; 607 ResultSet rs = null; 608 cleanupExclusiveLock.readLock().lock(); 609 try { 610 s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesStatement()); 611 s.setMaxRows(Math.min(maxReturned * 2, maxRows)); 612 s.setString(1, destination.getQualifiedName()); 613 s.setString(2, clientId); 614 s.setString(3, subscriptionName); 615 s.setLong(4, seq); 616 rs = s.executeQuery(); 617 int count = 0; 618 if (this.statements.isUseExternalMessageReferences()) { 619 while (rs.next() && count < maxReturned) { 620 if (listener.recoverMessageReference(rs.getString(1))) { 621 count++; 622 } 623 } 624 } else { 625 while (rs.next() && count < maxReturned) { 626 if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) { 627 count++; 628 } 629 } 630 } 631 } finally { 632 cleanupExclusiveLock.readLock().unlock(); 633 close(rs); 634 close(s); 635 } 636 } 637 638 public void doRecoverNextMessagesWithPriority(TransactionContext c, ActiveMQDestination destination, String clientId, 639 String subscriptionName, long seq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception { 640 641 PreparedStatement s = null; 642 ResultSet rs = null; 643 cleanupExclusiveLock.readLock().lock(); 644 try { 645 s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesByPriorityStatement()); 646 s.setMaxRows(Math.min(maxReturned * 2, maxRows)); 647 s.setString(1, destination.getQualifiedName()); 648 s.setString(2, clientId); 649 s.setString(3, subscriptionName); 650 s.setLong(4, seq); 651 s.setLong(5, priority); 652 rs = s.executeQuery(); 653 int count = 0; 654 if (this.statements.isUseExternalMessageReferences()) { 655 while (rs.next() && count < maxReturned) { 656 if (listener.recoverMessageReference(rs.getString(1))) { 657 count++; 658 } 659 } 660 } else { 661 while (rs.next() && count < maxReturned) { 662 if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) { 663 count++; 664 } 665 } 666 } 667 } finally { 668 cleanupExclusiveLock.readLock().unlock(); 669 close(rs); 670 close(s); 671 } 672 } 673 674 public int doGetDurableSubscriberMessageCount(TransactionContext c, ActiveMQDestination destination, 675 String clientId, String subscriptionName, boolean isPrioritizedMessages) throws SQLException, IOException { 676 PreparedStatement s = null; 677 ResultSet rs = null; 678 int result = 0; 679 cleanupExclusiveLock.readLock().lock(); 680 try { 681 if (isPrioritizedMessages) { 682 s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatementWithPriority()); 683 } else { 684 s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatement()); 685 } 686 s.setString(1, destination.getQualifiedName()); 687 s.setString(2, clientId); 688 s.setString(3, subscriptionName); 689 rs = s.executeQuery(); 690 if (rs.next()) { 691 result = rs.getInt(1); 692 } 693 } finally { 694 cleanupExclusiveLock.readLock().unlock(); 695 close(rs); 696 close(s); 697 } 698 return result; 699 } 700 701 /** 702 * @param c 703 * @param info 704 * @param retroactive 705 * @throws SQLException 706 * @throws IOException 707 */ 708 public void doSetSubscriberEntry(TransactionContext c, SubscriptionInfo info, boolean retroactive, boolean isPrioritizedMessages) 709 throws SQLException, IOException { 710 // dumpTables(c, destination.getQualifiedName(), clientId, 711 // subscriptionName); 712 PreparedStatement s = null; 713 cleanupExclusiveLock.readLock().lock(); 714 try { 715 long lastMessageId = -1; 716 if (!retroactive) { 717 s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement()); 718 ResultSet rs = null; 719 try { 720 rs = s.executeQuery(); 721 if (rs.next()) { 722 lastMessageId = rs.getLong(1); 723 } 724 } finally { 725 close(rs); 726 close(s); 727 } 728 } 729 s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement()); 730 int maxPriority = 1; 731 if (isPrioritizedMessages) { 732 maxPriority = 10; 733 } 734 735 for (int priority = 0; priority < maxPriority; priority++) { 736 s.setString(1, info.getDestination().getQualifiedName()); 737 s.setString(2, info.getClientId()); 738 s.setString(3, info.getSubscriptionName()); 739 s.setString(4, info.getSelector()); 740 s.setLong(5, lastMessageId); 741 s.setString(6, info.getSubscribedDestination().getQualifiedName()); 742 s.setLong(7, priority); 743 744 if (s.executeUpdate() != 1) { 745 throw new IOException("Could not create durable subscription for: " + info.getClientId()); 746 } 747 } 748 749 } finally { 750 cleanupExclusiveLock.readLock().unlock(); 751 close(s); 752 } 753 } 754 755 public SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination, 756 String clientId, String subscriptionName) throws SQLException, IOException { 757 PreparedStatement s = null; 758 ResultSet rs = null; 759 cleanupExclusiveLock.readLock().lock(); 760 try { 761 s = c.getConnection().prepareStatement(this.statements.getFindDurableSubStatement()); 762 s.setString(1, destination.getQualifiedName()); 763 s.setString(2, clientId); 764 s.setString(3, subscriptionName); 765 rs = s.executeQuery(); 766 if (!rs.next()) { 767 return null; 768 } 769 SubscriptionInfo subscription = new SubscriptionInfo(); 770 subscription.setDestination(destination); 771 subscription.setClientId(clientId); 772 subscription.setSubscriptionName(subscriptionName); 773 subscription.setSelector(rs.getString(1)); 774 subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(2), 775 ActiveMQDestination.QUEUE_TYPE)); 776 return subscription; 777 } finally { 778 cleanupExclusiveLock.readLock().unlock(); 779 close(rs); 780 close(s); 781 } 782 } 783 784 public SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c, ActiveMQDestination destination) 785 throws SQLException, IOException { 786 PreparedStatement s = null; 787 ResultSet rs = null; 788 cleanupExclusiveLock.readLock().lock(); 789 try { 790 s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubsStatement()); 791 s.setString(1, destination.getQualifiedName()); 792 rs = s.executeQuery(); 793 ArrayList<SubscriptionInfo> rc = new ArrayList<SubscriptionInfo>(); 794 while (rs.next()) { 795 SubscriptionInfo subscription = new SubscriptionInfo(); 796 subscription.setDestination(destination); 797 subscription.setSelector(rs.getString(1)); 798 subscription.setSubscriptionName(rs.getString(2)); 799 subscription.setClientId(rs.getString(3)); 800 subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(4), 801 ActiveMQDestination.QUEUE_TYPE)); 802 rc.add(subscription); 803 } 804 return rc.toArray(new SubscriptionInfo[rc.size()]); 805 } finally { 806 cleanupExclusiveLock.readLock().unlock(); 807 close(rs); 808 close(s); 809 } 810 } 811 812 public void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException, 813 IOException { 814 PreparedStatement s = null; 815 cleanupExclusiveLock.readLock().lock(); 816 try { 817 s = c.getConnection().prepareStatement(this.statements.getRemoveAllMessagesStatement()); 818 s.setString(1, destinationName.getQualifiedName()); 819 s.executeUpdate(); 820 s.close(); 821 s = c.getConnection().prepareStatement(this.statements.getRemoveAllSubscriptionsStatement()); 822 s.setString(1, destinationName.getQualifiedName()); 823 s.executeUpdate(); 824 } finally { 825 cleanupExclusiveLock.readLock().unlock(); 826 close(s); 827 } 828 } 829 830 public void doDeleteSubscription(TransactionContext c, ActiveMQDestination destination, String clientId, 831 String subscriptionName) throws SQLException, IOException { 832 PreparedStatement s = null; 833 cleanupExclusiveLock.readLock().lock(); 834 try { 835 s = c.getConnection().prepareStatement(this.statements.getDeleteSubscriptionStatement()); 836 s.setString(1, destination.getQualifiedName()); 837 s.setString(2, clientId); 838 s.setString(3, subscriptionName); 839 s.executeUpdate(); 840 } finally { 841 cleanupExclusiveLock.readLock().unlock(); 842 close(s); 843 } 844 } 845 846 char priorityIterator = 0; // unsigned 847 public void doDeleteOldMessages(TransactionContext c) throws SQLException, IOException { 848 PreparedStatement s = null; 849 cleanupExclusiveLock.writeLock().lock(); 850 try { 851 LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatementWithPriority()); 852 s = c.getConnection().prepareStatement(this.statements.getDeleteOldMessagesStatementWithPriority()); 853 int priority = priorityIterator++%10; 854 s.setInt(1, priority); 855 s.setInt(2, priority); 856 int i = s.executeUpdate(); 857 LOG.debug("Deleted " + i + " old message(s) at priority: " + priority); 858 } finally { 859 cleanupExclusiveLock.writeLock().unlock(); 860 close(s); 861 } 862 } 863 864 public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination, 865 String clientId, String subscriberName) throws SQLException, IOException { 866 PreparedStatement s = null; 867 ResultSet rs = null; 868 long result = -1; 869 cleanupExclusiveLock.readLock().lock(); 870 try { 871 s = c.getConnection().prepareStatement(this.statements.getLastAckedDurableSubscriberMessageStatement()); 872 s.setString(1, destination.getQualifiedName()); 873 s.setString(2, clientId); 874 s.setString(3, subscriberName); 875 rs = s.executeQuery(); 876 if (rs.next()) { 877 result = rs.getLong(1); 878 if (result == 0 && rs.wasNull()) { 879 result = -1; 880 } 881 } 882 } finally { 883 cleanupExclusiveLock.readLock().unlock(); 884 close(rs); 885 close(s); 886 } 887 return result; 888 } 889 890 protected static void close(PreparedStatement s) { 891 try { 892 s.close(); 893 } catch (Throwable e) { 894 } 895 } 896 897 protected static void close(ResultSet rs) { 898 try { 899 rs.close(); 900 } catch (Throwable e) { 901 } 902 } 903 904 public Set<ActiveMQDestination> doGetDestinations(TransactionContext c) throws SQLException, IOException { 905 HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>(); 906 PreparedStatement s = null; 907 ResultSet rs = null; 908 cleanupExclusiveLock.readLock().lock(); 909 try { 910 s = c.getConnection().prepareStatement(this.statements.getFindAllDestinationsStatement()); 911 rs = s.executeQuery(); 912 while (rs.next()) { 913 rc.add(ActiveMQDestination.createDestination(rs.getString(1), ActiveMQDestination.QUEUE_TYPE)); 914 } 915 } finally { 916 cleanupExclusiveLock.readLock().unlock(); 917 close(rs); 918 close(s); 919 } 920 return rc; 921 } 922 923 /** 924 * @return true if batchStatements 925 */ 926 public boolean isBatchStatements() { 927 return batchStatements; 928 } 929 930 /** 931 * Set the number of statements to process as a single batch DB update 932 * @param batchStatements 933 */ 934 public void setBatchStatements(boolean batchStatements) { 935 this.batchStatements = batchStatements; 936 // The next lines are deprecated and should be removed in a future release 937 // and is here in case someone created their own 938 // this.batchStatments = batchStatements; 939 } 940 941 // Note - remove batchStatment in future distributions. Here for backward compatibility 942 /** 943 * @return true if batchStements 944 */ 945 public boolean isBatchStatments() { 946 return this.batchStatements; 947 } 948 949 /** 950 * This value batchStatments is deprecated and will be removed in a future release. Use batchStatements instead (Note the 'e' in Statement)" 951 * @deprecated 952 * @param batchStatments 953 */ 954 public void setBatchStatments(boolean batchStatments) { 955 LOG.warn("batchStatments is deprecated and will be removed in a future release. Use batchStatements instead (Note the 'e' in Statement)"); 956 this.batchStatements = batchStatments; 957 this.batchStatments = batchStatments; 958 } 959 960 public void setUseExternalMessageReferences(boolean useExternalMessageReferences) { 961 this.statements.setUseExternalMessageReferences(useExternalMessageReferences); 962 } 963 964 /** 965 * @return the statements 966 */ 967 public Statements getStatements() { 968 return this.statements; 969 } 970 971 public void setStatements(Statements statements) { 972 this.statements = statements; 973 } 974 975 public int getMaxRows() { 976 return maxRows; 977 } 978 979 /** 980 * the max value for statement maxRows, used to limit jdbc queries 981 */ 982 public void setMaxRows(int maxRows) { 983 this.maxRows = maxRows; 984 } 985 986 @Override 987 public void doRecordDestination(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException { 988 PreparedStatement s = null; 989 cleanupExclusiveLock.readLock().lock(); 990 try { 991 s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement()); 992 s.setString(1, destination.getQualifiedName()); 993 s.setString(2, destination.getQualifiedName()); 994 s.setString(3, destination.getQualifiedName()); 995 s.setString(4, null); 996 s.setLong(5, 0); 997 s.setString(6, destination.getQualifiedName()); 998 s.setLong(7, 11); // entry out of priority range 999 1000 if (s.executeUpdate() != 1) { 1001 throw new IOException("Could not create ack record for destination: " + destination); 1002 } 1003 } finally { 1004 cleanupExclusiveLock.readLock().unlock(); 1005 close(s); 1006 } 1007 } 1008 1009 @Override 1010 public void doRecoverPreparedOps(TransactionContext c, JdbcMemoryTransactionStore jdbcMemoryTransactionStore) throws SQLException, IOException { 1011 PreparedStatement s = null; 1012 ResultSet rs = null; 1013 cleanupExclusiveLock.readLock().lock(); 1014 try { 1015 s = c.getConnection().prepareStatement(this.statements.getFindOpsPendingOutcomeStatement()); 1016 rs = s.executeQuery(); 1017 while (rs.next()) { 1018 long id = rs.getLong(1); 1019 String encodedString = rs.getString(2); 1020 byte[] encodedXid = parseBase64Binary(encodedString); 1021 if (encodedXid[0] == '+') { 1022 jdbcMemoryTransactionStore.recoverAdd(id, getBinaryData(rs, 3)); 1023 } else { 1024 jdbcMemoryTransactionStore.recoverAck(id, encodedXid, getBinaryData(rs, 3)); 1025 } 1026 } 1027 1028 close(rs); 1029 close(s); 1030 1031 s = c.getConnection().prepareStatement(this.statements.getFindAcksPendingOutcomeStatement()); 1032 rs = s.executeQuery(); 1033 while (rs.next()) { 1034 String encodedString = rs.getString(1); 1035 byte[] encodedXid = parseBase64Binary(encodedString); 1036 String destination = rs.getString(2); 1037 String subName = rs.getString(3); 1038 String subId = rs.getString(4); 1039 jdbcMemoryTransactionStore.recoverLastAck(encodedXid, 1040 ActiveMQDestination.createDestination(destination, ActiveMQDestination.TOPIC_TYPE), 1041 subName, subId); 1042 } 1043 } finally { 1044 close(rs); 1045 cleanupExclusiveLock.readLock().unlock(); 1046 close(s); 1047 } 1048 } 1049 1050 @Override 1051 public void doCommitAddOp(TransactionContext c, long preparedSequence, long sequence) throws SQLException, IOException { 1052 PreparedStatement s = null; 1053 cleanupExclusiveLock.readLock().lock(); 1054 try { 1055 s = c.getConnection().prepareStatement(this.statements.getClearXidFlagStatement()); 1056 s.setLong(1, sequence); 1057 s.setLong(2, preparedSequence); 1058 if (s.executeUpdate() != 1) { 1059 throw new IOException("Could not remove prepared transaction state from message add for sequenceId: " + sequence); 1060 } 1061 } finally { 1062 cleanupExclusiveLock.readLock().unlock(); 1063 close(s); 1064 } 1065 } 1066 1067 1068 public int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException, 1069 IOException { 1070 PreparedStatement s = null; 1071 ResultSet rs = null; 1072 int result = 0; 1073 cleanupExclusiveLock.readLock().lock(); 1074 try { 1075 s = c.getConnection().prepareStatement(this.statements.getDestinationMessageCountStatement()); 1076 s.setString(1, destination.getQualifiedName()); 1077 rs = s.executeQuery(); 1078 if (rs.next()) { 1079 result = rs.getInt(1); 1080 } 1081 } finally { 1082 cleanupExclusiveLock.readLock().unlock(); 1083 close(rs); 1084 close(s); 1085 } 1086 return result; 1087 } 1088 1089 public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long maxSeq, long lastRecoveredSeq, 1090 long priority, int maxReturned, boolean isPrioritizedMessages, JDBCMessageRecoveryListener listener) throws Exception { 1091 PreparedStatement s = null; 1092 ResultSet rs = null; 1093 cleanupExclusiveLock.readLock().lock(); 1094 try { 1095 if (isPrioritizedMessages) { 1096 s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesByPriorityStatement()); 1097 } else { 1098 s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesStatement()); 1099 } 1100 s.setMaxRows(Math.min(maxReturned * 2, maxRows)); 1101 s.setString(1, destination.getQualifiedName()); 1102 s.setLong(2, lastRecoveredSeq); 1103 s.setLong(3, maxSeq); 1104 if (isPrioritizedMessages) { 1105 s.setLong(4, priority); 1106 s.setLong(5, priority); 1107 } 1108 rs = s.executeQuery(); 1109 int count = 0; 1110 if (this.statements.isUseExternalMessageReferences()) { 1111 while (rs.next() && count < maxReturned) { 1112 if (listener.recoverMessageReference(rs.getString(1))) { 1113 count++; 1114 } else { 1115 LOG.debug("Stopped recover next messages"); 1116 break; 1117 } 1118 } 1119 } else { 1120 while (rs.next() && count < maxReturned) { 1121 if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) { 1122 count++; 1123 } else { 1124 LOG.debug("Stopped recover next messages"); 1125 break; 1126 } 1127 } 1128 } 1129 } catch (Exception e) { 1130 e.printStackTrace(); 1131 } finally { 1132 cleanupExclusiveLock.readLock().unlock(); 1133 close(rs); 1134 close(s); 1135 } 1136 } 1137 1138 public long doGetLastProducerSequenceId(TransactionContext c, ProducerId id) 1139 throws SQLException, IOException { 1140 PreparedStatement s = null; 1141 ResultSet rs = null; 1142 cleanupExclusiveLock.readLock().lock(); 1143 try { 1144 s = c.getConnection().prepareStatement(this.statements.getLastProducerSequenceIdStatement()); 1145 s.setString(1, id.toString()); 1146 rs = s.executeQuery(); 1147 long seq = -1; 1148 if (rs.next()) { 1149 seq = rs.getLong(1); 1150 } 1151 return seq; 1152 } finally { 1153 cleanupExclusiveLock.readLock().unlock(); 1154 close(rs); 1155 close(s); 1156 } 1157 } 1158 1159 public static void dumpTables(Connection c, String destinationName, String clientId, String 1160 subscriptionName) throws SQLException { 1161 printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out); 1162 printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out); 1163 PreparedStatement s = c.prepareStatement("SELECT M.ID, D.LAST_ACKED_ID FROM " 1164 + "ACTIVEMQ_MSGS M, " +"ACTIVEMQ_ACKS D " 1165 + "WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" 1166 + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID" 1167 + " ORDER BY M.ID"); 1168 s.setString(1,destinationName); s.setString(2,clientId); s.setString(3,subscriptionName); 1169 printQuery(s,System.out); } 1170 1171 public static void dumpTables(java.sql.Connection c) throws SQLException { 1172 printQuery(c, "SELECT COUNT(*) from ACTIVEMQ_MSGS", System.out); 1173 1174 //printQuery(c, "SELECT COUNT(*) from ACTIVEMQ_ACKS", System.out); 1175 1176 //printQuery(c, "Select * from ACTIVEMQ_MSGS ORDER BY ID", System.out); 1177 //printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out); 1178 } 1179 1180 public static void printQuery(java.sql.Connection c, String query, java.io.PrintStream out) 1181 throws SQLException { 1182 printQuery(c.prepareStatement(query), out); 1183 } 1184 1185 public static void printQuery(java.sql.PreparedStatement s, java.io.PrintStream out) 1186 throws SQLException { 1187 1188 ResultSet set = null; 1189 try { 1190 set = s.executeQuery(); 1191 java.sql.ResultSetMetaData metaData = set.getMetaData(); 1192 for (int i = 1; i <= metaData.getColumnCount(); i++) { 1193 if (i == 1) 1194 out.print("||"); 1195 out.print(metaData.getColumnName(i) + "||"); 1196 } 1197 out.println(); 1198 while (set.next()) { 1199 for (int i = 1; i <= metaData.getColumnCount(); i++) { 1200 if (i == 1) 1201 out.print("|"); 1202 out.print(set.getString(i) + "|"); 1203 } 1204 out.println(); 1205 } 1206 } finally { 1207 try { 1208 set.close(); 1209 } catch (Throwable ignore) { 1210 } 1211 try { 1212 s.close(); 1213 } catch (Throwable ignore) { 1214 } 1215 } 1216 } 1217 1218}