001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq; 018 019import java.net.URI; 020import java.net.URISyntaxException; 021import java.security.AccessController; 022import java.security.PrivilegedAction; 023import java.util.HashMap; 024import java.util.Map; 025import java.util.Properties; 026import java.util.concurrent.RejectedExecutionHandler; 027 028import javax.jms.Connection; 029import javax.jms.ConnectionFactory; 030import javax.jms.ExceptionListener; 031import javax.jms.JMSException; 032import javax.jms.QueueConnection; 033import javax.jms.QueueConnectionFactory; 034import javax.jms.TopicConnection; 035import javax.jms.TopicConnectionFactory; 036import javax.naming.Context; 037 038import org.apache.activemq.blob.BlobTransferPolicy; 039import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap; 040import org.apache.activemq.jndi.JNDIBaseStorable; 041import org.apache.activemq.management.JMSStatsImpl; 042import org.apache.activemq.management.StatsCapable; 043import org.apache.activemq.management.StatsImpl; 044import org.apache.activemq.thread.TaskRunnerFactory; 045import org.apache.activemq.transport.Transport; 046import 
org.apache.activemq.transport.TransportFactory; 047import org.apache.activemq.transport.TransportListener; 048import org.apache.activemq.util.IdGenerator; 049import org.apache.activemq.util.IntrospectionSupport; 050import org.apache.activemq.util.JMSExceptionSupport; 051import org.apache.activemq.util.URISupport; 052import org.apache.activemq.util.URISupport.CompositeData; 053import org.slf4j.Logger; 054import org.slf4j.LoggerFactory; 055 056/** 057 * A ConnectionFactory is an an Administered object, and is used for creating 058 * Connections. <p/> This class also implements QueueConnectionFactory and 059 * TopicConnectionFactory. You can use this connection to create both 060 * QueueConnections and TopicConnections. 061 * 062 * 063 * @see javax.jms.ConnectionFactory 064 */ 065public class ActiveMQConnectionFactory extends JNDIBaseStorable implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, StatsCapable, Cloneable { 066 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnectionFactory.class); 067 private static final String DEFAULT_BROKER_HOST; 068 private static final int DEFAULT_BROKER_PORT; 069 static{ 070 String host = null; 071 String port = null; 072 try { 073 host = AccessController.doPrivileged(new PrivilegedAction<String>() { 074 @Override 075 public String run() { 076 String result = System.getProperty("org.apache.activemq.AMQ_HOST"); 077 result = (result==null||result.isEmpty()) ? System.getProperty("AMQ_HOST","localhost") : result; 078 return result; 079 } 080 }); 081 port = AccessController.doPrivileged(new PrivilegedAction<String>() { 082 @Override 083 public String run() { 084 String result = System.getProperty("org.apache.activemq.AMQ_PORT"); 085 result = (result==null||result.isEmpty()) ? 
System.getProperty("AMQ_PORT","61616") : result; 086 return result; 087 } 088 }); 089 }catch(Throwable e){ 090 LOG.debug("Failed to look up System properties for host and port",e); 091 } 092 host = (host == null || host.isEmpty()) ? "localhost" : host; 093 port = (port == null || port.isEmpty()) ? "61616" : port; 094 DEFAULT_BROKER_HOST = host; 095 DEFAULT_BROKER_PORT = Integer.parseInt(port); 096 } 097 098 099 public static final String DEFAULT_BROKER_BIND_URL; 100 101 static{ 102 final String defaultURL = "tcp://" + DEFAULT_BROKER_HOST + ":" + DEFAULT_BROKER_PORT; 103 String bindURL = null; 104 105 try { 106 bindURL = AccessController.doPrivileged(new PrivilegedAction<String>() { 107 @Override 108 public String run() { 109 String result = System.getProperty("org.apache.activemq.BROKER_BIND_URL"); 110 result = (result==null||result.isEmpty()) ? System.getProperty("BROKER_BIND_URL",defaultURL) : result; 111 return result; 112 } 113 }); 114 }catch(Throwable e){ 115 LOG.debug("Failed to look up System properties for host and port",e); 116 } 117 bindURL = (bindURL == null || bindURL.isEmpty()) ? 
defaultURL : bindURL; 118 DEFAULT_BROKER_BIND_URL = bindURL; 119 } 120 121 public static final String DEFAULT_BROKER_URL = "failover://"+DEFAULT_BROKER_BIND_URL; 122 public static final String DEFAULT_USER = null; 123 public static final String DEFAULT_PASSWORD = null; 124 public static final int DEFAULT_PRODUCER_WINDOW_SIZE = 0; 125 126 protected URI brokerURL; 127 protected String userName; 128 protected String password; 129 protected String clientID; 130 protected boolean dispatchAsync=true; 131 protected boolean alwaysSessionAsync=true; 132 133 JMSStatsImpl factoryStats = new JMSStatsImpl(); 134 135 private IdGenerator clientIdGenerator; 136 private String clientIDPrefix; 137 private IdGenerator connectionIdGenerator; 138 private String connectionIDPrefix; 139 140 // client policies 141 private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); 142 private RedeliveryPolicyMap redeliveryPolicyMap = new RedeliveryPolicyMap(); 143 { 144 redeliveryPolicyMap.setDefaultEntry(new RedeliveryPolicy()); 145 } 146 private BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy(); 147 private MessageTransformer transformer; 148 149 private boolean disableTimeStampsByDefault; 150 private boolean optimizedMessageDispatch = true; 151 private long optimizeAcknowledgeTimeOut = 300; 152 private long optimizedAckScheduledAckInterval = 0; 153 private boolean copyMessageOnSend = true; 154 private boolean useCompression; 155 private boolean objectMessageSerializationDefered; 156 private boolean useAsyncSend; 157 private boolean optimizeAcknowledge; 158 private int closeTimeout = 15000; 159 private boolean useRetroactiveConsumer; 160 private boolean exclusiveConsumer; 161 private boolean nestedMapAndListEnabled = true; 162 private boolean alwaysSyncSend; 163 private boolean watchTopicAdvisories = true; 164 private int producerWindowSize = DEFAULT_PRODUCER_WINDOW_SIZE; 165 private long warnAboutUnstartedConnectionTimeout = 500L; 166 private int sendTimeout = 
0; 167 private boolean sendAcksAsync=true; 168 private TransportListener transportListener; 169 private ExceptionListener exceptionListener; 170 private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE; 171 private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT; 172 private boolean useDedicatedTaskRunner; 173 private long consumerFailoverRedeliveryWaitPeriod = 0; 174 private boolean checkForDuplicates = true; 175 private ClientInternalExceptionListener clientInternalExceptionListener; 176 private boolean messagePrioritySupported = false; 177 private boolean transactedIndividualAck = false; 178 private boolean nonBlockingRedelivery = false; 179 private int maxThreadPoolSize = ActiveMQConnection.DEFAULT_THREAD_POOL_SIZE; 180 private TaskRunnerFactory sessionTaskRunner; 181 private RejectedExecutionHandler rejectedTaskHandler = null; 182 protected int xaAckMode = -1; // ensure default init before setting via brokerUrl introspection in sub class 183 private boolean rmIdFromConnectionId = false; 184 private boolean consumerExpiryCheckEnabled = true; 185 186 // ///////////////////////////////////////////// 187 // 188 // ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory Methods 189 // 190 // ///////////////////////////////////////////// 191 192 public ActiveMQConnectionFactory() { 193 this(DEFAULT_BROKER_URL); 194 } 195 196 public ActiveMQConnectionFactory(String brokerURL) { 197 this(createURI(brokerURL)); 198 } 199 200 public ActiveMQConnectionFactory(URI brokerURL) { 201 setBrokerURL(brokerURL.toString()); 202 } 203 204 public ActiveMQConnectionFactory(String userName, String password, URI brokerURL) { 205 setUserName(userName); 206 setPassword(password); 207 setBrokerURL(brokerURL.toString()); 208 } 209 210 public ActiveMQConnectionFactory(String userName, String password, String brokerURL) { 211 setUserName(userName); 212 setPassword(password); 213 setBrokerURL(brokerURL); 214 } 215 216 /** 217 * Returns a copy 
of the given connection factory 218 */ 219 public ActiveMQConnectionFactory copy() { 220 try { 221 return (ActiveMQConnectionFactory)super.clone(); 222 } catch (CloneNotSupportedException e) { 223 throw new RuntimeException("This should never happen: " + e, e); 224 } 225 } 226 227 /*boolean* 228 * @param brokerURL 229 * @return 230 * @throws URISyntaxException 231 */ 232 private static URI createURI(String brokerURL) { 233 try { 234 return new URI(brokerURL); 235 } catch (URISyntaxException e) { 236 throw (IllegalArgumentException)new IllegalArgumentException("Invalid broker URI: " + brokerURL).initCause(e); 237 } 238 } 239 240 /** 241 * @return Returns the Connection. 242 */ 243 @Override 244 public Connection createConnection() throws JMSException { 245 return createActiveMQConnection(); 246 } 247 248 /** 249 * @return Returns the Connection. 250 */ 251 @Override 252 public Connection createConnection(String userName, String password) throws JMSException { 253 return createActiveMQConnection(userName, password); 254 } 255 256 /** 257 * @return Returns the QueueConnection. 258 * @throws JMSException 259 */ 260 @Override 261 public QueueConnection createQueueConnection() throws JMSException { 262 return createActiveMQConnection().enforceQueueOnlyConnection(); 263 } 264 265 /** 266 * @return Returns the QueueConnection. 267 */ 268 @Override 269 public QueueConnection createQueueConnection(String userName, String password) throws JMSException { 270 return createActiveMQConnection(userName, password).enforceQueueOnlyConnection(); 271 } 272 273 /** 274 * @return Returns the TopicConnection. 275 * @throws JMSException 276 */ 277 @Override 278 public TopicConnection createTopicConnection() throws JMSException { 279 return createActiveMQConnection(); 280 } 281 282 /** 283 * @return Returns the TopicConnection. 
284 */ 285 @Override 286 public TopicConnection createTopicConnection(String userName, String password) throws JMSException { 287 return createActiveMQConnection(userName, password); 288 } 289 290 /** 291 * @returns the StatsImpl associated with this ConnectionFactory. 292 */ 293 @Override 294 public StatsImpl getStats() { 295 return this.factoryStats; 296 } 297 298 // ///////////////////////////////////////////// 299 // 300 // Implementation methods. 301 // 302 // ///////////////////////////////////////////// 303 304 protected ActiveMQConnection createActiveMQConnection() throws JMSException { 305 return createActiveMQConnection(userName, password); 306 } 307 308 /** 309 * Creates a Transport based on this object's connection settings. Separated 310 * from createActiveMQConnection to allow for subclasses to override. 311 * 312 * @return The newly created Transport. 313 * @throws JMSException If unable to create trasnport. 314 */ 315 protected Transport createTransport() throws JMSException { 316 try { 317 return TransportFactory.connect(brokerURL); 318 } catch (Exception e) { 319 throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e); 320 } 321 } 322 323 /** 324 * @return Returns the Connection. 325 */ 326 protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException { 327 if (brokerURL == null) { 328 throw new ConfigurationException("brokerURL not set."); 329 } 330 ActiveMQConnection connection = null; 331 try { 332 Transport transport = createTransport(); 333 connection = createActiveMQConnection(transport, factoryStats); 334 335 connection.setUserName(userName); 336 connection.setPassword(password); 337 338 configureConnection(connection); 339 340 transport.start(); 341 342 if (clientID != null) { 343 connection.setDefaultClientID(clientID); 344 } 345 346 return connection; 347 } catch (JMSException e) { 348 // Clean up! 
349 try { 350 connection.close(); 351 } catch (Throwable ignore) { 352 } 353 throw e; 354 } catch (Exception e) { 355 // Clean up! 356 try { 357 connection.close(); 358 } catch (Throwable ignore) { 359 } 360 throw JMSExceptionSupport.create("Could not connect to broker URL: " + brokerURL + ". Reason: " + e, e); 361 } 362 } 363 364 protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception { 365 ActiveMQConnection connection = new ActiveMQConnection(transport, getClientIdGenerator(), 366 getConnectionIdGenerator(), stats); 367 return connection; 368 } 369 370 protected void configureConnection(ActiveMQConnection connection) throws JMSException { 371 connection.setPrefetchPolicy(getPrefetchPolicy()); 372 connection.setDisableTimeStampsByDefault(isDisableTimeStampsByDefault()); 373 connection.setOptimizedMessageDispatch(isOptimizedMessageDispatch()); 374 connection.setCopyMessageOnSend(isCopyMessageOnSend()); 375 connection.setUseCompression(isUseCompression()); 376 connection.setObjectMessageSerializationDefered(isObjectMessageSerializationDefered()); 377 connection.setDispatchAsync(isDispatchAsync()); 378 connection.setUseAsyncSend(isUseAsyncSend()); 379 connection.setAlwaysSyncSend(isAlwaysSyncSend()); 380 connection.setAlwaysSessionAsync(isAlwaysSessionAsync()); 381 connection.setOptimizeAcknowledge(isOptimizeAcknowledge()); 382 connection.setOptimizeAcknowledgeTimeOut(getOptimizeAcknowledgeTimeOut()); 383 connection.setOptimizedAckScheduledAckInterval(getOptimizedAckScheduledAckInterval()); 384 connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer()); 385 connection.setExclusiveConsumer(isExclusiveConsumer()); 386 connection.setRedeliveryPolicyMap(getRedeliveryPolicyMap()); 387 connection.setTransformer(getTransformer()); 388 connection.setBlobTransferPolicy(getBlobTransferPolicy().copy()); 389 connection.setWatchTopicAdvisories(isWatchTopicAdvisories()); 390 
connection.setProducerWindowSize(getProducerWindowSize()); 391 connection.setWarnAboutUnstartedConnectionTimeout(getWarnAboutUnstartedConnectionTimeout()); 392 connection.setSendTimeout(getSendTimeout()); 393 connection.setCloseTimeout(getCloseTimeout()); 394 connection.setSendAcksAsync(isSendAcksAsync()); 395 connection.setAuditDepth(getAuditDepth()); 396 connection.setAuditMaximumProducerNumber(getAuditMaximumProducerNumber()); 397 connection.setUseDedicatedTaskRunner(isUseDedicatedTaskRunner()); 398 connection.setConsumerFailoverRedeliveryWaitPeriod(getConsumerFailoverRedeliveryWaitPeriod()); 399 connection.setCheckForDuplicates(isCheckForDuplicates()); 400 connection.setMessagePrioritySupported(isMessagePrioritySupported()); 401 connection.setTransactedIndividualAck(isTransactedIndividualAck()); 402 connection.setNonBlockingRedelivery(isNonBlockingRedelivery()); 403 connection.setMaxThreadPoolSize(getMaxThreadPoolSize()); 404 connection.setSessionTaskRunner(getSessionTaskRunner()); 405 connection.setRejectedTaskHandler(getRejectedTaskHandler()); 406 connection.setNestedMapAndListEnabled(isNestedMapAndListEnabled()); 407 connection.setRmIdFromConnectionId(isRmIdFromConnectionId()); 408 connection.setConsumerExpiryCheckEnabled(isConsumerExpiryCheckEnabled()); 409 if (transportListener != null) { 410 connection.addTransportListener(transportListener); 411 } 412 if (exceptionListener != null) { 413 connection.setExceptionListener(exceptionListener); 414 } 415 if (clientInternalExceptionListener != null) { 416 connection.setClientInternalExceptionListener(clientInternalExceptionListener); 417 } 418 } 419 420 // ///////////////////////////////////////////// 421 // 422 // Property Accessors 423 // 424 // ///////////////////////////////////////////// 425 426 public String getBrokerURL() { 427 return brokerURL == null ? 
null : brokerURL.toString(); 428 } 429 430 /** 431 * Sets the <a 432 * href="http://activemq.apache.org/configuring-transports.html">connection 433 * URL</a> used to connect to the ActiveMQ broker. 434 */ 435 public void setBrokerURL(String brokerURL) { 436 this.brokerURL = createURI(brokerURL); 437 438 // Use all the properties prefixed with 'jms.' to set the connection 439 // factory 440 // options. 441 if (this.brokerURL.getQuery() != null) { 442 // It might be a standard URI or... 443 try { 444 445 Map<String,String> map = URISupport.parseQuery(this.brokerURL.getQuery()); 446 Map<String,Object> jmsOptionsMap = IntrospectionSupport.extractProperties(map, "jms."); 447 if (buildFromMap(jmsOptionsMap)) { 448 if (!jmsOptionsMap.isEmpty()) { 449 String msg = "There are " + jmsOptionsMap.size() 450 + " jms options that couldn't be set on the ConnectionFactory." 451 + " Check the options are spelled correctly." 452 + " Unknown parameters=[" + jmsOptionsMap + "]." 453 + " This connection factory cannot be started."; 454 throw new IllegalArgumentException(msg); 455 } 456 457 this.brokerURL = URISupport.createRemainingURI(this.brokerURL, map); 458 } 459 460 } catch (URISyntaxException e) { 461 } 462 463 } else { 464 465 // It might be a composite URI. 466 try { 467 CompositeData data = URISupport.parseComposite(this.brokerURL); 468 Map<String,Object> jmsOptionsMap = IntrospectionSupport.extractProperties(data.getParameters(), "jms."); 469 if (buildFromMap(jmsOptionsMap)) { 470 if (!jmsOptionsMap.isEmpty()) { 471 String msg = "There are " + jmsOptionsMap.size() 472 + " jms options that couldn't be set on the ConnectionFactory." 473 + " Check the options are spelled correctly." 474 + " Unknown parameters=[" + jmsOptionsMap + "]." 
475 + " This connection factory cannot be started."; 476 throw new IllegalArgumentException(msg); 477 } 478 479 this.brokerURL = data.toURI(); 480 } 481 } catch (URISyntaxException e) { 482 } 483 } 484 } 485 486 public String getClientID() { 487 return clientID; 488 } 489 490 /** 491 * Sets the JMS clientID to use for the created connection. Note that this 492 * can only be used by one connection at once so generally its a better idea 493 * to set the clientID on a Connection 494 */ 495 public void setClientID(String clientID) { 496 this.clientID = clientID; 497 } 498 499 public boolean isCopyMessageOnSend() { 500 return copyMessageOnSend; 501 } 502 503 /** 504 * Should a JMS message be copied to a new JMS Message object as part of the 505 * send() method in JMS. This is enabled by default to be compliant with the 506 * JMS specification. You can disable it if you do not mutate JMS messages 507 * after they are sent for a performance boost 508 */ 509 public void setCopyMessageOnSend(boolean copyMessageOnSend) { 510 this.copyMessageOnSend = copyMessageOnSend; 511 } 512 513 public boolean isDisableTimeStampsByDefault() { 514 return disableTimeStampsByDefault; 515 } 516 517 /** 518 * Sets whether or not timestamps on messages should be disabled or not. If 519 * you disable them it adds a small performance boost. 520 */ 521 public void setDisableTimeStampsByDefault(boolean disableTimeStampsByDefault) { 522 this.disableTimeStampsByDefault = disableTimeStampsByDefault; 523 } 524 525 public boolean isOptimizedMessageDispatch() { 526 return optimizedMessageDispatch; 527 } 528 529 /** 530 * If this flag is set then an larger prefetch limit is used - only 531 * applicable for durable topic subscribers. 
532 */ 533 public void setOptimizedMessageDispatch(boolean optimizedMessageDispatch) { 534 this.optimizedMessageDispatch = optimizedMessageDispatch; 535 } 536 537 public String getPassword() { 538 return password; 539 } 540 541 /** 542 * Sets the JMS password used for connections created from this factory 543 */ 544 public void setPassword(String password) { 545 this.password = password; 546 } 547 548 public ActiveMQPrefetchPolicy getPrefetchPolicy() { 549 return prefetchPolicy; 550 } 551 552 /** 553 * Sets the <a 554 * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch 555 * policy</a> for consumers created by this connection. 556 */ 557 public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) { 558 this.prefetchPolicy = prefetchPolicy; 559 } 560 561 public boolean isUseAsyncSend() { 562 return useAsyncSend; 563 } 564 565 public BlobTransferPolicy getBlobTransferPolicy() { 566 return blobTransferPolicy; 567 } 568 569 /** 570 * Sets the policy used to describe how out-of-band BLOBs (Binary Large 571 * OBjects) are transferred from producers to brokers to consumers 572 */ 573 public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) { 574 this.blobTransferPolicy = blobTransferPolicy; 575 } 576 577 /** 578 * Forces the use of <a 579 * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which 580 * adds a massive performance boost; but means that the send() method will 581 * return immediately whether the message has been sent or not which could 582 * lead to message loss. 
583 */ 584 public void setUseAsyncSend(boolean useAsyncSend) { 585 this.useAsyncSend = useAsyncSend; 586 } 587 588 public synchronized boolean isWatchTopicAdvisories() { 589 return watchTopicAdvisories; 590 } 591 592 public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) { 593 this.watchTopicAdvisories = watchTopicAdvisories; 594 } 595 596 /** 597 * @return true if always sync send messages 598 */ 599 public boolean isAlwaysSyncSend() { 600 return this.alwaysSyncSend; 601 } 602 603 /** 604 * Set true if always require messages to be sync sent 605 * 606 * @param alwaysSyncSend 607 */ 608 public void setAlwaysSyncSend(boolean alwaysSyncSend) { 609 this.alwaysSyncSend = alwaysSyncSend; 610 } 611 612 public String getUserName() { 613 return userName; 614 } 615 616 /** 617 * Sets the JMS userName used by connections created by this factory 618 */ 619 public void setUserName(String userName) { 620 this.userName = userName; 621 } 622 623 public boolean isUseRetroactiveConsumer() { 624 return useRetroactiveConsumer; 625 } 626 627 /** 628 * Sets whether or not retroactive consumers are enabled. Retroactive 629 * consumers allow non-durable topic subscribers to receive old messages 630 * that were published before the non-durable subscriber started. 
*/
    public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) {
        this.useRetroactiveConsumer = useRetroactiveConsumer;
    }

    /**
     * @return whether queue consumers are exclusive
     */
    public boolean isExclusiveConsumer() {
        return this.exclusiveConsumer;
    }

    /**
     * Enables or disables whether or not queue consumers should be exclusive or
     * not for example to preserve ordering when not using <a
     * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
     *
     * @param exclusiveConsumer the flag value to store
     */
    public void setExclusiveConsumer(boolean exclusiveConsumer) {
        this.exclusiveConsumer = exclusiveConsumer;
    }

    /**
     * @return the default entry of the redelivery policy map
     */
    public RedeliveryPolicy getRedeliveryPolicy() {
        RedeliveryPolicy defaultPolicy = redeliveryPolicyMap.getDefaultEntry();
        return defaultPolicy;
    }

    /**
     * Sets the global default redelivery policy to be used when a message is delivered
     * but the session is rolled back
     */
    public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
        redeliveryPolicyMap.setDefaultEntry(redeliveryPolicy);
    }

    /**
     * @return the redelivery policy map stored on this factory
     */
    public RedeliveryPolicyMap getRedeliveryPolicyMap() {
        return redeliveryPolicyMap;
    }

    /**
     * Sets the global redelivery policy mapping to be used when a message is delivered
     * but the session is rolled back
     */
    public void setRedeliveryPolicyMap(RedeliveryPolicyMap redeliveryPolicyMap) {
        this.redeliveryPolicyMap = redeliveryPolicyMap;
    }

    /**
     * @return the message transformer stored on this factory
     */
    public MessageTransformer getTransformer() {
        return this.transformer;
    }

    /**
     * @return the sendTimeout (in milliseconds)
     */
    public int getSendTimeout() {
        return this.sendTimeout;
    }

    /**
     * @param sendTimeout the sendTimeout to set (in milliseconds)
     */
    public void setSendTimeout(int sendTimeout) {
        this.sendTimeout = sendTimeout;
    }

    /**
     * @return the sendAcksAsync
     */
    public boolean isSendAcksAsync() {
        return this.sendAcksAsync;
    }

    /**
     * @param sendAcksAsync the sendAcksAsync to set
     */
public void setSendAcksAsync(boolean sendAcksAsync) { 704 this.sendAcksAsync = sendAcksAsync; 705 } 706 707 /** 708 * @return the messagePrioritySupported 709 */ 710 public boolean isMessagePrioritySupported() { 711 return this.messagePrioritySupported; 712 } 713 714 /** 715 * @param messagePrioritySupported the messagePrioritySupported to set 716 */ 717 public void setMessagePrioritySupported(boolean messagePrioritySupported) { 718 this.messagePrioritySupported = messagePrioritySupported; 719 } 720 721 722 /** 723 * Sets the transformer used to transform messages before they are sent on 724 * to the JMS bus or when they are received from the bus but before they are 725 * delivered to the JMS client 726 */ 727 public void setTransformer(MessageTransformer transformer) { 728 this.transformer = transformer; 729 } 730 731 @SuppressWarnings({ "unchecked", "rawtypes" }) 732 @Override 733 public void buildFromProperties(Properties properties) { 734 735 if (properties == null) { 736 properties = new Properties(); 737 } 738 739 String temp = properties.getProperty(Context.PROVIDER_URL); 740 if (temp == null || temp.length() == 0) { 741 temp = properties.getProperty("brokerURL"); 742 } 743 if (temp != null && temp.length() > 0) { 744 setBrokerURL(temp); 745 } 746 747 Map<String, Object> p = new HashMap(properties); 748 buildFromMap(p); 749 } 750 751 public boolean buildFromMap(Map<String, Object> properties) { 752 boolean rc = false; 753 754 ActiveMQPrefetchPolicy p = new ActiveMQPrefetchPolicy(); 755 if (IntrospectionSupport.setProperties(p, properties, "prefetchPolicy.")) { 756 setPrefetchPolicy(p); 757 rc = true; 758 } 759 760 RedeliveryPolicy rp = new RedeliveryPolicy(); 761 if (IntrospectionSupport.setProperties(rp, properties, "redeliveryPolicy.")) { 762 setRedeliveryPolicy(rp); 763 rc = true; 764 } 765 766 BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy(); 767 if (IntrospectionSupport.setProperties(blobTransferPolicy, properties, 
"blobTransferPolicy.")) { 768 setBlobTransferPolicy(blobTransferPolicy); 769 rc = true; 770 } 771 772 rc |= IntrospectionSupport.setProperties(this, properties); 773 774 return rc; 775 } 776 777 @Override 778 public void populateProperties(Properties props) { 779 props.setProperty("dispatchAsync", Boolean.toString(isDispatchAsync())); 780 781 if (getBrokerURL() != null) { 782 props.setProperty(Context.PROVIDER_URL, getBrokerURL()); 783 props.setProperty("brokerURL", getBrokerURL()); 784 } 785 786 if (getClientID() != null) { 787 props.setProperty("clientID", getClientID()); 788 } 789 790 IntrospectionSupport.getProperties(getPrefetchPolicy(), props, "prefetchPolicy."); 791 IntrospectionSupport.getProperties(getRedeliveryPolicy(), props, "redeliveryPolicy."); 792 IntrospectionSupport.getProperties(getBlobTransferPolicy(), props, "blobTransferPolicy."); 793 794 props.setProperty("copyMessageOnSend", Boolean.toString(isCopyMessageOnSend())); 795 props.setProperty("disableTimeStampsByDefault", Boolean.toString(isDisableTimeStampsByDefault())); 796 props.setProperty("objectMessageSerializationDefered", Boolean.toString(isObjectMessageSerializationDefered())); 797 props.setProperty("optimizedMessageDispatch", Boolean.toString(isOptimizedMessageDispatch())); 798 799 if (getPassword() != null) { 800 props.setProperty("password", getPassword()); 801 } 802 803 props.setProperty("useAsyncSend", Boolean.toString(isUseAsyncSend())); 804 props.setProperty("useCompression", Boolean.toString(isUseCompression())); 805 props.setProperty("useRetroactiveConsumer", Boolean.toString(isUseRetroactiveConsumer())); 806 props.setProperty("watchTopicAdvisories", Boolean.toString(isWatchTopicAdvisories())); 807 808 if (getUserName() != null) { 809 props.setProperty("userName", getUserName()); 810 } 811 812 props.setProperty("closeTimeout", Integer.toString(getCloseTimeout())); 813 props.setProperty("alwaysSessionAsync", Boolean.toString(isAlwaysSessionAsync())); 814 
props.setProperty("optimizeAcknowledge", Boolean.toString(isOptimizeAcknowledge())); 815 props.setProperty("statsEnabled", Boolean.toString(isStatsEnabled())); 816 props.setProperty("alwaysSyncSend", Boolean.toString(isAlwaysSyncSend())); 817 props.setProperty("producerWindowSize", Integer.toString(getProducerWindowSize())); 818 props.setProperty("sendTimeout", Integer.toString(getSendTimeout())); 819 props.setProperty("sendAcksAsync",Boolean.toString(isSendAcksAsync())); 820 props.setProperty("auditDepth", Integer.toString(getAuditDepth())); 821 props.setProperty("auditMaximumProducerNumber", Integer.toString(getAuditMaximumProducerNumber())); 822 props.setProperty("checkForDuplicates", Boolean.toString(isCheckForDuplicates())); 823 props.setProperty("messagePrioritySupported", Boolean.toString(isMessagePrioritySupported())); 824 props.setProperty("transactedIndividualAck", Boolean.toString(isTransactedIndividualAck())); 825 props.setProperty("nonBlockingRedelivery", Boolean.toString(isNonBlockingRedelivery())); 826 props.setProperty("maxThreadPoolSize", Integer.toString(getMaxThreadPoolSize())); 827 props.setProperty("nestedMapAndListEnabled", Boolean.toString(isNestedMapAndListEnabled())); 828 props.setProperty("consumerFailoverRedeliveryWaitPeriod", Long.toString(getConsumerFailoverRedeliveryWaitPeriod())); 829 props.setProperty("rmIdFromConnectionId", Boolean.toString(isRmIdFromConnectionId())); 830 props.setProperty("consumerExpiryCheckEnabled", Boolean.toString(isConsumerExpiryCheckEnabled())); 831 } 832 833 public boolean isUseCompression() { 834 return useCompression; 835 } 836 837 /** 838 * Enables the use of compression of the message bodies 839 */ 840 public void setUseCompression(boolean useCompression) { 841 this.useCompression = useCompression; 842 } 843 844 public boolean isObjectMessageSerializationDefered() { 845 return objectMessageSerializationDefered; 846 } 847 848 /** 849 * When an object is set on an ObjectMessage, the JMS spec requires the 
850 * object to be serialized by that set method. Enabling this flag causes the 851 * object to not get serialized. The object may subsequently get serialized 852 * if the message needs to be sent over a socket or stored to disk. 853 */ 854 public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) { 855 this.objectMessageSerializationDefered = objectMessageSerializationDefered; 856 } 857 858 public boolean isDispatchAsync() { 859 return dispatchAsync; 860 } 861 862 /** 863 * Enables or disables the default setting of whether or not consumers have 864 * their messages <a 865 * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched 866 * synchronously or asynchronously by the broker</a>. For non-durable 867 * topics for example we typically dispatch synchronously by default to 868 * minimize context switches which boost performance. However sometimes its 869 * better to go slower to ensure that a single blocked consumer socket does 870 * not block delivery to other consumers. 871 * 872 * @param asyncDispatch If true then consumers created on this connection 873 * will default to having their messages dispatched 874 * asynchronously. The default value is true. 875 */ 876 public void setDispatchAsync(boolean asyncDispatch) { 877 this.dispatchAsync = asyncDispatch; 878 } 879 880 /** 881 * @return Returns the closeTimeout. 882 */ 883 public int getCloseTimeout() { 884 return closeTimeout; 885 } 886 887 /** 888 * Sets the timeout before a close is considered complete. Normally a 889 * close() on a connection waits for confirmation from the broker; this 890 * allows that operation to timeout to save the client hanging if there is 891 * no broker 892 */ 893 public void setCloseTimeout(int closeTimeout) { 894 this.closeTimeout = closeTimeout; 895 } 896 897 /** 898 * @return Returns the alwaysSessionAsync. 
899 */ 900 public boolean isAlwaysSessionAsync() { 901 return alwaysSessionAsync; 902 } 903 904 /** 905 * If this flag is not set then a separate thread is not used for dispatching messages for each Session in 906 * the Connection. However, a separate thread is always used if there is more than one session, or the session 907 * isn't in auto acknowledge or duplicates ok mode. By default this value is set to true and session dispatch 908 * happens asynchronously. 909 */ 910 public void setAlwaysSessionAsync(boolean alwaysSessionAsync) { 911 this.alwaysSessionAsync = alwaysSessionAsync; 912 } 913 914 /** 915 * @return Returns the optimizeAcknowledge. 916 */ 917 public boolean isOptimizeAcknowledge() { 918 return optimizeAcknowledge; 919 } 920 921 /** 922 * @param optimizeAcknowledge The optimizeAcknowledge to set. 923 */ 924 public void setOptimizeAcknowledge(boolean optimizeAcknowledge) { 925 this.optimizeAcknowledge = optimizeAcknowledge; 926 } 927 928 /** 929 * The max time in milliseconds between optimized ack batches 930 * @param optimizeAcknowledgeTimeOut 931 */ 932 public void setOptimizeAcknowledgeTimeOut(long optimizeAcknowledgeTimeOut) { 933 this.optimizeAcknowledgeTimeOut = optimizeAcknowledgeTimeOut; 934 } 935 936 public long getOptimizeAcknowledgeTimeOut() { 937 return optimizeAcknowledgeTimeOut; 938 } 939 940 public boolean isNestedMapAndListEnabled() { 941 return nestedMapAndListEnabled; 942 } 943 944 /** 945 * Enables/disables whether or not Message properties and MapMessage entries 946 * support <a 947 * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested 948 * Structures</a> of Map and List objects 949 */ 950 public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) { 951 this.nestedMapAndListEnabled = structuredMapsEnabled; 952 } 953 954 public String getClientIDPrefix() { 955 return clientIDPrefix; 956 } 957 958 /** 959 * Sets the prefix used by autogenerated JMS Client ID values which are used 
960 * if the JMS client does not explicitly specify on. 961 * 962 * @param clientIDPrefix 963 */ 964 public void setClientIDPrefix(String clientIDPrefix) { 965 this.clientIDPrefix = clientIDPrefix; 966 } 967 968 protected synchronized IdGenerator getClientIdGenerator() { 969 if (clientIdGenerator == null) { 970 if (clientIDPrefix != null) { 971 clientIdGenerator = new IdGenerator(clientIDPrefix); 972 } else { 973 clientIdGenerator = new IdGenerator(); 974 } 975 } 976 return clientIdGenerator; 977 } 978 979 protected void setClientIdGenerator(IdGenerator clientIdGenerator) { 980 this.clientIdGenerator = clientIdGenerator; 981 } 982 983 /** 984 * Sets the prefix used by connection id generator 985 * @param connectionIDPrefix 986 */ 987 public void setConnectionIDPrefix(String connectionIDPrefix) { 988 this.connectionIDPrefix = connectionIDPrefix; 989 } 990 991 protected synchronized IdGenerator getConnectionIdGenerator() { 992 if (connectionIdGenerator == null) { 993 if (connectionIDPrefix != null) { 994 connectionIdGenerator = new IdGenerator(connectionIDPrefix); 995 } else { 996 connectionIdGenerator = new IdGenerator(); 997 } 998 } 999 return connectionIdGenerator; 1000 } 1001 1002 protected void setConnectionIdGenerator(IdGenerator connectionIdGenerator) { 1003 this.connectionIdGenerator = connectionIdGenerator; 1004 } 1005 1006 /** 1007 * @return the statsEnabled 1008 */ 1009 public boolean isStatsEnabled() { 1010 return this.factoryStats.isEnabled(); 1011 } 1012 1013 /** 1014 * @param statsEnabled the statsEnabled to set 1015 */ 1016 public void setStatsEnabled(boolean statsEnabled) { 1017 this.factoryStats.setEnabled(statsEnabled); 1018 } 1019 1020 public synchronized int getProducerWindowSize() { 1021 return producerWindowSize; 1022 } 1023 1024 public synchronized void setProducerWindowSize(int producerWindowSize) { 1025 this.producerWindowSize = producerWindowSize; 1026 } 1027 1028 public long getWarnAboutUnstartedConnectionTimeout() { 1029 return 
warnAboutUnstartedConnectionTimeout; 1030 } 1031 1032 /** 1033 * Enables the timeout from a connection creation to when a warning is 1034 * generated if the connection is not properly started via 1035 * {@link Connection#start()} and a message is received by a consumer. It is 1036 * a very common gotcha to forget to <a 1037 * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start 1038 * the connection</a> so this option makes the default case to create a 1039 * warning if the user forgets. To disable the warning just set the value to < 1040 * 0 (say -1). 1041 */ 1042 public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) { 1043 this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout; 1044 } 1045 1046 public TransportListener getTransportListener() { 1047 return transportListener; 1048 } 1049 1050 /** 1051 * Allows a listener to be configured on the ConnectionFactory so that when this factory is used 1052 * with frameworks which don't expose the Connection such as Spring JmsTemplate, you can still register 1053 * a transport listener. 1054 * 1055 * @param transportListener sets the listener to be registered on all connections 1056 * created by this factory 1057 */ 1058 public void setTransportListener(TransportListener transportListener) { 1059 this.transportListener = transportListener; 1060 } 1061 1062 1063 public ExceptionListener getExceptionListener() { 1064 return exceptionListener; 1065 } 1066 1067 /** 1068 * Allows an {@link ExceptionListener} to be configured on the ConnectionFactory so that when this factory 1069 * is used by frameworks which don't expose the Connection such as Spring JmsTemplate, you can register 1070 * an exception listener. 
1071 * <p> Note: access to this exceptionLinstener will <b>not</b> be serialized if it is associated with more than 1072 * on connection (as it will be if more than one connection is subsequently created by this connection factory) 1073 * @param exceptionListener sets the exception listener to be registered on all connections 1074 * created by this factory 1075 */ 1076 public void setExceptionListener(ExceptionListener exceptionListener) { 1077 this.exceptionListener = exceptionListener; 1078 } 1079 1080 public int getAuditDepth() { 1081 return auditDepth; 1082 } 1083 1084 public void setAuditDepth(int auditDepth) { 1085 this.auditDepth = auditDepth; 1086 } 1087 1088 public int getAuditMaximumProducerNumber() { 1089 return auditMaximumProducerNumber; 1090 } 1091 1092 public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) { 1093 this.auditMaximumProducerNumber = auditMaximumProducerNumber; 1094 } 1095 1096 public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) { 1097 this.useDedicatedTaskRunner = useDedicatedTaskRunner; 1098 } 1099 1100 public boolean isUseDedicatedTaskRunner() { 1101 return useDedicatedTaskRunner; 1102 } 1103 1104 public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) { 1105 this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod; 1106 } 1107 1108 public long getConsumerFailoverRedeliveryWaitPeriod() { 1109 return consumerFailoverRedeliveryWaitPeriod; 1110 } 1111 1112 public ClientInternalExceptionListener getClientInternalExceptionListener() { 1113 return clientInternalExceptionListener; 1114 } 1115 1116 /** 1117 * Allows an {@link ClientInternalExceptionListener} to be configured on the ConnectionFactory so that when this factory 1118 * is used by frameworks which don't expose the Connection such as Spring JmsTemplate, you can register 1119 * an exception listener. 
1120 * <p> Note: access to this clientInternalExceptionListener will <b>not</b> be serialized if it is associated with more than 1121 * on connection (as it will be if more than one connection is subsequently created by this connection factory) 1122 * @param clientInternalExceptionListener sets the exception listener to be registered on all connections 1123 * created by this factory 1124 */ 1125 public void setClientInternalExceptionListener( 1126 ClientInternalExceptionListener clientInternalExceptionListener) { 1127 this.clientInternalExceptionListener = clientInternalExceptionListener; 1128 } 1129 1130 /** 1131 * @return the checkForDuplicates 1132 */ 1133 public boolean isCheckForDuplicates() { 1134 return this.checkForDuplicates; 1135 } 1136 1137 /** 1138 * @param checkForDuplicates the checkForDuplicates to set 1139 */ 1140 public void setCheckForDuplicates(boolean checkForDuplicates) { 1141 this.checkForDuplicates = checkForDuplicates; 1142 } 1143 1144 public boolean isTransactedIndividualAck() { 1145 return transactedIndividualAck; 1146 } 1147 1148 /** 1149 * when true, submit individual transacted acks immediately rather than with transaction completion. 1150 * This allows the acks to represent delivery status which can be persisted on rollback 1151 * Used in conjunction with org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter#setRewriteOnRedelivery(boolean) true 1152 */ 1153 public void setTransactedIndividualAck(boolean transactedIndividualAck) { 1154 this.transactedIndividualAck = transactedIndividualAck; 1155 } 1156 1157 1158 public boolean isNonBlockingRedelivery() { 1159 return nonBlockingRedelivery; 1160 } 1161 1162 /** 1163 * When true a MessageConsumer will not stop Message delivery before re-delivering Messages 1164 * from a rolled back transaction. This implies that message order will not be preserved and 1165 * also will result in the TransactedIndividualAck option to be enabled. 
1166 */ 1167 public void setNonBlockingRedelivery(boolean nonBlockingRedelivery) { 1168 this.nonBlockingRedelivery = nonBlockingRedelivery; 1169 } 1170 1171 public int getMaxThreadPoolSize() { 1172 return maxThreadPoolSize; 1173 } 1174 1175 public void setMaxThreadPoolSize(int maxThreadPoolSize) { 1176 this.maxThreadPoolSize = maxThreadPoolSize; 1177 } 1178 1179 public TaskRunnerFactory getSessionTaskRunner() { 1180 return sessionTaskRunner; 1181 } 1182 1183 public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) { 1184 this.sessionTaskRunner = sessionTaskRunner; 1185 } 1186 1187 public RejectedExecutionHandler getRejectedTaskHandler() { 1188 return rejectedTaskHandler; 1189 } 1190 1191 public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) { 1192 this.rejectedTaskHandler = rejectedTaskHandler; 1193 } 1194 1195 /** 1196 * Gets the configured time interval that is used to force all MessageConsumers that have optimizedAcknowledge enabled 1197 * to send an ack for any outstanding Message Acks. By default this value is set to zero meaning that the consumers 1198 * will not do any background Message acknowledgment. 1199 * 1200 * @return the scheduledOptimizedAckInterval 1201 */ 1202 public long getOptimizedAckScheduledAckInterval() { 1203 return optimizedAckScheduledAckInterval; 1204 } 1205 1206 /** 1207 * Sets the amount of time between scheduled sends of any outstanding Message Acks for consumers that 1208 * have been configured with optimizeAcknowledge enabled. 
*
     * @param optimizedAckScheduledAckInterval the scheduledOptimizedAckInterval to set
     */
    public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) {
        this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
    }

    /**
     * @return the current value of the rmIdFromConnectionId flag
     */
    public boolean isRmIdFromConnectionId() {
        return this.rmIdFromConnectionId;
    }

    /**
     * Uses the connection id as the resource identity for XAResource.isSameRM,
     * ensuring join will only occur on a single connection.
     *
     * @param rmIdFromConnectionId the new value of the rmIdFromConnectionId flag
     */
    public void setRmIdFromConnectionId(boolean rmIdFromConnectionId) {
        this.rmIdFromConnectionId = rmIdFromConnectionId;
    }

    /**
     * @return true if MessageConsumer instance will check for expired messages before dispatch.
     */
    public boolean isConsumerExpiryCheckEnabled() {
        return this.consumerExpiryCheckEnabled;
    }

    /**
     * Controls whether message expiration checking is done in each MessageConsumer
     * prior to dispatching a message. Disabling this check can lead to consumption
     * of expired messages.
     *
     * @param consumerExpiryCheckEnabled
     *        controls whether expiration checking is done prior to dispatch.
     */
    public void setConsumerExpiryCheckEnabled(boolean consumerExpiryCheckEnabled) {
        this.consumerExpiryCheckEnabled = consumerExpiryCheckEnabled;
    }
}