001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.jdbc; 018 019import java.io.IOException; 020import java.sql.SQLException; 021import java.util.Iterator; 022import java.util.LinkedHashSet; 023import java.util.LinkedList; 024import java.util.Set; 025import java.util.concurrent.atomic.AtomicLong; 026 027import org.apache.activemq.ActiveMQMessageAudit; 028import org.apache.activemq.broker.ConnectionContext; 029import org.apache.activemq.command.ActiveMQDestination; 030import org.apache.activemq.command.Message; 031import org.apache.activemq.command.MessageAck; 032import org.apache.activemq.command.MessageId; 033import org.apache.activemq.command.XATransactionId; 034import org.apache.activemq.store.AbstractMessageStore; 035import org.apache.activemq.store.IndexListener; 036import org.apache.activemq.store.MessageRecoveryListener; 037import org.apache.activemq.util.ByteSequence; 038import org.apache.activemq.util.ByteSequenceData; 039import org.apache.activemq.util.IOExceptionSupport; 040import org.apache.activemq.wireformat.WireFormat; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044/** 045 * 046 */ 047public class JDBCMessageStore extends AbstractMessageStore { 048 049 class Duration { 050 static final int LIMIT = 100; 051 final long start = System.currentTimeMillis(); 052 final String name; 053 054 Duration(String name) { 055 this.name = name; 056 } 057 void end() { 058 end(null); 059 } 060 void end(Object o) { 061 long duration = System.currentTimeMillis() - start; 062 063 if (duration > LIMIT) { 064 System.err.println(name + " took a long time: " + duration + "ms " + o); 065 } 066 } 067 } 068 private static final Logger LOG = LoggerFactory.getLogger(JDBCMessageStore.class); 069 protected final WireFormat wireFormat; 070 protected final JDBCAdapter adapter; 071 protected final JDBCPersistenceAdapter persistenceAdapter; 072 protected AtomicLong lastRecoveredSequenceId = new AtomicLong(-1); 073 protected AtomicLong lastRecoveredPriority = new AtomicLong(Byte.MAX_VALUE -1); 074 protected ActiveMQMessageAudit audit; 075 protected final LinkedList<Long> pendingAdditions = new LinkedList<Long>(); 076 077 public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination, ActiveMQMessageAudit audit) throws IOException { 078 super(destination); 079 this.persistenceAdapter = persistenceAdapter; 080 this.adapter = adapter; 081 this.wireFormat = wireFormat; 082 this.audit = audit; 083 084 if (destination.isQueue() && persistenceAdapter.getBrokerService().shouldRecordVirtualDestination(destination)) { 085 recordDestinationCreation(destination); 086 } 087 } 088 089 private void recordDestinationCreation(ActiveMQDestination destination) throws IOException { 090 TransactionContext c = persistenceAdapter.getTransactionContext(); 091 try { 092 c = persistenceAdapter.getTransactionContext(); 093 if (adapter.doGetLastAckedDurableSubscriberMessageId(c, destination, destination.getQualifiedName(), destination.getQualifiedName()) < 0) { 094 adapter.doRecordDestination(c, destination); 095 } 096 } catch (SQLException e) { 097 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 098 throw IOExceptionSupport.create("Failed to record destination: " + destination + ". Reason: " + e, e); 099 } finally { 100 c.close(); 101 } 102 } 103 104 public void addMessage(final ConnectionContext context, final Message message) throws IOException { 105 MessageId messageId = message.getMessageId(); 106 if (audit != null && audit.isDuplicate(message)) { 107 if (LOG.isDebugEnabled()) { 108 LOG.debug(destination.getPhysicalName() 109 + " ignoring duplicated (add) message, already stored: " 110 + messageId); 111 } 112 return; 113 } 114 115 // if xaXid present - this is a prepare - so we don't yet have an outcome 116 final XATransactionId xaXid = context != null ? context.getXid() : null; 117 118 // Serialize the Message.. 119 byte data[]; 120 try { 121 ByteSequence packet = wireFormat.marshal(message); 122 data = ByteSequenceData.toByteArray(packet); 123 } catch (IOException e) { 124 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); 125 } 126 127 // Get a connection and insert the message into the DB. 128 TransactionContext c = persistenceAdapter.getTransactionContext(context); 129 long sequenceId; 130 synchronized (pendingAdditions) { 131 sequenceId = persistenceAdapter.getNextSequenceId(); 132 final long sequence = sequenceId; 133 message.getMessageId().setEntryLocator(sequence); 134 135 if (xaXid == null) { 136 pendingAdditions.add(sequence); 137 138 c.onCompletion(new Runnable() { 139 public void run() { 140 // jdbc close or jms commit - while futureOrSequenceLong==null ordered 141 // work will remain pending on the Queue 142 message.getMessageId().setFutureOrSequenceLong(sequence); 143 } 144 }); 145 146 if (indexListener != null) { 147 indexListener.onAdd(new IndexListener.MessageContext(context, message, new Runnable() { 148 @Override 149 public void run() { 150 // cursor add complete 151 synchronized (pendingAdditions) { pendingAdditions.remove(sequence); } 152 } 153 })); 154 } else { 155 pendingAdditions.remove(sequence); 156 } 157 } 158 } 159 try { 160 adapter.doAddMessage(c, sequenceId, messageId, destination, data, message.getExpiration(), 161 this.isPrioritizedMessages() ? message.getPriority() : 0, xaXid); 162 } catch (SQLException e) { 163 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 164 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); 165 } finally { 166 c.close(); 167 } 168 if (xaXid == null) { 169 onAdd(message, sequenceId, message.getPriority()); 170 } 171 } 172 173 // jdbc commit order is random with concurrent connections - limit scan to lowest pending 174 private long minPendingSequeunceId() { 175 synchronized (pendingAdditions) { 176 if (!pendingAdditions.isEmpty()) { 177 return pendingAdditions.get(0); 178 } else { 179 // nothing pending, ensure scan is limited to current state 180 return persistenceAdapter.sequenceGenerator.getLastSequenceId() + 1; 181 } 182 } 183 } 184 185 @Override 186 public void updateMessage(Message message) throws IOException { 187 TransactionContext c = persistenceAdapter.getTransactionContext(); 188 try { 189 adapter.doUpdateMessage(c, destination, message.getMessageId(), ByteSequenceData.toByteArray(wireFormat.marshal(message))); 190 } catch (SQLException e) { 191 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 192 throw IOExceptionSupport.create("Failed to update message: " + message.getMessageId() + " in container: " + e, e); 193 } finally { 194 c.close(); 195 } 196 } 197 198 protected void onAdd(Message message, long sequenceId, byte priority) {} 199 200 public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException { 201 // Get a connection and insert the message into the DB. 202 TransactionContext c = persistenceAdapter.getTransactionContext(context); 203 try { 204 adapter.doAddMessageReference(c, persistenceAdapter.getNextSequenceId(), messageId, destination, expirationTime, messageRef); 205 } catch (SQLException e) { 206 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 207 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); 208 } finally { 209 c.close(); 210 } 211 } 212 213 public Message getMessage(MessageId messageId) throws IOException { 214 // Get a connection and pull the message out of the DB 215 TransactionContext c = persistenceAdapter.getTransactionContext(); 216 try { 217 byte data[] = adapter.doGetMessage(c, messageId); 218 if (data == null) { 219 return null; 220 } 221 222 Message answer = (Message)wireFormat.unmarshal(new ByteSequence(data)); 223 return answer; 224 } catch (IOException e) { 225 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); 226 } catch (SQLException e) { 227 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 228 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); 229 } finally { 230 c.close(); 231 } 232 } 233 234 public String getMessageReference(MessageId messageId) throws IOException { 235 long id = messageId.getBrokerSequenceId(); 236 237 // Get a connection and pull the message out of the DB 238 TransactionContext c = persistenceAdapter.getTransactionContext(); 239 try { 240 return adapter.doGetMessageReference(c, id); 241 } catch (IOException e) { 242 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); 243 } catch (SQLException e) { 244 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 245 throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e); 246 } finally { 247 c.close(); 248 } 249 } 250 251 public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { 252 253 long seq = ack.getLastMessageId().getFutureOrSequenceLong() != null ? 254 (Long) ack.getLastMessageId().getFutureOrSequenceLong() : 255 persistenceAdapter.getStoreSequenceIdForMessageId(context, ack.getLastMessageId(), destination)[0]; 256 257 // Get a connection and remove the message from the DB 258 TransactionContext c = persistenceAdapter.getTransactionContext(context); 259 try { 260 adapter.doRemoveMessage(c, seq, context != null ? context.getXid() : null); 261 } catch (SQLException e) { 262 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 263 throw IOExceptionSupport.create("Failed to broker message: " + ack.getLastMessageId() + " in container: " + e, e); 264 } finally { 265 c.close(); 266 } 267 } 268 269 public void recover(final MessageRecoveryListener listener) throws Exception { 270 271 // Get all the Message ids out of the database. 272 TransactionContext c = persistenceAdapter.getTransactionContext(); 273 try { 274 c = persistenceAdapter.getTransactionContext(); 275 adapter.doRecover(c, destination, new JDBCMessageRecoveryListener() { 276 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception { 277 Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data)); 278 msg.getMessageId().setBrokerSequenceId(sequenceId); 279 return listener.recoverMessage(msg); 280 } 281 282 public boolean recoverMessageReference(String reference) throws Exception { 283 return listener.recoverMessageReference(new MessageId(reference)); 284 } 285 }); 286 } catch (SQLException e) { 287 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 288 throw IOExceptionSupport.create("Failed to recover container. Reason: " + e, e); 289 } finally { 290 c.close(); 291 } 292 } 293 294 /** 295 * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext) 296 */ 297 public void removeAllMessages(ConnectionContext context) throws IOException { 298 // Get a connection and remove the message from the DB 299 TransactionContext c = persistenceAdapter.getTransactionContext(context); 300 try { 301 adapter.doRemoveAllMessages(c, destination); 302 } catch (SQLException e) { 303 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 304 throw IOExceptionSupport.create("Failed to broker remove all messages: " + e, e); 305 } finally { 306 c.close(); 307 } 308 } 309 310 public int getMessageCount() throws IOException { 311 int result = 0; 312 TransactionContext c = persistenceAdapter.getTransactionContext(); 313 try { 314 315 result = adapter.doGetMessageCount(c, destination); 316 317 } catch (SQLException e) { 318 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 319 throw IOExceptionSupport.create("Failed to get Message Count: " + destination + ". Reason: " + e, e); 320 } finally { 321 c.close(); 322 } 323 return result; 324 } 325 326 /** 327 * @param maxReturned 328 * @param listener 329 * @throws Exception 330 * @see org.apache.activemq.store.MessageStore#recoverNextMessages(int, 331 * org.apache.activemq.store.MessageRecoveryListener) 332 */ 333 public void recoverNextMessages(int maxReturned, final MessageRecoveryListener listener) throws Exception { 334 TransactionContext c = persistenceAdapter.getTransactionContext(); 335 try { 336 if (LOG.isTraceEnabled()) { 337 LOG.trace(this + " recoverNext lastRecovered:" + lastRecoveredSequenceId.get() + ", minPending:" + minPendingSequeunceId()); 338 } 339 adapter.doRecoverNextMessages(c, destination, minPendingSequeunceId(), lastRecoveredSequenceId.get(), lastRecoveredPriority.get(), 340 maxReturned, isPrioritizedMessages(), new JDBCMessageRecoveryListener() { 341 342 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception { 343 Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data)); 344 msg.getMessageId().setBrokerSequenceId(sequenceId); 345 msg.getMessageId().setFutureOrSequenceLong(sequenceId); 346 listener.recoverMessage(msg); 347 lastRecoveredSequenceId.set(sequenceId); 348 lastRecoveredPriority.set(msg.getPriority()); 349 return true; 350 } 351 352 public boolean recoverMessageReference(String reference) throws Exception { 353 if (listener.hasSpace()) { 354 listener.recoverMessageReference(new MessageId(reference)); 355 return true; 356 } 357 return false; 358 } 359 360 }); 361 } catch (SQLException e) { 362 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 363 } finally { 364 c.close(); 365 } 366 367 } 368 369 /** 370 * @see org.apache.activemq.store.MessageStore#resetBatching() 371 */ 372 public void resetBatching() { 373 if (LOG.isTraceEnabled()) { 374 LOG.trace(this + " resetBatching, existing last recovered seqId: " + lastRecoveredSequenceId.get()); 375 } 376 lastRecoveredSequenceId.set(-1); 377 lastRecoveredPriority.set(Byte.MAX_VALUE - 1); 378 379 } 380 381 @Override 382 public void setBatch(MessageId messageId) { 383 try { 384 long[] storedValues = persistenceAdapter.getStoreSequenceIdForMessageId(null, messageId, destination); 385 lastRecoveredSequenceId.set(storedValues[0]); 386 lastRecoveredPriority.set(storedValues[1]); 387 } catch (IOException ignoredAsAlreadyLogged) { 388 lastRecoveredSequenceId.set(-1); 389 lastRecoveredPriority.set(Byte.MAX_VALUE -1); 390 } 391 if (LOG.isTraceEnabled()) { 392 LOG.trace(this + " setBatch: new sequenceId: " + lastRecoveredSequenceId.get() 393 + ", priority: " + lastRecoveredPriority.get()); 394 } 395 } 396 397 398 public void setPrioritizedMessages(boolean prioritizedMessages) { 399 super.setPrioritizedMessages(prioritizedMessages); 400 } 401 402 @Override 403 public String toString() { 404 return destination.getPhysicalName() + ",pendingSize:" + pendingAdditions.size(); 405 } 406}