001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.tool; 018 019import java.util.concurrent.atomic.AtomicInteger; 020 021import javax.jms.ConnectionFactory; 022import javax.jms.Destination; 023import javax.jms.JMSException; 024import javax.jms.Message; 025import javax.jms.MessageConsumer; 026import javax.jms.MessageListener; 027import javax.jms.Topic; 028 029import org.apache.activemq.command.ActiveMQDestination; 030import org.apache.activemq.tool.properties.JmsClientProperties; 031import org.apache.activemq.tool.properties.JmsConsumerProperties; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035public class JmsConsumerClient extends AbstractJmsMeasurableClient { 036 private static final Logger LOG = LoggerFactory.getLogger(JmsConsumerClient.class); 037 038 protected MessageConsumer jmsConsumer; 039 protected JmsConsumerProperties client; 040 041 public JmsConsumerClient(ConnectionFactory factory) { 042 this(new JmsConsumerProperties(), factory); 043 } 044 045 public JmsConsumerClient(JmsConsumerProperties clientProps, ConnectionFactory factory) { 046 super(factory); 047 client = clientProps; 048 } 049 050 public void receiveMessages() throws JMSException { 051 if (client.isAsyncRecv()) { 052 if (client.getRecvType().equalsIgnoreCase(JmsConsumerProperties.TIME_BASED_RECEIVING)) { 053 receiveAsyncTimeBasedMessages(client.getRecvDuration()); 054 } else { 055 receiveAsyncCountBasedMessages(client.getRecvCount()); 056 } 057 } else { 058 if (client.getRecvType().equalsIgnoreCase(JmsConsumerProperties.TIME_BASED_RECEIVING)) { 059 receiveSyncTimeBasedMessages(client.getRecvDuration()); 060 } else { 061 receiveSyncCountBasedMessages(client.getRecvCount()); 062 } 063 } 064 } 065 066 public void receiveMessages(int destCount) throws JMSException { 067 this.destCount = destCount; 068 receiveMessages(); 069 } 070 071 public void receiveMessages(int destIndex, int destCount) throws JMSException { 072 this.destIndex = destIndex; 073 receiveMessages(destCount); 074 } 075 076 public void receiveSyncTimeBasedMessages(long duration) throws JMSException { 077 if (getJmsConsumer() == null) { 078 createJmsConsumer(); 079 } 080 081 try { 082 getConnection().start(); 083 084 LOG.info("Starting to synchronously receive messages for " + duration + " ms..."); 085 long endTime = System.currentTimeMillis() + duration; 086 087 while (System.currentTimeMillis() < endTime) { 088 getJmsConsumer().receive(); 089 incThroughput(); 090 sleep(); 091 commitTxIfNecessary(); 092 } 093 } finally { 094 if (client.isDurable() && client.isUnsubscribe()) { 095 LOG.info("Unsubscribing durable subscriber: " + getClientName()); 096 getJmsConsumer().close(); 097 getSession().unsubscribe(getClientName()); 098 } 099 getConnection().close(); 100 } 101 } 102 103 public void receiveSyncCountBasedMessages(long count) throws JMSException { 104 if (getJmsConsumer() == null) { 105 createJmsConsumer(); 106 } 107 108 try { 109 getConnection().start(); 110 LOG.info("Starting to synchronously receive " + count + " messages..."); 111 112 int recvCount = 0; 113 while (recvCount < count) { 114 getJmsConsumer().receive(); 115 incThroughput(); 116 recvCount++; 117 sleep(); 118 commitTxIfNecessary(); 119 } 120 } finally { 121 if (client.isDurable() && client.isUnsubscribe()) { 122 LOG.info("Unsubscribing durable subscriber: " + getClientName()); 123 getJmsConsumer().close(); 124 getSession().unsubscribe(getClientName()); 125 } 126 getConnection().close(); 127 } 128 } 129 130 public void receiveAsyncTimeBasedMessages(long duration) throws JMSException { 131 if (getJmsConsumer() == null) { 132 createJmsConsumer(); 133 } 134 135 getJmsConsumer().setMessageListener(new MessageListener() { 136 @Override 137 public void onMessage(Message msg) { 138 incThroughput(); 139 sleep(); 140 try { 141 commitTxIfNecessary(); 142 } catch (JMSException ex) { 143 LOG.error("Error committing transaction: " + ex.getMessage()); 144 } 145 } 146 }); 147 148 try { 149 getConnection().start(); 150 LOG.info("Starting to asynchronously receive messages for " + duration + " ms..."); 151 try { 152 Thread.sleep(duration); 153 } catch (InterruptedException e) { 154 throw new JMSException("JMS consumer thread sleep has been interrupted. Message: " + e.getMessage()); 155 } 156 } finally { 157 if (client.isDurable() && client.isUnsubscribe()) { 158 LOG.info("Unsubscribing durable subscriber: " + getClientName()); 159 getJmsConsumer().close(); 160 getSession().unsubscribe(getClientName()); 161 } 162 getConnection().close(); 163 } 164 } 165 166 public void receiveAsyncCountBasedMessages(long count) throws JMSException { 167 if (getJmsConsumer() == null) { 168 createJmsConsumer(); 169 } 170 171 final AtomicInteger recvCount = new AtomicInteger(0); 172 getJmsConsumer().setMessageListener(new MessageListener() { 173 @Override 174 public void onMessage(Message msg) { 175 incThroughput(); 176 recvCount.incrementAndGet(); 177 synchronized (recvCount) { 178 recvCount.notify(); 179 } 180 181 try { 182 commitTxIfNecessary(); 183 } catch (JMSException ex) { 184 LOG.error("Error committing transaction: " + ex.getMessage()); 185 } 186 } 187 }); 188 189 try { 190 getConnection().start(); 191 LOG.info("Starting to asynchronously receive " + client.getRecvCount() + " messages..."); 192 try { 193 while (recvCount.get() < count) { 194 synchronized (recvCount) { 195 recvCount.wait(); 196 } 197 } 198 } catch (InterruptedException e) { 199 throw new JMSException("JMS consumer thread wait has been interrupted. Message: " + e.getMessage()); 200 } 201 } finally { 202 if (client.isDurable() && client.isUnsubscribe()) { 203 LOG.info("Unsubscribing durable subscriber: " + getClientName()); 204 getJmsConsumer().close(); 205 getSession().unsubscribe(getClientName()); 206 } 207 getConnection().close(); 208 } 209 } 210 211 public MessageConsumer createJmsConsumer() throws JMSException { 212 Destination[] dest = createDestinations(destCount); 213 214 Destination consumedDestination = dest[0]; 215 if (dest.length > 1) { 216 String destinationName = ((ActiveMQDestination) consumedDestination).getPhysicalName(); 217 LOG.warn("Multiple destinations requested for consumer; using only first: {}", destinationName); 218 } 219 220 if (this.client.getMessageSelector() == null) { 221 return createJmsConsumer(consumedDestination); 222 } else { 223 return createJmsConsumer(consumedDestination, this.client.getMessageSelector(), false); 224 } 225 } 226 227 public MessageConsumer createJmsConsumer(Destination dest) throws JMSException { 228 if (client.isDurable()) { 229 String clientName = getClientName(); 230 if (clientName == null) { 231 clientName = "JmsConsumer"; 232 setClientName(clientName); 233 } 234 LOG.info("Creating durable subscriber (" + clientName + ") to: " + dest.toString()); 235 jmsConsumer = getSession().createDurableSubscriber((Topic) dest, clientName); 236 } else { 237 LOG.info("Creating non-durable consumer to: " + dest.toString()); 238 jmsConsumer = getSession().createConsumer(dest); 239 } 240 return jmsConsumer; 241 } 242 243 public MessageConsumer createJmsConsumer(Destination dest, String selector, boolean noLocal) throws JMSException { 244 if (client.isDurable()) { 245 String clientName = getClientName(); 246 if (clientName == null) { 247 clientName = "JmsConsumer"; 248 setClientName(clientName); 249 } 250 LOG.info("Creating durable subscriber (" + clientName + ") to: " + dest.toString()); 251 jmsConsumer = getSession().createDurableSubscriber((Topic) dest, clientName, selector, noLocal); 252 } else { 253 LOG.info("Creating non-durable consumer to: " + dest.toString()); 254 jmsConsumer = getSession().createConsumer(dest, selector, noLocal); 255 } 256 return jmsConsumer; 257 } 258 259 public MessageConsumer getJmsConsumer() { 260 return jmsConsumer; 261 } 262 263 @Override 264 public JmsClientProperties getClient() { 265 return client; 266 } 267 268 @Override 269 public void setClient(JmsClientProperties clientProps) { 270 client = (JmsConsumerProperties)clientProps; 271 } 272 273 /** 274 * A way to throttle the consumer. Time to sleep is 275 * configured via recvDelay property. 276 */ 277 protected void sleep() { 278 if (client.getRecvDelay() > 0) { 279 try { 280 LOG.trace("Sleeping for " + client.getRecvDelay() + " milliseconds"); 281 Thread.sleep(client.getRecvDelay()); 282 } catch (java.lang.InterruptedException ex) { 283 LOG.warn(ex.getMessage()); 284 } 285 } 286 } 287}