001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.apache.activemq.jms.pool;
019
020import java.util.List;
021import java.util.concurrent.CopyOnWriteArrayList;
022import java.util.concurrent.atomic.AtomicBoolean;
023
024import javax.jms.Connection;
025import javax.jms.ExceptionListener;
026import javax.jms.IllegalStateException;
027import javax.jms.JMSException;
028import javax.jms.Session;
029import javax.jms.TemporaryQueue;
030import javax.jms.TemporaryTopic;
031
032import org.apache.commons.pool.KeyedPoolableObjectFactory;
033import org.apache.commons.pool.impl.GenericKeyedObjectPool;
034import org.apache.commons.pool.impl.GenericObjectPool;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
/**
 * Holds a real JMS connection along with the session pools associated with it.
 * <p/>
 * Instances of this class are shared amongst one or more PooledConnection objects and must
 * track the session objects that are loaned out for cleanup on close, as well as ensuring
 * that the temporary destinations of the managed Connection are purged when all references
 * to this ConnectionPool are released.
 */
046public class ConnectionPool implements ExceptionListener {
047    private static final transient Logger LOG = LoggerFactory.getLogger(ConnectionPool.class);
048
049    protected Connection connection;
050    private int referenceCount;
051    private long lastUsed = System.currentTimeMillis();
052    private final long firstUsed = lastUsed;
053    private boolean hasExpired;
054    private int idleTimeout = 30 * 1000;
055    private long expiryTimeout = 0l;
056    private boolean useAnonymousProducers = true;
057
058    private final AtomicBoolean started = new AtomicBoolean(false);
059    private final GenericKeyedObjectPool<SessionKey, SessionHolder> sessionPool;
060    private final List<PooledSession> loanedSessions = new CopyOnWriteArrayList<PooledSession>();
061    private boolean reconnectOnException;
062    private ExceptionListener parentExceptionListener;
063
064    public ConnectionPool(Connection connection) {
065
066        this.connection = wrap(connection);
067
068        // Create our internal Pool of session instances.
069        this.sessionPool = new GenericKeyedObjectPool<SessionKey, SessionHolder>(
070            new KeyedPoolableObjectFactory<SessionKey, SessionHolder>() {
071
072                @Override
073                public void activateObject(SessionKey key, SessionHolder session) throws Exception {
074                }
075
076                @Override
077                public void destroyObject(SessionKey key, SessionHolder session) throws Exception {
078                    session.close();
079                }
080
081                @Override
082                public SessionHolder makeObject(SessionKey key) throws Exception {
083                    return new SessionHolder(makeSession(key));
084                }
085
086                @Override
087                public void passivateObject(SessionKey key, SessionHolder session) throws Exception {
088                }
089
090                @Override
091                public boolean validateObject(SessionKey key, SessionHolder session) {
092                    return true;
093                }
094            }
095        );
096    }
097
098    // useful when external failure needs to force expiry
099    public void setHasExpired(boolean val) {
100        hasExpired = val;
101    }
102
103    protected Session makeSession(SessionKey key) throws JMSException {
104        return connection.createSession(key.isTransacted(), key.getAckMode());
105    }
106
107    protected Connection wrap(Connection connection) {
108        return connection;
109    }
110
111    protected void unWrap(Connection connection) {
112    }
113
114    public void start() throws JMSException {
115        if (started.compareAndSet(false, true)) {
116            try {
117                connection.start();
118            } catch (JMSException e) {
119                started.set(false);
120                throw(e);
121            }
122        }
123    }
124
125    public synchronized Connection getConnection() {
126        return connection;
127    }
128
129    public Session createSession(boolean transacted, int ackMode) throws JMSException {
130        SessionKey key = new SessionKey(transacted, ackMode);
131        PooledSession session;
132        try {
133            session = new PooledSession(key, sessionPool.borrowObject(key), sessionPool, key.isTransacted(), useAnonymousProducers);
134            session.addSessionEventListener(new PooledSessionEventListener() {
135
136                @Override
137                public void onTemporaryTopicCreate(TemporaryTopic tempTopic) {
138                }
139
140                @Override
141                public void onTemporaryQueueCreate(TemporaryQueue tempQueue) {
142                }
143
144                @Override
145                public void onSessionClosed(PooledSession session) {
146                    ConnectionPool.this.loanedSessions.remove(session);
147                }
148            });
149            this.loanedSessions.add(session);
150        } catch (Exception e) {
151            IllegalStateException illegalStateException = new IllegalStateException(e.toString());
152            illegalStateException.initCause(e);
153            throw illegalStateException;
154        }
155        return session;
156    }
157
158    public synchronized void close() {
159        if (connection != null) {
160            try {
161                sessionPool.close();
162            } catch (Exception e) {
163            } finally {
164                try {
165                    connection.close();
166                } catch (Exception e) {
167                } finally {
168                    connection = null;
169                }
170            }
171        }
172    }
173
174    public synchronized void incrementReferenceCount() {
175        referenceCount++;
176        lastUsed = System.currentTimeMillis();
177    }
178
179    public synchronized void decrementReferenceCount() {
180        referenceCount--;
181        lastUsed = System.currentTimeMillis();
182        if (referenceCount == 0) {
183            // Loaned sessions are those that are active in the sessionPool and
184            // have not been closed by the client before closing the connection.
185            // These need to be closed so that all session's reflect the fact
186            // that the parent Connection is closed.
187            for (PooledSession session : this.loanedSessions) {
188                try {
189                    session.close();
190                } catch (Exception e) {
191                }
192            }
193            this.loanedSessions.clear();
194
195            unWrap(getConnection());
196
197            expiredCheck();
198        }
199    }
200
201    /**
202     * Determines if this Connection has expired.
203     * <p/>
204     * A ConnectionPool is considered expired when all references to it are released AND either
205     * the configured idleTimeout has elapsed OR the configured expiryTimeout has elapsed.
206     * Once a ConnectionPool is determined to have expired its underlying Connection is closed.
207     *
208     * @return true if this connection has expired.
209     */
210    public synchronized boolean expiredCheck() {
211
212        boolean expired = false;
213
214        if (connection == null) {
215            return true;
216        }
217
218        if (hasExpired) {
219            if (referenceCount == 0) {
220                close();
221                expired = true;
222            }
223        }
224
225        if (expiryTimeout > 0 && System.currentTimeMillis() > firstUsed + expiryTimeout) {
226            hasExpired = true;
227            if (referenceCount == 0) {
228                close();
229                expired = true;
230            }
231        }
232
233        // Only set hasExpired here is no references, as a Connection with references is by
234        // definition not idle at this time.
235        if (referenceCount == 0 && idleTimeout > 0 && System.currentTimeMillis() > lastUsed + idleTimeout) {
236            hasExpired = true;
237            close();
238            expired = true;
239        }
240
241        return expired;
242    }
243
244    public int getIdleTimeout() {
245        return idleTimeout;
246    }
247
248    public void setIdleTimeout(int idleTimeout) {
249        this.idleTimeout = idleTimeout;
250    }
251
252    public void setExpiryTimeout(long expiryTimeout) {
253        this.expiryTimeout = expiryTimeout;
254    }
255
256    public long getExpiryTimeout() {
257        return expiryTimeout;
258    }
259
260    public int getMaximumActiveSessionPerConnection() {
261        return this.sessionPool.getMaxActive();
262    }
263
264    public void setMaximumActiveSessionPerConnection(int maximumActiveSessionPerConnection) {
265        this.sessionPool.setMaxActive(maximumActiveSessionPerConnection);
266    }
267
268    public boolean isUseAnonymousProducers() {
269        return this.useAnonymousProducers;
270    }
271
272    public void setUseAnonymousProducers(boolean value) {
273        this.useAnonymousProducers = value;
274    }
275
276    /**
277     * @return the total number of Pooled session including idle sessions that are not
278     *          currently loaned out to any client.
279     */
280    public int getNumSessions() {
281        return this.sessionPool.getNumIdle() + this.sessionPool.getNumActive();
282    }
283
284    /**
285     * @return the total number of Sessions that are in the Session pool but not loaned out.
286     */
287    public int getNumIdleSessions() {
288        return this.sessionPool.getNumIdle();
289    }
290
291    /**
292     * @return the total number of Session's that have been loaned to PooledConnection instances.
293     */
294    public int getNumActiveSessions() {
295        return this.sessionPool.getNumActive();
296    }
297
298    /**
299     * Configure whether the createSession method should block when there are no more idle sessions and the
300     * pool already contains the maximum number of active sessions.  If false the create method will fail
301     * and throw an exception.
302     *
303     * @param block
304     *          Indicates whether blocking should be used to wait for more space to create a session.
305     */
306    public void setBlockIfSessionPoolIsFull(boolean block) {
307        this.sessionPool.setWhenExhaustedAction(
308                (block ? GenericObjectPool.WHEN_EXHAUSTED_BLOCK : GenericObjectPool.WHEN_EXHAUSTED_FAIL));
309    }
310
311    public boolean isBlockIfSessionPoolIsFull() {
312        return this.sessionPool.getWhenExhaustedAction() == GenericObjectPool.WHEN_EXHAUSTED_BLOCK;
313    }
314
315    /**
316     * Returns the timeout to use for blocking creating new sessions
317     *
318     * @return true if the pooled Connection createSession method will block when the limit is hit.
319     * @see #setBlockIfSessionPoolIsFull(boolean)
320     */
321    public long getBlockIfSessionPoolIsFullTimeout() {
322        return this.sessionPool.getMaxWait();
323    }
324
325    /**
326     * Controls the behavior of the internal session pool. By default the call to
327     * Connection.getSession() will block if the session pool is full.  This setting
328     * will affect how long it blocks and throws an exception after the timeout.
329     *
330     * The size of the session pool is controlled by the @see #maximumActive
331     * property.
332     *
333     * Whether or not the call to create session blocks is controlled by the @see #blockIfSessionPoolIsFull
334     * property
335     *
336     * @param blockIfSessionPoolIsFullTimeout - if blockIfSessionPoolIsFullTimeout is true,
337     *                                        then use this setting to configure how long to block before retry
338     */
339    public void setBlockIfSessionPoolIsFullTimeout(long blockIfSessionPoolIsFullTimeout) {
340        this.sessionPool.setMaxWait(blockIfSessionPoolIsFullTimeout);
341    }
342
343    /**
344     * @return true if the underlying connection will be renewed on JMSException, false otherwise
345     */
346    public boolean isReconnectOnException() {
347        return reconnectOnException;
348    }
349
350    /**
351     * Controls weather the underlying connection should be reset (and renewed) on JMSException
352     *
353     * @param reconnectOnException
354     *          Boolean value that configures whether reconnect on exception should happen
355     */
356    public void setReconnectOnException(boolean reconnectOnException) {
357        this.reconnectOnException = reconnectOnException;
358        try {
359            if (isReconnectOnException()) {
360                if (connection.getExceptionListener() != null) {
361                    parentExceptionListener = connection.getExceptionListener();
362                }
363                connection.setExceptionListener(this);
364            } else {
365                if (parentExceptionListener != null) {
366                    connection.setExceptionListener(parentExceptionListener);
367                }
368                parentExceptionListener = null;
369            }
370        } catch (JMSException jmse) {
371            LOG.warn("Cannot set reconnect exception listener", jmse);
372        }
373    }
374
375    @Override
376    public void onException(JMSException exception) {
377        close();
378        if (parentExceptionListener != null) {
379            parentExceptionListener.onException(exception);
380        }
381    }
382
383    @Override
384    public String toString() {
385        return "ConnectionPool[" + connection + "]";
386    }
387}