001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.tool;
018
019import java.util.concurrent.atomic.AtomicInteger;
020
021import javax.jms.ConnectionFactory;
022import javax.jms.Destination;
023import javax.jms.JMSException;
024import javax.jms.Message;
025import javax.jms.MessageConsumer;
026import javax.jms.MessageListener;
027import javax.jms.Topic;
028
029import org.apache.activemq.command.ActiveMQDestination;
030import org.apache.activemq.tool.properties.JmsClientProperties;
031import org.apache.activemq.tool.properties.JmsConsumerProperties;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035public class JmsConsumerClient extends AbstractJmsMeasurableClient {
036    private static final Logger LOG = LoggerFactory.getLogger(JmsConsumerClient.class);
037
038    protected MessageConsumer jmsConsumer;
039    protected JmsConsumerProperties client;
040
041    public JmsConsumerClient(ConnectionFactory factory) {
042        this(new JmsConsumerProperties(), factory);
043    }
044
045    public JmsConsumerClient(JmsConsumerProperties clientProps, ConnectionFactory factory) {
046        super(factory);
047        client = clientProps;
048    }
049
050    public void receiveMessages() throws JMSException {
051        if (client.isAsyncRecv()) {
052            if (client.getRecvType().equalsIgnoreCase(JmsConsumerProperties.TIME_BASED_RECEIVING)) {
053                receiveAsyncTimeBasedMessages(client.getRecvDuration());
054            } else {
055                receiveAsyncCountBasedMessages(client.getRecvCount());
056            }
057        } else {
058            if (client.getRecvType().equalsIgnoreCase(JmsConsumerProperties.TIME_BASED_RECEIVING)) {
059                receiveSyncTimeBasedMessages(client.getRecvDuration());
060            } else {
061                receiveSyncCountBasedMessages(client.getRecvCount());
062            }
063        }
064    }
065
066    public void receiveMessages(int destCount) throws JMSException {
067        this.destCount = destCount;
068        receiveMessages();
069    }
070
071    public void receiveMessages(int destIndex, int destCount) throws JMSException {
072        this.destIndex = destIndex;
073        receiveMessages(destCount);
074    }
075
076    public void receiveSyncTimeBasedMessages(long duration) throws JMSException {
077        if (getJmsConsumer() == null) {
078            createJmsConsumer();
079        }
080
081        try {
082            getConnection().start();
083
084            LOG.info("Starting to synchronously receive messages for " + duration + " ms...");
085            long endTime = System.currentTimeMillis() + duration;
086
087            while (System.currentTimeMillis() < endTime) {
088                getJmsConsumer().receive();
089                incThroughput();
090                sleep();
091                commitTxIfNecessary();
092            }
093        } finally {
094            if (client.isDurable() && client.isUnsubscribe()) {
095                LOG.info("Unsubscribing durable subscriber: " + getClientName());
096                getJmsConsumer().close();
097                getSession().unsubscribe(getClientName());
098            }
099            getConnection().close();
100        }
101    }
102
103    public void receiveSyncCountBasedMessages(long count) throws JMSException {
104        if (getJmsConsumer() == null) {
105            createJmsConsumer();
106        }
107
108        try {
109            getConnection().start();
110            LOG.info("Starting to synchronously receive " + count + " messages...");
111
112            int recvCount = 0;
113            while (recvCount < count) {
114                getJmsConsumer().receive();
115                incThroughput();
116                recvCount++;
117                sleep();
118                commitTxIfNecessary();
119            }
120        } finally {
121            if (client.isDurable() && client.isUnsubscribe()) {
122                LOG.info("Unsubscribing durable subscriber: " + getClientName());
123                getJmsConsumer().close();
124                getSession().unsubscribe(getClientName());
125            }
126            getConnection().close();
127        }
128    }
129
130    public void receiveAsyncTimeBasedMessages(long duration) throws JMSException {
131        if (getJmsConsumer() == null) {
132            createJmsConsumer();
133        }
134
135        getJmsConsumer().setMessageListener(new MessageListener() {
136            @Override
137            public void onMessage(Message msg) {
138                incThroughput();
139                sleep();
140                try {
141                    commitTxIfNecessary();
142                } catch (JMSException ex) {
143                    LOG.error("Error committing transaction: " + ex.getMessage());
144                }
145            }
146        });
147
148        try {
149            getConnection().start();
150            LOG.info("Starting to asynchronously receive messages for " + duration + " ms...");
151            try {
152                Thread.sleep(duration);
153            } catch (InterruptedException e) {
154                throw new JMSException("JMS consumer thread sleep has been interrupted. Message: " + e.getMessage());
155            }
156        } finally {
157            if (client.isDurable() && client.isUnsubscribe()) {
158                LOG.info("Unsubscribing durable subscriber: " + getClientName());
159                getJmsConsumer().close();
160                getSession().unsubscribe(getClientName());
161            }
162            getConnection().close();
163        }
164    }
165
166    public void receiveAsyncCountBasedMessages(long count) throws JMSException {
167        if (getJmsConsumer() == null) {
168            createJmsConsumer();
169        }
170
171        final AtomicInteger recvCount = new AtomicInteger(0);
172        getJmsConsumer().setMessageListener(new MessageListener() {
173            @Override
174            public void onMessage(Message msg) {
175                incThroughput();
176                recvCount.incrementAndGet();
177                synchronized (recvCount) {
178                    recvCount.notify();
179                }
180
181                try {
182                    commitTxIfNecessary();
183                } catch (JMSException ex) {
184                    LOG.error("Error committing transaction: " + ex.getMessage());
185                }
186            }
187        });
188
189        try {
190            getConnection().start();
191            LOG.info("Starting to asynchronously receive " + client.getRecvCount() + " messages...");
192            try {
193                while (recvCount.get() < count) {
194                    synchronized (recvCount) {
195                        recvCount.wait();
196                    }
197                }
198            } catch (InterruptedException e) {
199                throw new JMSException("JMS consumer thread wait has been interrupted. Message: " + e.getMessage());
200            }
201        } finally {
202            if (client.isDurable() && client.isUnsubscribe()) {
203                LOG.info("Unsubscribing durable subscriber: " + getClientName());
204                getJmsConsumer().close();
205                getSession().unsubscribe(getClientName());
206            }
207            getConnection().close();
208        }
209    }
210
211    public MessageConsumer createJmsConsumer() throws JMSException {
212        Destination[] dest = createDestinations(destCount);
213
214        Destination consumedDestination = dest[0];
215        if (dest.length > 1) {
216            String destinationName = ((ActiveMQDestination) consumedDestination).getPhysicalName();
217            LOG.warn("Multiple destinations requested for consumer; using only first: {}", destinationName);
218        }
219
220        if (this.client.getMessageSelector() == null) {
221            return createJmsConsumer(consumedDestination);
222        } else {
223            return createJmsConsumer(consumedDestination, this.client.getMessageSelector(), false);
224        }
225    }
226
227    public MessageConsumer createJmsConsumer(Destination dest) throws JMSException {
228        if (client.isDurable()) {
229            String clientName = getClientName();
230            if (clientName == null) {
231                clientName = "JmsConsumer";
232                setClientName(clientName);
233            }
234            LOG.info("Creating durable subscriber (" + clientName + ") to: " + dest.toString());
235            jmsConsumer = getSession().createDurableSubscriber((Topic) dest, clientName);
236        } else {
237            LOG.info("Creating non-durable consumer to: " + dest.toString());
238            jmsConsumer = getSession().createConsumer(dest);
239        }
240        return jmsConsumer;
241    }
242
243    public MessageConsumer createJmsConsumer(Destination dest, String selector, boolean noLocal) throws JMSException {
244        if (client.isDurable()) {
245            String clientName = getClientName();
246            if (clientName == null) {
247                clientName = "JmsConsumer";
248                setClientName(clientName);
249            }
250            LOG.info("Creating durable subscriber (" + clientName + ") to: " + dest.toString());
251            jmsConsumer = getSession().createDurableSubscriber((Topic) dest, clientName, selector, noLocal);
252        } else {
253            LOG.info("Creating non-durable consumer to: " + dest.toString());
254            jmsConsumer = getSession().createConsumer(dest, selector, noLocal);
255        }
256        return jmsConsumer;
257    }
258
259    public MessageConsumer getJmsConsumer() {
260        return jmsConsumer;
261    }
262
263    @Override
264    public JmsClientProperties getClient() {
265        return client;
266    }
267
268    @Override
269    public void setClient(JmsClientProperties clientProps) {
270        client = (JmsConsumerProperties)clientProps;
271    }
272
273    /**
274     * A way to throttle the consumer. Time to sleep is
275     * configured via recvDelay property.
276     */
277    protected void sleep() {
278        if (client.getRecvDelay() > 0) {
279            try {
280                LOG.trace("Sleeping for " + client.getRecvDelay() + " milliseconds");
281                Thread.sleep(client.getRecvDelay());
282            } catch (java.lang.InterruptedException ex) {
283                LOG.warn(ex.getMessage());
284            }
285        }
286    }
287}