001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.net.URI;
020import java.net.URISyntaxException;
021import java.security.AccessController;
022import java.security.PrivilegedAction;
023import java.util.HashMap;
024import java.util.Map;
025import java.util.Properties;
026import java.util.concurrent.RejectedExecutionHandler;
027
028import javax.jms.Connection;
029import javax.jms.ConnectionFactory;
030import javax.jms.ExceptionListener;
031import javax.jms.JMSException;
032import javax.jms.QueueConnection;
033import javax.jms.QueueConnectionFactory;
034import javax.jms.TopicConnection;
035import javax.jms.TopicConnectionFactory;
036import javax.naming.Context;
037
038import org.apache.activemq.blob.BlobTransferPolicy;
039import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
040import org.apache.activemq.jndi.JNDIBaseStorable;
041import org.apache.activemq.management.JMSStatsImpl;
042import org.apache.activemq.management.StatsCapable;
043import org.apache.activemq.management.StatsImpl;
044import org.apache.activemq.thread.TaskRunnerFactory;
045import org.apache.activemq.transport.Transport;
046import org.apache.activemq.transport.TransportFactory;
047import org.apache.activemq.transport.TransportListener;
048import org.apache.activemq.util.IdGenerator;
049import org.apache.activemq.util.IntrospectionSupport;
050import org.apache.activemq.util.JMSExceptionSupport;
051import org.apache.activemq.util.URISupport;
052import org.apache.activemq.util.URISupport.CompositeData;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055
056/**
 * A ConnectionFactory is an Administered object, and is used for creating
058 * Connections. <p/> This class also implements QueueConnectionFactory and
059 * TopicConnectionFactory. You can use this connection to create both
060 * QueueConnections and TopicConnections.
061 *
062 *
063 * @see javax.jms.ConnectionFactory
064 */
065public class ActiveMQConnectionFactory extends JNDIBaseStorable implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, StatsCapable, Cloneable {
066    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnectionFactory.class);
067    private static final String DEFAULT_BROKER_HOST;
068    private static final int DEFAULT_BROKER_PORT;
069    static{
070        String host = null;
071        String port = null;
072        try {
073             host = AccessController.doPrivileged(new PrivilegedAction<String>() {
074                 @Override
075                 public String run() {
076                     String result = System.getProperty("org.apache.activemq.AMQ_HOST");
077                     result = (result==null||result.isEmpty()) ?  System.getProperty("AMQ_HOST","localhost") : result;
078                     return result;
079                 }
080             });
081             port = AccessController.doPrivileged(new PrivilegedAction<String>() {
082                 @Override
083                 public String run() {
084                     String result = System.getProperty("org.apache.activemq.AMQ_PORT");
085                     result = (result==null||result.isEmpty()) ?  System.getProperty("AMQ_PORT","61616") : result;
086                     return result;
087                 }
088             });
089        }catch(Throwable e){
090            LOG.debug("Failed to look up System properties for host and port",e);
091        }
092        host = (host == null || host.isEmpty()) ? "localhost" : host;
093        port = (port == null || port.isEmpty()) ? "61616" : port;
094        DEFAULT_BROKER_HOST = host;
095        DEFAULT_BROKER_PORT = Integer.parseInt(port);
096    }
097
098
099    public static final String DEFAULT_BROKER_BIND_URL;
100
101    static{
102        final String defaultURL = "tcp://" + DEFAULT_BROKER_HOST + ":" + DEFAULT_BROKER_PORT;
103        String bindURL = null;
104
105        try {
106            bindURL = AccessController.doPrivileged(new PrivilegedAction<String>() {
107                @Override
108                public String run() {
109                    String result = System.getProperty("org.apache.activemq.BROKER_BIND_URL");
110                    result = (result==null||result.isEmpty()) ?  System.getProperty("BROKER_BIND_URL",defaultURL) : result;
111                    return result;
112                }
113            });
114        }catch(Throwable e){
115            LOG.debug("Failed to look up System properties for host and port",e);
116        }
117        bindURL = (bindURL == null || bindURL.isEmpty()) ? defaultURL : bindURL;
118        DEFAULT_BROKER_BIND_URL = bindURL;
119    }
120
121    public static final String DEFAULT_BROKER_URL = "failover://"+DEFAULT_BROKER_BIND_URL;
122    public static final String DEFAULT_USER = null;
123    public static final String DEFAULT_PASSWORD = null;
124    public static final int DEFAULT_PRODUCER_WINDOW_SIZE = 0;
125
126    protected URI brokerURL;
127    protected String userName;
128    protected String password;
129    protected String clientID;
130    protected boolean dispatchAsync=true;
131    protected boolean alwaysSessionAsync=true;
132
133    JMSStatsImpl factoryStats = new JMSStatsImpl();
134
135    private IdGenerator clientIdGenerator;
136    private String clientIDPrefix;
137    private IdGenerator connectionIdGenerator;
138    private String connectionIDPrefix;
139
140    // client policies
141    private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
142    private RedeliveryPolicyMap redeliveryPolicyMap = new RedeliveryPolicyMap();
143    {
144        redeliveryPolicyMap.setDefaultEntry(new RedeliveryPolicy());
145    }
146    private BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy();
147    private MessageTransformer transformer;
148
149    private boolean disableTimeStampsByDefault;
150    private boolean optimizedMessageDispatch = true;
151    private long optimizeAcknowledgeTimeOut = 300;
152    private long optimizedAckScheduledAckInterval = 0;
153    private boolean copyMessageOnSend = true;
154    private boolean useCompression;
155    private boolean objectMessageSerializationDefered;
156    private boolean useAsyncSend;
157    private boolean optimizeAcknowledge;
158    private int closeTimeout = 15000;
159    private boolean useRetroactiveConsumer;
160    private boolean exclusiveConsumer;
161    private boolean nestedMapAndListEnabled = true;
162    private boolean alwaysSyncSend;
163    private boolean watchTopicAdvisories = true;
164    private int producerWindowSize = DEFAULT_PRODUCER_WINDOW_SIZE;
165    private long warnAboutUnstartedConnectionTimeout = 500L;
166    private int sendTimeout = 0;
167    private boolean sendAcksAsync=true;
168    private TransportListener transportListener;
169    private ExceptionListener exceptionListener;
170    private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE;
171    private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT;
172    private boolean useDedicatedTaskRunner;
173    private long consumerFailoverRedeliveryWaitPeriod = 0;
174    private boolean checkForDuplicates = true;
175    private ClientInternalExceptionListener clientInternalExceptionListener;
176    private boolean messagePrioritySupported = false;
177    private boolean transactedIndividualAck = false;
178    private boolean nonBlockingRedelivery = false;
179    private int maxThreadPoolSize = ActiveMQConnection.DEFAULT_THREAD_POOL_SIZE;
180    private TaskRunnerFactory sessionTaskRunner;
181    private RejectedExecutionHandler rejectedTaskHandler = null;
182    protected int xaAckMode = -1; // ensure default init before setting via brokerUrl introspection in sub class
183    private boolean rmIdFromConnectionId = false;
184    private boolean consumerExpiryCheckEnabled = true;
185
186    // /////////////////////////////////////////////
187    //
188    // ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory Methods
189    //
190    // /////////////////////////////////////////////
191
192    public ActiveMQConnectionFactory() {
193        this(DEFAULT_BROKER_URL);
194    }
195
196    public ActiveMQConnectionFactory(String brokerURL) {
197        this(createURI(brokerURL));
198    }
199
200    public ActiveMQConnectionFactory(URI brokerURL) {
201        setBrokerURL(brokerURL.toString());
202    }
203
204    public ActiveMQConnectionFactory(String userName, String password, URI brokerURL) {
205        setUserName(userName);
206        setPassword(password);
207        setBrokerURL(brokerURL.toString());
208    }
209
210    public ActiveMQConnectionFactory(String userName, String password, String brokerURL) {
211        setUserName(userName);
212        setPassword(password);
213        setBrokerURL(brokerURL);
214    }
215
216    /**
217     * Returns a copy of the given connection factory
218     */
219    public ActiveMQConnectionFactory copy() {
220        try {
221            return (ActiveMQConnectionFactory)super.clone();
222        } catch (CloneNotSupportedException e) {
223            throw new RuntimeException("This should never happen: " + e, e);
224        }
225    }
226
227    /*boolean*
228     * @param brokerURL
229     * @return
230     * @throws URISyntaxException
231     */
232    private static URI createURI(String brokerURL) {
233        try {
234            return new URI(brokerURL);
235        } catch (URISyntaxException e) {
236            throw (IllegalArgumentException)new IllegalArgumentException("Invalid broker URI: " + brokerURL).initCause(e);
237        }
238    }
239
240    /**
241     * @return Returns the Connection.
242     */
243    @Override
244    public Connection createConnection() throws JMSException {
245        return createActiveMQConnection();
246    }
247
248    /**
249     * @return Returns the Connection.
250     */
251    @Override
252    public Connection createConnection(String userName, String password) throws JMSException {
253        return createActiveMQConnection(userName, password);
254    }
255
256    /**
257     * @return Returns the QueueConnection.
258     * @throws JMSException
259     */
260    @Override
261    public QueueConnection createQueueConnection() throws JMSException {
262        return createActiveMQConnection().enforceQueueOnlyConnection();
263    }
264
265    /**
266     * @return Returns the QueueConnection.
267     */
268    @Override
269    public QueueConnection createQueueConnection(String userName, String password) throws JMSException {
270        return createActiveMQConnection(userName, password).enforceQueueOnlyConnection();
271    }
272
273    /**
274     * @return Returns the TopicConnection.
275     * @throws JMSException
276     */
277    @Override
278    public TopicConnection createTopicConnection() throws JMSException {
279        return createActiveMQConnection();
280    }
281
282    /**
283     * @return Returns the TopicConnection.
284     */
285    @Override
286    public TopicConnection createTopicConnection(String userName, String password) throws JMSException {
287        return createActiveMQConnection(userName, password);
288    }
289
290    /**
291     * @returns the StatsImpl associated with this ConnectionFactory.
292     */
293    @Override
294    public StatsImpl getStats() {
295        return this.factoryStats;
296    }
297
298    // /////////////////////////////////////////////
299    //
300    // Implementation methods.
301    //
302    // /////////////////////////////////////////////
303
304    protected ActiveMQConnection createActiveMQConnection() throws JMSException {
305        return createActiveMQConnection(userName, password);
306    }
307
308    /**
309     * Creates a Transport based on this object's connection settings. Separated
310     * from createActiveMQConnection to allow for subclasses to override.
311     *
312     * @return The newly created Transport.
313     * @throws JMSException If unable to create trasnport.
314     */
315    protected Transport createTransport() throws JMSException {
316        try {
317            return TransportFactory.connect(brokerURL);
318        } catch (Exception e) {
319            throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e);
320        }
321    }
322
323    /**
324     * @return Returns the Connection.
325     */
326    protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException {
327        if (brokerURL == null) {
328            throw new ConfigurationException("brokerURL not set.");
329        }
330        ActiveMQConnection connection = null;
331        try {
332            Transport transport = createTransport();
333            connection = createActiveMQConnection(transport, factoryStats);
334
335            connection.setUserName(userName);
336            connection.setPassword(password);
337
338            configureConnection(connection);
339
340            transport.start();
341
342            if (clientID != null) {
343                connection.setDefaultClientID(clientID);
344            }
345
346            return connection;
347        } catch (JMSException e) {
348            // Clean up!
349            try {
350                connection.close();
351            } catch (Throwable ignore) {
352            }
353            throw e;
354        } catch (Exception e) {
355            // Clean up!
356            try {
357                connection.close();
358            } catch (Throwable ignore) {
359            }
360            throw JMSExceptionSupport.create("Could not connect to broker URL: " + brokerURL + ". Reason: " + e, e);
361        }
362    }
363
364    protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception {
365        ActiveMQConnection connection = new ActiveMQConnection(transport, getClientIdGenerator(),
366                getConnectionIdGenerator(), stats);
367        return connection;
368    }
369
370    protected void configureConnection(ActiveMQConnection connection) throws JMSException {
371        connection.setPrefetchPolicy(getPrefetchPolicy());
372        connection.setDisableTimeStampsByDefault(isDisableTimeStampsByDefault());
373        connection.setOptimizedMessageDispatch(isOptimizedMessageDispatch());
374        connection.setCopyMessageOnSend(isCopyMessageOnSend());
375        connection.setUseCompression(isUseCompression());
376        connection.setObjectMessageSerializationDefered(isObjectMessageSerializationDefered());
377        connection.setDispatchAsync(isDispatchAsync());
378        connection.setUseAsyncSend(isUseAsyncSend());
379        connection.setAlwaysSyncSend(isAlwaysSyncSend());
380        connection.setAlwaysSessionAsync(isAlwaysSessionAsync());
381        connection.setOptimizeAcknowledge(isOptimizeAcknowledge());
382        connection.setOptimizeAcknowledgeTimeOut(getOptimizeAcknowledgeTimeOut());
383        connection.setOptimizedAckScheduledAckInterval(getOptimizedAckScheduledAckInterval());
384        connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer());
385        connection.setExclusiveConsumer(isExclusiveConsumer());
386        connection.setRedeliveryPolicyMap(getRedeliveryPolicyMap());
387        connection.setTransformer(getTransformer());
388        connection.setBlobTransferPolicy(getBlobTransferPolicy().copy());
389        connection.setWatchTopicAdvisories(isWatchTopicAdvisories());
390        connection.setProducerWindowSize(getProducerWindowSize());
391        connection.setWarnAboutUnstartedConnectionTimeout(getWarnAboutUnstartedConnectionTimeout());
392        connection.setSendTimeout(getSendTimeout());
393        connection.setCloseTimeout(getCloseTimeout());
394        connection.setSendAcksAsync(isSendAcksAsync());
395        connection.setAuditDepth(getAuditDepth());
396        connection.setAuditMaximumProducerNumber(getAuditMaximumProducerNumber());
397        connection.setUseDedicatedTaskRunner(isUseDedicatedTaskRunner());
398        connection.setConsumerFailoverRedeliveryWaitPeriod(getConsumerFailoverRedeliveryWaitPeriod());
399        connection.setCheckForDuplicates(isCheckForDuplicates());
400        connection.setMessagePrioritySupported(isMessagePrioritySupported());
401        connection.setTransactedIndividualAck(isTransactedIndividualAck());
402        connection.setNonBlockingRedelivery(isNonBlockingRedelivery());
403        connection.setMaxThreadPoolSize(getMaxThreadPoolSize());
404        connection.setSessionTaskRunner(getSessionTaskRunner());
405        connection.setRejectedTaskHandler(getRejectedTaskHandler());
406        connection.setNestedMapAndListEnabled(isNestedMapAndListEnabled());
407        connection.setRmIdFromConnectionId(isRmIdFromConnectionId());
408        connection.setConsumerExpiryCheckEnabled(isConsumerExpiryCheckEnabled());
409        if (transportListener != null) {
410            connection.addTransportListener(transportListener);
411        }
412        if (exceptionListener != null) {
413            connection.setExceptionListener(exceptionListener);
414        }
415        if (clientInternalExceptionListener != null) {
416            connection.setClientInternalExceptionListener(clientInternalExceptionListener);
417        }
418    }
419
420    // /////////////////////////////////////////////
421    //
422    // Property Accessors
423    //
424    // /////////////////////////////////////////////
425
426    public String getBrokerURL() {
427        return brokerURL == null ? null : brokerURL.toString();
428    }
429
430    /**
431     * Sets the <a
432     * href="http://activemq.apache.org/configuring-transports.html">connection
433     * URL</a> used to connect to the ActiveMQ broker.
434     */
435    public void setBrokerURL(String brokerURL) {
436        this.brokerURL = createURI(brokerURL);
437
438        // Use all the properties prefixed with 'jms.' to set the connection
439        // factory
440        // options.
441        if (this.brokerURL.getQuery() != null) {
442            // It might be a standard URI or...
443            try {
444
445                Map<String,String> map = URISupport.parseQuery(this.brokerURL.getQuery());
446                Map<String,Object> jmsOptionsMap = IntrospectionSupport.extractProperties(map, "jms.");
447                if (buildFromMap(jmsOptionsMap)) {
448                    if (!jmsOptionsMap.isEmpty()) {
449                        String msg = "There are " + jmsOptionsMap.size()
450                            + " jms options that couldn't be set on the ConnectionFactory."
451                            + " Check the options are spelled correctly."
452                            + " Unknown parameters=[" + jmsOptionsMap + "]."
453                            + " This connection factory cannot be started.";
454                        throw new IllegalArgumentException(msg);
455                    }
456
457                    this.brokerURL = URISupport.createRemainingURI(this.brokerURL, map);
458                }
459
460            } catch (URISyntaxException e) {
461            }
462
463        } else {
464
465            // It might be a composite URI.
466            try {
467                CompositeData data = URISupport.parseComposite(this.brokerURL);
468                Map<String,Object> jmsOptionsMap = IntrospectionSupport.extractProperties(data.getParameters(), "jms.");
469                if (buildFromMap(jmsOptionsMap)) {
470                    if (!jmsOptionsMap.isEmpty()) {
471                        String msg = "There are " + jmsOptionsMap.size()
472                            + " jms options that couldn't be set on the ConnectionFactory."
473                            + " Check the options are spelled correctly."
474                            + " Unknown parameters=[" + jmsOptionsMap + "]."
475                            + " This connection factory cannot be started.";
476                        throw new IllegalArgumentException(msg);
477                    }
478
479                    this.brokerURL = data.toURI();
480                }
481            } catch (URISyntaxException e) {
482            }
483        }
484    }
485
486    public String getClientID() {
487        return clientID;
488    }
489
490    /**
491     * Sets the JMS clientID to use for the created connection. Note that this
492     * can only be used by one connection at once so generally its a better idea
493     * to set the clientID on a Connection
494     */
495    public void setClientID(String clientID) {
496        this.clientID = clientID;
497    }
498
499    public boolean isCopyMessageOnSend() {
500        return copyMessageOnSend;
501    }
502
503    /**
504     * Should a JMS message be copied to a new JMS Message object as part of the
505     * send() method in JMS. This is enabled by default to be compliant with the
506     * JMS specification. You can disable it if you do not mutate JMS messages
507     * after they are sent for a performance boost
508     */
509    public void setCopyMessageOnSend(boolean copyMessageOnSend) {
510        this.copyMessageOnSend = copyMessageOnSend;
511    }
512
513    public boolean isDisableTimeStampsByDefault() {
514        return disableTimeStampsByDefault;
515    }
516
517    /**
518     * Sets whether or not timestamps on messages should be disabled or not. If
519     * you disable them it adds a small performance boost.
520     */
521    public void setDisableTimeStampsByDefault(boolean disableTimeStampsByDefault) {
522        this.disableTimeStampsByDefault = disableTimeStampsByDefault;
523    }
524
525    public boolean isOptimizedMessageDispatch() {
526        return optimizedMessageDispatch;
527    }
528
529    /**
530     * If this flag is set then an larger prefetch limit is used - only
531     * applicable for durable topic subscribers.
532     */
533    public void setOptimizedMessageDispatch(boolean optimizedMessageDispatch) {
534        this.optimizedMessageDispatch = optimizedMessageDispatch;
535    }
536
537    public String getPassword() {
538        return password;
539    }
540
541    /**
542     * Sets the JMS password used for connections created from this factory
543     */
544    public void setPassword(String password) {
545        this.password = password;
546    }
547
548    public ActiveMQPrefetchPolicy getPrefetchPolicy() {
549        return prefetchPolicy;
550    }
551
552    /**
553     * Sets the <a
554     * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch
555     * policy</a> for consumers created by this connection.
556     */
557    public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
558        this.prefetchPolicy = prefetchPolicy;
559    }
560
561    public boolean isUseAsyncSend() {
562        return useAsyncSend;
563    }
564
565    public BlobTransferPolicy getBlobTransferPolicy() {
566        return blobTransferPolicy;
567    }
568
569    /**
570     * Sets the policy used to describe how out-of-band BLOBs (Binary Large
571     * OBjects) are transferred from producers to brokers to consumers
572     */
573    public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
574        this.blobTransferPolicy = blobTransferPolicy;
575    }
576
577    /**
578     * Forces the use of <a
579     * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which
580     * adds a massive performance boost; but means that the send() method will
581     * return immediately whether the message has been sent or not which could
582     * lead to message loss.
583     */
584    public void setUseAsyncSend(boolean useAsyncSend) {
585        this.useAsyncSend = useAsyncSend;
586    }
587
588    public synchronized boolean isWatchTopicAdvisories() {
589        return watchTopicAdvisories;
590    }
591
592    public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
593        this.watchTopicAdvisories = watchTopicAdvisories;
594    }
595
596    /**
597     * @return true if always sync send messages
598     */
599    public boolean isAlwaysSyncSend() {
600        return this.alwaysSyncSend;
601    }
602
603    /**
604     * Set true if always require messages to be sync sent
605     *
606     * @param alwaysSyncSend
607     */
608    public void setAlwaysSyncSend(boolean alwaysSyncSend) {
609        this.alwaysSyncSend = alwaysSyncSend;
610    }
611
612    public String getUserName() {
613        return userName;
614    }
615
616    /**
617     * Sets the JMS userName used by connections created by this factory
618     */
619    public void setUserName(String userName) {
620        this.userName = userName;
621    }
622
623    public boolean isUseRetroactiveConsumer() {
624        return useRetroactiveConsumer;
625    }
626
627    /**
628     * Sets whether or not retroactive consumers are enabled. Retroactive
629     * consumers allow non-durable topic subscribers to receive old messages
630     * that were published before the non-durable subscriber started.
631     */
632    public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) {
633        this.useRetroactiveConsumer = useRetroactiveConsumer;
634    }
635
636    public boolean isExclusiveConsumer() {
637        return exclusiveConsumer;
638    }
639
640    /**
641     * Enables or disables whether or not queue consumers should be exclusive or
642     * not for example to preserve ordering when not using <a
643     * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
644     *
645     * @param exclusiveConsumer
646     */
647    public void setExclusiveConsumer(boolean exclusiveConsumer) {
648        this.exclusiveConsumer = exclusiveConsumer;
649    }
650
651    public RedeliveryPolicy getRedeliveryPolicy() {
652        return redeliveryPolicyMap.getDefaultEntry();
653    }
654
655    /**
656     * Sets the global default redelivery policy to be used when a message is delivered
657     * but the session is rolled back
658     */
659    public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
660        this.redeliveryPolicyMap.setDefaultEntry(redeliveryPolicy);
661    }
662
663    public RedeliveryPolicyMap getRedeliveryPolicyMap() {
664        return this.redeliveryPolicyMap;
665    }
666
667    /**
668     * Sets the global redelivery policy mapping to be used when a message is delivered
669     * but the session is rolled back
670     */
671    public void setRedeliveryPolicyMap(RedeliveryPolicyMap redeliveryPolicyMap) {
672        this.redeliveryPolicyMap = redeliveryPolicyMap;
673    }
674
675    public MessageTransformer getTransformer() {
676        return transformer;
677    }
678
679    /**
680     * @return the sendTimeout (in milliseconds)
681     */
682    public int getSendTimeout() {
683        return sendTimeout;
684    }
685
686    /**
687     * @param sendTimeout the sendTimeout to set (in milliseconds)
688     */
689    public void setSendTimeout(int sendTimeout) {
690        this.sendTimeout = sendTimeout;
691    }
692
693    /**
694     * @return the sendAcksAsync
695     */
696    public boolean isSendAcksAsync() {
697        return sendAcksAsync;
698    }
699
700    /**
701     * @param sendAcksAsync the sendAcksAsync to set
702     */
703    public void setSendAcksAsync(boolean sendAcksAsync) {
704        this.sendAcksAsync = sendAcksAsync;
705    }
706
707    /**
708     * @return the messagePrioritySupported
709     */
710    public boolean isMessagePrioritySupported() {
711        return this.messagePrioritySupported;
712    }
713
714    /**
715     * @param messagePrioritySupported the messagePrioritySupported to set
716     */
717    public void setMessagePrioritySupported(boolean messagePrioritySupported) {
718        this.messagePrioritySupported = messagePrioritySupported;
719    }
720
721
722    /**
723     * Sets the transformer used to transform messages before they are sent on
724     * to the JMS bus or when they are received from the bus but before they are
725     * delivered to the JMS client
726     */
727    public void setTransformer(MessageTransformer transformer) {
728        this.transformer = transformer;
729    }
730
731    @SuppressWarnings({ "unchecked", "rawtypes" })
732    @Override
733    public void buildFromProperties(Properties properties) {
734
735        if (properties == null) {
736            properties = new Properties();
737        }
738
739        String temp = properties.getProperty(Context.PROVIDER_URL);
740        if (temp == null || temp.length() == 0) {
741            temp = properties.getProperty("brokerURL");
742        }
743        if (temp != null && temp.length() > 0) {
744            setBrokerURL(temp);
745        }
746
747        Map<String, Object> p = new HashMap(properties);
748        buildFromMap(p);
749    }
750
751    public boolean buildFromMap(Map<String, Object> properties) {
752        boolean rc = false;
753
754        ActiveMQPrefetchPolicy p = new ActiveMQPrefetchPolicy();
755        if (IntrospectionSupport.setProperties(p, properties, "prefetchPolicy.")) {
756            setPrefetchPolicy(p);
757            rc = true;
758        }
759
760        RedeliveryPolicy rp = new RedeliveryPolicy();
761        if (IntrospectionSupport.setProperties(rp, properties, "redeliveryPolicy.")) {
762            setRedeliveryPolicy(rp);
763            rc = true;
764        }
765
766        BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy();
767        if (IntrospectionSupport.setProperties(blobTransferPolicy, properties, "blobTransferPolicy.")) {
768            setBlobTransferPolicy(blobTransferPolicy);
769            rc = true;
770        }
771
772        rc |= IntrospectionSupport.setProperties(this, properties);
773
774        return rc;
775    }
776
777    @Override
778    public void populateProperties(Properties props) {
779        props.setProperty("dispatchAsync", Boolean.toString(isDispatchAsync()));
780
781        if (getBrokerURL() != null) {
782            props.setProperty(Context.PROVIDER_URL, getBrokerURL());
783            props.setProperty("brokerURL", getBrokerURL());
784        }
785
786        if (getClientID() != null) {
787            props.setProperty("clientID", getClientID());
788        }
789
790        IntrospectionSupport.getProperties(getPrefetchPolicy(), props, "prefetchPolicy.");
791        IntrospectionSupport.getProperties(getRedeliveryPolicy(), props, "redeliveryPolicy.");
792        IntrospectionSupport.getProperties(getBlobTransferPolicy(), props, "blobTransferPolicy.");
793
794        props.setProperty("copyMessageOnSend", Boolean.toString(isCopyMessageOnSend()));
795        props.setProperty("disableTimeStampsByDefault", Boolean.toString(isDisableTimeStampsByDefault()));
796        props.setProperty("objectMessageSerializationDefered", Boolean.toString(isObjectMessageSerializationDefered()));
797        props.setProperty("optimizedMessageDispatch", Boolean.toString(isOptimizedMessageDispatch()));
798
799        if (getPassword() != null) {
800            props.setProperty("password", getPassword());
801        }
802
803        props.setProperty("useAsyncSend", Boolean.toString(isUseAsyncSend()));
804        props.setProperty("useCompression", Boolean.toString(isUseCompression()));
805        props.setProperty("useRetroactiveConsumer", Boolean.toString(isUseRetroactiveConsumer()));
806        props.setProperty("watchTopicAdvisories", Boolean.toString(isWatchTopicAdvisories()));
807
808        if (getUserName() != null) {
809            props.setProperty("userName", getUserName());
810        }
811
812        props.setProperty("closeTimeout", Integer.toString(getCloseTimeout()));
813        props.setProperty("alwaysSessionAsync", Boolean.toString(isAlwaysSessionAsync()));
814        props.setProperty("optimizeAcknowledge", Boolean.toString(isOptimizeAcknowledge()));
815        props.setProperty("statsEnabled", Boolean.toString(isStatsEnabled()));
816        props.setProperty("alwaysSyncSend", Boolean.toString(isAlwaysSyncSend()));
817        props.setProperty("producerWindowSize", Integer.toString(getProducerWindowSize()));
818        props.setProperty("sendTimeout", Integer.toString(getSendTimeout()));
819        props.setProperty("sendAcksAsync",Boolean.toString(isSendAcksAsync()));
820        props.setProperty("auditDepth", Integer.toString(getAuditDepth()));
821        props.setProperty("auditMaximumProducerNumber", Integer.toString(getAuditMaximumProducerNumber()));
822        props.setProperty("checkForDuplicates", Boolean.toString(isCheckForDuplicates()));
823        props.setProperty("messagePrioritySupported", Boolean.toString(isMessagePrioritySupported()));
824        props.setProperty("transactedIndividualAck", Boolean.toString(isTransactedIndividualAck()));
825        props.setProperty("nonBlockingRedelivery", Boolean.toString(isNonBlockingRedelivery()));
826        props.setProperty("maxThreadPoolSize", Integer.toString(getMaxThreadPoolSize()));
827        props.setProperty("nestedMapAndListEnabled", Boolean.toString(isNestedMapAndListEnabled()));
828        props.setProperty("consumerFailoverRedeliveryWaitPeriod", Long.toString(getConsumerFailoverRedeliveryWaitPeriod()));
829        props.setProperty("rmIdFromConnectionId", Boolean.toString(isRmIdFromConnectionId()));
830        props.setProperty("consumerExpiryCheckEnabled", Boolean.toString(isConsumerExpiryCheckEnabled()));
831    }
832
833    public boolean isUseCompression() {
834        return useCompression;
835    }
836
837    /**
838     * Enables the use of compression of the message bodies
839     */
840    public void setUseCompression(boolean useCompression) {
841        this.useCompression = useCompression;
842    }
843
844    public boolean isObjectMessageSerializationDefered() {
845        return objectMessageSerializationDefered;
846    }
847
848    /**
849     * When an object is set on an ObjectMessage, the JMS spec requires the
850     * object to be serialized by that set method. Enabling this flag causes the
851     * object to not get serialized. The object may subsequently get serialized
852     * if the message needs to be sent over a socket or stored to disk.
853     */
854    public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) {
855        this.objectMessageSerializationDefered = objectMessageSerializationDefered;
856    }
857
858    public boolean isDispatchAsync() {
859        return dispatchAsync;
860    }
861
862    /**
863     * Enables or disables the default setting of whether or not consumers have
864     * their messages <a
865     * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched
866     * synchronously or asynchronously by the broker</a>. For non-durable
867     * topics for example we typically dispatch synchronously by default to
868     * minimize context switches which boost performance. However sometimes its
869     * better to go slower to ensure that a single blocked consumer socket does
870     * not block delivery to other consumers.
871     *
872     * @param asyncDispatch If true then consumers created on this connection
873     *                will default to having their messages dispatched
874     *                asynchronously. The default value is true.
875     */
876    public void setDispatchAsync(boolean asyncDispatch) {
877        this.dispatchAsync = asyncDispatch;
878    }
879
880    /**
881     * @return Returns the closeTimeout.
882     */
883    public int getCloseTimeout() {
884        return closeTimeout;
885    }
886
887    /**
888     * Sets the timeout before a close is considered complete. Normally a
889     * close() on a connection waits for confirmation from the broker; this
890     * allows that operation to timeout to save the client hanging if there is
891     * no broker
892     */
893    public void setCloseTimeout(int closeTimeout) {
894        this.closeTimeout = closeTimeout;
895    }
896
897    /**
898     * @return Returns the alwaysSessionAsync.
899     */
900    public boolean isAlwaysSessionAsync() {
901        return alwaysSessionAsync;
902    }
903
904    /**
905     * If this flag is not set then a separate thread is not used for dispatching messages for each Session in
906     * the Connection. However, a separate thread is always used if there is more than one session, or the session
907     * isn't in auto acknowledge or duplicates ok mode.  By default this value is set to true and session dispatch
908     * happens asynchronously.
909     */
910    public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
911        this.alwaysSessionAsync = alwaysSessionAsync;
912    }
913
914    /**
915     * @return Returns the optimizeAcknowledge.
916     */
917    public boolean isOptimizeAcknowledge() {
918        return optimizeAcknowledge;
919    }
920
921    /**
922     * @param optimizeAcknowledge The optimizeAcknowledge to set.
923     */
924    public void setOptimizeAcknowledge(boolean optimizeAcknowledge) {
925        this.optimizeAcknowledge = optimizeAcknowledge;
926    }
927
928    /**
929     * The max time in milliseconds between optimized ack batches
930     * @param optimizeAcknowledgeTimeOut
931     */
932    public void setOptimizeAcknowledgeTimeOut(long optimizeAcknowledgeTimeOut) {
933        this.optimizeAcknowledgeTimeOut =  optimizeAcknowledgeTimeOut;
934    }
935
936    public long getOptimizeAcknowledgeTimeOut() {
937        return optimizeAcknowledgeTimeOut;
938    }
939
940    public boolean isNestedMapAndListEnabled() {
941        return nestedMapAndListEnabled;
942    }
943
944    /**
945     * Enables/disables whether or not Message properties and MapMessage entries
946     * support <a
947     * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested
948     * Structures</a> of Map and List objects
949     */
950    public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
951        this.nestedMapAndListEnabled = structuredMapsEnabled;
952    }
953
954    public String getClientIDPrefix() {
955        return clientIDPrefix;
956    }
957
958    /**
959     * Sets the prefix used by autogenerated JMS Client ID values which are used
960     * if the JMS client does not explicitly specify on.
961     *
962     * @param clientIDPrefix
963     */
964    public void setClientIDPrefix(String clientIDPrefix) {
965        this.clientIDPrefix = clientIDPrefix;
966    }
967
968    protected synchronized IdGenerator getClientIdGenerator() {
969        if (clientIdGenerator == null) {
970            if (clientIDPrefix != null) {
971                clientIdGenerator = new IdGenerator(clientIDPrefix);
972            } else {
973                clientIdGenerator = new IdGenerator();
974            }
975        }
976        return clientIdGenerator;
977    }
978
979    protected void setClientIdGenerator(IdGenerator clientIdGenerator) {
980        this.clientIdGenerator = clientIdGenerator;
981    }
982
983    /**
984     * Sets the prefix used by connection id generator
985     * @param connectionIDPrefix
986     */
987    public void setConnectionIDPrefix(String connectionIDPrefix) {
988        this.connectionIDPrefix = connectionIDPrefix;
989    }
990
991    protected synchronized IdGenerator getConnectionIdGenerator() {
992        if (connectionIdGenerator == null) {
993            if (connectionIDPrefix != null) {
994                connectionIdGenerator = new IdGenerator(connectionIDPrefix);
995            } else {
996                connectionIdGenerator = new IdGenerator();
997            }
998        }
999        return connectionIdGenerator;
1000    }
1001
1002    protected void setConnectionIdGenerator(IdGenerator connectionIdGenerator) {
1003        this.connectionIdGenerator = connectionIdGenerator;
1004    }
1005
1006    /**
1007     * @return the statsEnabled
1008     */
1009    public boolean isStatsEnabled() {
1010        return this.factoryStats.isEnabled();
1011    }
1012
1013    /**
1014     * @param statsEnabled the statsEnabled to set
1015     */
1016    public void setStatsEnabled(boolean statsEnabled) {
1017        this.factoryStats.setEnabled(statsEnabled);
1018    }
1019
1020    public synchronized int getProducerWindowSize() {
1021        return producerWindowSize;
1022    }
1023
1024    public synchronized void setProducerWindowSize(int producerWindowSize) {
1025        this.producerWindowSize = producerWindowSize;
1026    }
1027
1028    public long getWarnAboutUnstartedConnectionTimeout() {
1029        return warnAboutUnstartedConnectionTimeout;
1030    }
1031
1032    /**
1033     * Enables the timeout from a connection creation to when a warning is
1034     * generated if the connection is not properly started via
1035     * {@link Connection#start()} and a message is received by a consumer. It is
1036     * a very common gotcha to forget to <a
1037     * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start
1038     * the connection</a> so this option makes the default case to create a
1039     * warning if the user forgets. To disable the warning just set the value to <
1040     * 0 (say -1).
1041     */
1042    public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
1043        this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
1044    }
1045
1046    public TransportListener getTransportListener() {
1047        return transportListener;
1048    }
1049
1050    /**
1051     * Allows a listener to be configured on the ConnectionFactory so that when this factory is used
1052     * with frameworks which don't expose the Connection such as Spring JmsTemplate, you can still register
1053     * a transport listener.
1054     *
1055     * @param transportListener sets the listener to be registered on all connections
1056     * created by this factory
1057     */
1058    public void setTransportListener(TransportListener transportListener) {
1059        this.transportListener = transportListener;
1060    }
1061
1062
1063    public ExceptionListener getExceptionListener() {
1064        return exceptionListener;
1065    }
1066
1067    /**
1068     * Allows an {@link ExceptionListener} to be configured on the ConnectionFactory so that when this factory
1069     * is used by frameworks which don't expose the Connection such as Spring JmsTemplate, you can register
1070     * an exception listener.
1071     * <p> Note: access to this exceptionLinstener will <b>not</b> be serialized if it is associated with more than
1072     * on connection (as it will be if more than one connection is subsequently created by this connection factory)
1073     * @param exceptionListener sets the exception listener to be registered on all connections
1074     * created by this factory
1075     */
1076    public void setExceptionListener(ExceptionListener exceptionListener) {
1077        this.exceptionListener = exceptionListener;
1078    }
1079
1080    public int getAuditDepth() {
1081        return auditDepth;
1082    }
1083
1084    public void setAuditDepth(int auditDepth) {
1085        this.auditDepth = auditDepth;
1086    }
1087
1088    public int getAuditMaximumProducerNumber() {
1089        return auditMaximumProducerNumber;
1090    }
1091
1092    public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
1093        this.auditMaximumProducerNumber = auditMaximumProducerNumber;
1094    }
1095
1096    public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
1097        this.useDedicatedTaskRunner = useDedicatedTaskRunner;
1098    }
1099
1100    public boolean isUseDedicatedTaskRunner() {
1101        return useDedicatedTaskRunner;
1102    }
1103
1104    public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) {
1105        this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
1106    }
1107
1108    public long getConsumerFailoverRedeliveryWaitPeriod() {
1109        return consumerFailoverRedeliveryWaitPeriod;
1110    }
1111
1112    public ClientInternalExceptionListener getClientInternalExceptionListener() {
1113        return clientInternalExceptionListener;
1114    }
1115
1116    /**
1117     * Allows an {@link ClientInternalExceptionListener} to be configured on the ConnectionFactory so that when this factory
1118     * is used by frameworks which don't expose the Connection such as Spring JmsTemplate, you can register
1119     * an exception listener.
1120     * <p> Note: access to this clientInternalExceptionListener will <b>not</b> be serialized if it is associated with more than
1121     * on connection (as it will be if more than one connection is subsequently created by this connection factory)
1122     * @param clientInternalExceptionListener sets the exception listener to be registered on all connections
1123     * created by this factory
1124     */
1125    public void setClientInternalExceptionListener(
1126            ClientInternalExceptionListener clientInternalExceptionListener) {
1127        this.clientInternalExceptionListener = clientInternalExceptionListener;
1128    }
1129
1130    /**
1131     * @return the checkForDuplicates
1132     */
1133    public boolean isCheckForDuplicates() {
1134        return this.checkForDuplicates;
1135    }
1136
1137    /**
1138     * @param checkForDuplicates the checkForDuplicates to set
1139     */
1140    public void setCheckForDuplicates(boolean checkForDuplicates) {
1141        this.checkForDuplicates = checkForDuplicates;
1142    }
1143
1144    public boolean isTransactedIndividualAck() {
1145         return transactedIndividualAck;
1146     }
1147
1148     /**
1149      * when true, submit individual transacted acks immediately rather than with transaction completion.
1150      * This allows the acks to represent delivery status which can be persisted on rollback
1151      * Used in conjunction with org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter#setRewriteOnRedelivery(boolean)  true
1152      */
1153     public void setTransactedIndividualAck(boolean transactedIndividualAck) {
1154         this.transactedIndividualAck = transactedIndividualAck;
1155     }
1156
1157
1158     public boolean isNonBlockingRedelivery() {
1159         return nonBlockingRedelivery;
1160     }
1161
1162     /**
1163      * When true a MessageConsumer will not stop Message delivery before re-delivering Messages
1164      * from a rolled back transaction.  This implies that message order will not be preserved and
1165      * also will result in the TransactedIndividualAck option to be enabled.
1166      */
1167     public void setNonBlockingRedelivery(boolean nonBlockingRedelivery) {
1168         this.nonBlockingRedelivery = nonBlockingRedelivery;
1169     }
1170
1171    public int getMaxThreadPoolSize() {
1172        return maxThreadPoolSize;
1173    }
1174
1175    public void setMaxThreadPoolSize(int maxThreadPoolSize) {
1176        this.maxThreadPoolSize = maxThreadPoolSize;
1177    }
1178
1179    public TaskRunnerFactory getSessionTaskRunner() {
1180        return sessionTaskRunner;
1181    }
1182
1183    public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
1184        this.sessionTaskRunner = sessionTaskRunner;
1185    }
1186
1187    public RejectedExecutionHandler getRejectedTaskHandler() {
1188        return rejectedTaskHandler;
1189    }
1190
1191    public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) {
1192        this.rejectedTaskHandler = rejectedTaskHandler;
1193    }
1194
1195    /**
1196     * Gets the configured time interval that is used to force all MessageConsumers that have optimizedAcknowledge enabled
1197     * to send an ack for any outstanding Message Acks.  By default this value is set to zero meaning that the consumers
1198     * will not do any background Message acknowledgment.
1199     *
1200     * @return the scheduledOptimizedAckInterval
1201     */
1202    public long getOptimizedAckScheduledAckInterval() {
1203        return optimizedAckScheduledAckInterval;
1204    }
1205
1206    /**
1207     * Sets the amount of time between scheduled sends of any outstanding Message Acks for consumers that
1208     * have been configured with optimizeAcknowledge enabled.
1209     *
1210     * @param optimizedAckScheduledAckInterval the scheduledOptimizedAckInterval to set
1211     */
1212    public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) {
1213        this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
1214    }
1215
1216
1217    public boolean isRmIdFromConnectionId() {
1218        return rmIdFromConnectionId;
1219    }
1220
1221    /**
1222     * uses the connection id as the resource identity for XAResource.isSameRM
1223     * ensuring join will only occur on a single connection
1224     */
1225    public void setRmIdFromConnectionId(boolean rmIdFromConnectionId) {
1226        this.rmIdFromConnectionId = rmIdFromConnectionId;
1227    }
1228
1229    /**
1230     * @return true if MessageConsumer instance will check for expired messages before dispatch.
1231     */
1232    public boolean isConsumerExpiryCheckEnabled() {
1233        return consumerExpiryCheckEnabled;
1234    }
1235
1236    /**
1237     * Controls whether message expiration checking is done in each MessageConsumer
1238     * prior to dispatching a message.  Disabling this check can lead to consumption
1239     * of expired messages.
1240     *
1241     * @param consumerExpiryCheckEnabled
1242     *        controls whether expiration checking is done prior to dispatch.
1243     */
1244    public void setConsumerExpiryCheckEnabled(boolean consumerExpiryCheckEnabled) {
1245        this.consumerExpiryCheckEnabled = consumerExpiryCheckEnabled;
1246    }
1247}