/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.activemq.jms.pool;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;

import org.apache.commons.pool.KeyedPoolableObjectFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.apache.commons.pool.impl.GenericObjectPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Holds a real JMS connection along with the session pools associated with it.
 * <p/>
 * Instances of this class are shared amongst one or more PooledConnection objects and must
 * track the session objects that are loaned out for cleanup on close as well as ensuring
 * that the temporary destinations of the managed Connection are purged when all references
 * to this ConnectionPool are released.
 */
public class ConnectionPool implements ExceptionListener {
    private static final Logger LOG = LoggerFactory.getLogger(ConnectionPool.class);

    protected Connection connection;
    private int referenceCount;
    private long lastUsed = System.currentTimeMillis();
    private final long firstUsed = lastUsed;
    private boolean hasExpired;
    private int idleTimeout = 30 * 1000;
    private long expiryTimeout = 0L;
    private boolean useAnonymousProducers = true;

    private final AtomicBoolean started = new AtomicBoolean(false);
    private final GenericKeyedObjectPool<SessionKey, SessionHolder> sessionPool;
    private final List<PooledSession> loanedSessions = new CopyOnWriteArrayList<PooledSession>();
    private boolean reconnectOnException;
    private ExceptionListener parentExceptionListener;

    /**
     * Creates a pool around the given connection. The connection is first passed through
     * {@link #wrap(Connection)} and the result is the connection managed from then on.
     *
     * @param connection
     *      the JMS connection whose sessions are pooled by this instance.
     */
    public ConnectionPool(Connection connection) {

        this.connection = wrap(connection);

        // Create our internal Pool of session instances.
        this.sessionPool = new GenericKeyedObjectPool<SessionKey, SessionHolder>(
            new KeyedPoolableObjectFactory<SessionKey, SessionHolder>() {

                @Override
                public void activateObject(SessionKey key, SessionHolder session) throws Exception {
                }

                @Override
                public void destroyObject(SessionKey key, SessionHolder session) throws Exception {
                    session.close();
                }

                @Override
                public SessionHolder makeObject(SessionKey key) throws Exception {
                    return new SessionHolder(makeSession(key));
                }

                @Override
                public void passivateObject(SessionKey key, SessionHolder session) throws Exception {
                }

                @Override
                public boolean validateObject(SessionKey key, SessionHolder session) {
                    return true;
                }
            }
        );
    }

    /**
     * Overrides the expired flag; useful when an external failure needs to force expiry.
     * The flag is acted upon by {@link #expiredCheck()}.
     *
     * @param val
     *      the new value of the expired flag.
     */
    public void setHasExpired(boolean val) {
        hasExpired = val;
    }

    /**
     * Creates a new session on the managed connection for the session pool.
     *
     * @param key
     *      supplies the transacted flag and acknowledge mode of the session to create.
     *
     * @return a newly created session.
     *
     * @throws JMSException if the connection fails to create the session.
     */
    protected Session makeSession(SessionKey key) throws JMSException {
        return connection.createSession(key.isTransacted(), key.getAckMode());
    }

    /**
     * Extension hook applied to the connection handed to the constructor. This
     * implementation returns the connection unchanged.
     *
     * @param connection
     *      the connection passed to the constructor.
     *
     * @return the connection this pool should manage.
     */
    protected Connection wrap(Connection connection) {
        return connection;
    }

    /**
     * Extension hook invoked from {@link #decrementReferenceCount()} once the reference
     * count reaches zero. This implementation does nothing.
     *
     * @param connection
     *      the connection currently managed by this pool.
     */
    protected void unWrap(Connection connection) {
    }

    /**
     * Starts the managed connection the first time this is called. When the start attempt
     * fails the started flag is reset so that a later call attempts the start again.
     *
     * @throws JMSException if the managed connection fails to start.
     */
    public void start() throws JMSException {
        if (started.compareAndSet(false, true)) {
            try {
                connection.start();
            } catch (JMSException e) {
                started.set(false);
                throw e;
            }
        }
    }

    /**
     * @return the managed connection, or null once this pool has been closed.
     */
    public synchronized Connection getConnection() {
        return connection;
    }

    /**
     * Borrows a session from the session pool and hands it out wrapped in a PooledSession.
     * The returned session is tracked as loaned until it reports that it was closed.
     *
     * @param transacted
     *      whether the session should be transacted.
     * @param ackMode
     *      the acknowledge mode of the session.
     *
     * @return a PooledSession backed by a session taken from the pool.
     *
     * @throws JMSException (a javax.jms.IllegalStateException carrying the original cause)
     *         if the session cannot be obtained.
     */
    public Session createSession(boolean transacted, int ackMode) throws JMSException {
        SessionKey key = new SessionKey(transacted, ackMode);
        PooledSession session;
        try {
            session = new PooledSession(key, sessionPool.borrowObject(key), sessionPool, key.isTransacted(), useAnonymousProducers);
            session.addSessionEventListener(new PooledSessionEventListener() {

                @Override
                public void onTemporaryTopicCreate(TemporaryTopic tempTopic) {
                }

                @Override
                public void onTemporaryQueueCreate(TemporaryQueue tempQueue) {
                }

                @Override
                public void onSessionClosed(PooledSession session) {
                    ConnectionPool.this.loanedSessions.remove(session);
                }
            });
            this.loanedSessions.add(session);
        } catch (Exception e) {
            IllegalStateException illegalStateException = new IllegalStateException(e.toString());
            illegalStateException.initCause(e);
            throw illegalStateException;
        }
        return session;
    }

    /**
     * Closes the session pool and then the managed connection. Both steps are best effort:
     * a failure in either one is logged and never propagated, and the connection reference
     * is always cleared. Calling this on an already closed pool does nothing.
     */
    public synchronized void close() {
        if (connection != null) {
            try {
                sessionPool.close();
            } catch (Exception e) {
                LOG.debug("Ignoring error while closing the session pool", e);
            } finally {
                try {
                    connection.close();
                } catch (Exception e) {
                    LOG.debug("Ignoring error while closing the pooled connection", e);
                } finally {
                    connection = null;
                }
            }
        }
    }

    /**
     * Registers one more user of this pool and refreshes the last-used timestamp.
     */
    public synchronized void incrementReferenceCount() {
        referenceCount++;
        lastUsed = System.currentTimeMillis();
    }

    /**
     * Releases one user of this pool and refreshes the last-used timestamp. When the last
     * reference is released, all sessions still on loan are closed, {@link #unWrap(Connection)}
     * is invoked and {@link #expiredCheck()} is run.
     */
    public synchronized void decrementReferenceCount() {
        referenceCount--;
        lastUsed = System.currentTimeMillis();
        if (referenceCount == 0) {
            // Loaned sessions are those that are active in the sessionPool and
            // have not been closed by the client before closing the connection.
            // These need to be closed so that all sessions reflect the fact
            // that the parent Connection is closed.
            for (PooledSession session : this.loanedSessions) {
                try {
                    session.close();
                } catch (Exception e) {
                    // Best effort: one failing session must not prevent closing the others.
                    LOG.debug("Ignoring error while closing a loaned session", e);
                }
            }
            this.loanedSessions.clear();

            unWrap(getConnection());

            expiredCheck();
        }
    }

    /**
     * Determines if this Connection has expired.
     * <p/>
     * A ConnectionPool is considered expired when all references to it are released AND either
     * the configured idleTimeout has elapsed OR the configured expiryTimeout has elapsed.
     * Once a ConnectionPool is determined to have expired its underlying Connection is closed.
     *
     * @return true if this connection has expired.
     */
    public synchronized boolean expiredCheck() {

        boolean expired = false;

        if (connection == null) {
            return true;
        }

        if (hasExpired) {
            if (referenceCount == 0) {
                close();
                expired = true;
            }
        }

        if (hasElapsed(firstUsed, expiryTimeout)) {
            hasExpired = true;
            if (referenceCount == 0) {
                close();
                expired = true;
            }
        }

        // Only set hasExpired here when there are no references, as a Connection with
        // references is by definition not idle at this time.
        if (referenceCount == 0 && hasElapsed(lastUsed, idleTimeout)) {
            hasExpired = true;
            close();
            expired = true;
        }

        return expired;
    }

    /**
     * Tells whether more than the given timeout has passed since the given instant.
     * Elapsed time is compared rather than a computed deadline so that a very large
     * timeout cannot overflow into the past.
     *
     * @param since
     *      the starting instant in milliseconds, as returned by System.currentTimeMillis().
     * @param timeout
     *      the timeout in milliseconds; a value of zero or less disables the check.
     *
     * @return true if the timeout is enabled and has been exceeded.
     */
    private static boolean hasElapsed(long since, long timeout) {
        return timeout > 0 && System.currentTimeMillis() - since > timeout;
    }

    /**
     * @return the idle timeout in milliseconds used by {@link #expiredCheck()}.
     */
    public int getIdleTimeout() {
        return idleTimeout;
    }

    /**
     * @param idleTimeout
     *      the idle timeout in milliseconds; zero or less disables the idle check
     *      performed by {@link #expiredCheck()}.
     */
    public void setIdleTimeout(int idleTimeout) {
        this.idleTimeout = idleTimeout;
    }

    /**
     * @param expiryTimeout
     *      the time in milliseconds, measured from the creation of this pool, after which
     *      {@link #expiredCheck()} marks it expired; zero or less disables the check.
     */
    public void setExpiryTimeout(long expiryTimeout) {
        this.expiryTimeout = expiryTimeout;
    }

    /**
     * @return the expiry timeout in milliseconds used by {@link #expiredCheck()}.
     */
    public long getExpiryTimeout() {
        return expiryTimeout;
    }

    /**
     * @return the maximum number of active sessions configured on the session pool.
     */
    public int getMaximumActiveSessionPerConnection() {
        return this.sessionPool.getMaxActive();
    }

    /**
     * @param maximumActiveSessionPerConnection
     *      the maximum number of active sessions to configure on the session pool.
     */
    public void setMaximumActiveSessionPerConnection(int maximumActiveSessionPerConnection) {
        this.sessionPool.setMaxActive(maximumActiveSessionPerConnection);
    }

    /**
     * @return the anonymous producers setting handed to every PooledSession created by
     *         {@link #createSession(boolean, int)}.
     */
    public boolean isUseAnonymousProducers() {
        return this.useAnonymousProducers;
    }

    /**
     * @param value
     *      the anonymous producers setting to hand to PooledSession instances created
     *      from now on.
     */
    public void setUseAnonymousProducers(boolean value) {
        this.useAnonymousProducers = value;
    }

    /**
     * @return the total number of Pooled session including idle sessions that are not
     *          currently loaned out to any client.
     */
    public int getNumSessions() {
        return this.sessionPool.getNumIdle() + this.sessionPool.getNumActive();
    }

    /**
     * @return the total number of Sessions that are in the Session pool but not loaned out.
     */
    public int getNumIdleSessions() {
        return this.sessionPool.getNumIdle();
    }

    /**
     * @return the total number of Sessions that have been loaned to PooledConnection instances.
     */
    public int getNumActiveSessions() {
        return this.sessionPool.getNumActive();
    }

    /**
     * Configure whether the createSession method should block when there are no more idle sessions and the
     * pool already contains the maximum number of active sessions.  If false the create method will fail
     * and throw an exception.
     *
     * @param block
     *      Indicates whether blocking should be used to wait for more space to create a session.
     */
    public void setBlockIfSessionPoolIsFull(boolean block) {
        this.sessionPool.setWhenExhaustedAction(
                (block ? GenericObjectPool.WHEN_EXHAUSTED_BLOCK : GenericObjectPool.WHEN_EXHAUSTED_FAIL));
    }

    /**
     * @return true if the session pool is configured to block when it is exhausted.
     * @see #setBlockIfSessionPoolIsFull(boolean)
     */
    public boolean isBlockIfSessionPoolIsFull() {
        return this.sessionPool.getWhenExhaustedAction() == GenericObjectPool.WHEN_EXHAUSTED_BLOCK;
    }

    /**
     * Returns the timeout to use for blocking creating new sessions.
     *
     * @return the maximum wait currently configured on the session pool.
     * @see #setBlockIfSessionPoolIsFull(boolean)
     */
    public long getBlockIfSessionPoolIsFullTimeout() {
        return this.sessionPool.getMaxWait();
    }

    /**
     * Controls the behavior of the internal session pool when it is full and configured
     * to block: this setting is passed to the session pool as its maximum wait.
     *
     * The size of the session pool is controlled by
     * {@link #setMaximumActiveSessionPerConnection(int)}.
     *
     * Whether or not the call to create session blocks is controlled by
     * {@link #setBlockIfSessionPoolIsFull(boolean)}.
     *
     * @param blockIfSessionPoolIsFullTimeout
     *      the maximum wait to configure on the session pool.
     */
    public void setBlockIfSessionPoolIsFullTimeout(long blockIfSessionPoolIsFullTimeout) {
        this.sessionPool.setMaxWait(blockIfSessionPoolIsFullTimeout);
    }

    /**
     * @return true if the underlying connection will be renewed on JMSException, false otherwise
     */
    public boolean isReconnectOnException() {
        return reconnectOnException;
    }

    /**
     * Controls whether the underlying connection should be reset (and renewed) on JMSException.
     * <p/>
     * When enabled this pool installs itself as the connection's ExceptionListener and
     * remembers any listener that was installed before so it can be notified as well. When
     * disabled a remembered listener is put back on the connection. If this pool has already
     * been closed only the flag is updated.
     *
     * @param reconnectOnException
     *          Boolean value that configures whether reconnect on exception should happen
     */
    public void setReconnectOnException(boolean reconnectOnException) {
        this.reconnectOnException = reconnectOnException;

        Connection target = getConnection();
        if (target == null) {
            // Already closed: there is no connection left to (un)install a listener on.
            return;
        }

        try {
            if (isReconnectOnException()) {
                ExceptionListener current = target.getExceptionListener();
                // Never record ourselves as the parent: enabling twice would otherwise make
                // onException() call itself without end.
                if (current != null && current != this) {
                    parentExceptionListener = current;
                }
                target.setExceptionListener(this);
            } else {
                if (parentExceptionListener != null) {
                    target.setExceptionListener(parentExceptionListener);
                }
                parentExceptionListener = null;
            }
        } catch (JMSException jmse) {
            LOG.warn("Cannot set reconnect exception listener", jmse);
        }
    }

    /**
     * Closes this pool when the connection reports a failure, then forwards the exception
     * to the listener that was installed on the connection before this pool, if any.
     *
     * @param exception
     *      the failure reported by the connection.
     */
    @Override
    public void onException(JMSException exception) {
        close();
        if (parentExceptionListener != null) {
            parentExceptionListener.onException(exception);
        }
    }

    @Override
    public String toString() {
        return "ConnectionPool[" + connection + "]";
    }
}