001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.tool;
018
019import java.io.BufferedReader;
020import java.io.File;
021import java.io.FileNotFoundException;
022import java.io.FileReader;
023import java.io.IOException;
024import java.util.Arrays;
025import java.util.Set;
026
027import javax.jms.ConnectionFactory;
028import javax.jms.DeliveryMode;
029import javax.jms.Destination;
030import javax.jms.JMSException;
031import javax.jms.MessageProducer;
032import javax.jms.TextMessage;
033
034import org.apache.activemq.command.ActiveMQDestination;
035import org.apache.activemq.tool.properties.JmsClientProperties;
036import org.apache.activemq.tool.properties.JmsProducerProperties;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040public class JmsProducerClient extends AbstractJmsMeasurableClient {
041    private static final Logger LOG = LoggerFactory.getLogger(JmsProducerClient.class);
042
043    protected JmsProducerProperties client;
044    protected MessageProducer jmsProducer;
045    protected TextMessage jmsTextMessage;
046
047    public JmsProducerClient(ConnectionFactory factory) {
048        this(new JmsProducerProperties(), factory);
049    }
050
051    public JmsProducerClient(JmsProducerProperties clientProps, ConnectionFactory factory) {
052        super(factory);
053        this.client = clientProps;
054    }
055
056    public void sendMessages() throws JMSException {
057        // Send a specific number of messages
058        if (client.getSendType().equalsIgnoreCase(JmsProducerProperties.COUNT_BASED_SENDING)) {
059            long sendCount = client.getSendCount();
060            sendCountBasedMessages(sendCount);
061        // Send messages for a specific duration
062        } else {
063            long sendDuration = client.getSendDuration();
064            sendTimeBasedMessages(sendDuration);
065        }
066    }
067
068    public void sendMessages(int destCount) throws JMSException {
069        this.destCount = destCount;
070        sendMessages();
071    }
072
073    public void sendMessages(int destIndex, int destCount) throws JMSException {
074        this.destIndex = destIndex;
075        sendMessages(destCount);
076    }
077
078    public void sendCountBasedMessages(long messageCount) throws JMSException {
079        // Parse through different ways to send messages
080        // Avoided putting the condition inside the loop to prevent effect on performance
081        Destination[] dest = createDestinations(destCount);
082
083        // Create a producer, if none is created.
084        if (getJmsProducer() == null) {
085            if (dest.length == 1) {
086                createJmsProducer(dest[0]);
087            } else {
088                createJmsProducer();
089            }
090        }
091        try {
092            getConnection().start();
093            if (client.getMsgFileName() != null) {
094                LOG.info("Starting to publish " +
095                    messageCount +
096                    " messages from file " +
097                    client.getMsgFileName()
098                );
099            } else {
100                LOG.info("Starting to publish " +
101                    messageCount +
102                    " messages of size " +
103                    client.getMessageSize() +
104                    " byte(s)."
105                );
106            }
107
108            // Send one type of message only, avoiding the creation of different messages on sending
109            if (!client.isCreateNewMsg()) {
110                // Create only a single message
111                createJmsTextMessage();
112
113                // Send to more than one actual destination
114                if (dest.length > 1) {
115                    for (int i = 0; i < messageCount; i++) {
116                        for (int j = 0; j < dest.length; j++) {
117                            getJmsProducer().send(dest[j], getJmsTextMessage());
118                            incThroughput();
119                            sleep();
120                            commitTxIfNecessary();
121                        }
122                    }
123                    // Send to only one actual destination
124                } else {
125                    for (int i = 0; i < messageCount; i++) {
126                        getJmsProducer().send(getJmsTextMessage());
127                        incThroughput();
128                        sleep();
129                        commitTxIfNecessary();
130                    }
131                }
132
133                // Send different type of messages using indexing to identify each one.
134                // Message size will vary. Definitely slower, since messages properties
135                // will be set individually each send.
136            } else {
137                // Send to more than one actual destination
138                if (dest.length > 1) {
139                    for (int i = 0; i < messageCount; i++) {
140                        for (int j = 0; j < dest.length; j++) {
141                            getJmsProducer().send(dest[j], createJmsTextMessage("Text Message [" + i + "]"));
142                            incThroughput();
143                            sleep();
144                            commitTxIfNecessary();
145                        }
146                    }
147
148                    // Send to only one actual destination
149                } else {
150                    for (int i = 0; i < messageCount; i++) {
151                        getJmsProducer().send(createJmsTextMessage("Text Message [" + i + "]"));
152                        incThroughput();
153                        sleep();
154                        commitTxIfNecessary();
155                    }
156                }
157            }
158        } finally {
159            LOG.info("Finished sending");
160            getConnection().close();
161        }
162    }
163
164    public void sendTimeBasedMessages(long duration) throws JMSException {
165        long endTime = System.currentTimeMillis() + duration;
166        // Parse through different ways to send messages
167        // Avoided putting the condition inside the loop to prevent effect on performance
168
169        Destination[] dest = createDestinations(destCount);
170
171        // Create a producer, if none is created.
172        if (getJmsProducer() == null) {
173            if (dest.length == 1) {
174                createJmsProducer(dest[0]);
175            } else {
176                createJmsProducer();
177            }
178        }
179
180        try {
181            getConnection().start();
182            if (client.getMsgFileName() != null) {
183                LOG.info("Starting to publish messages from file " +
184                        client.getMsgFileName() +
185                        " for " +
186                        duration +
187                        " ms");
188            } else {
189                LOG.info("Starting to publish " +
190                        client.getMessageSize() +
191                        " byte(s) messages for " +
192                        duration +
193                        " ms");
194            }
195            // Send one type of message only, avoiding the creation of different messages on sending
196            if (!client.isCreateNewMsg()) {
197                // Create only a single message
198                createJmsTextMessage();
199
200                // Send to more than one actual destination
201                if (dest.length > 1) {
202                    while (System.currentTimeMillis() < endTime) {
203                        for (int j = 0; j < dest.length; j++) {
204                            getJmsProducer().send(dest[j], getJmsTextMessage());
205                            incThroughput();
206                            sleep();
207                            commitTxIfNecessary();
208                        }
209                    }
210                    // Send to only one actual destination
211                } else {
212                    while (System.currentTimeMillis() < endTime) {
213                        getJmsProducer().send(getJmsTextMessage());
214                        incThroughput();
215                        sleep();
216                        commitTxIfNecessary();
217                    }
218                }
219
220                // Send different type of messages using indexing to identify each one.
221                // Message size will vary. Definitely slower, since messages properties
222                // will be set individually each send.
223            } else {
224                // Send to more than one actual destination
225                long count = 1;
226                if (dest.length > 1) {
227                    while (System.currentTimeMillis() < endTime) {
228                        for (int j = 0; j < dest.length; j++) {
229                            getJmsProducer().send(dest[j], createJmsTextMessage("Text Message [" + count++ + "]"));
230                            incThroughput();
231                            sleep();
232                            commitTxIfNecessary();
233                        }
234                    }
235
236                    // Send to only one actual destination
237                } else {
238                    while (System.currentTimeMillis() < endTime) {
239
240                        getJmsProducer().send(createJmsTextMessage("Text Message [" + count++ + "]"));
241                        incThroughput();
242                        sleep();
243                        commitTxIfNecessary();
244                    }
245                }
246            }
247        } finally {
248            LOG.info("Finished sending");
249            getConnection().close();
250        }
251    }
252
253    public MessageProducer createJmsProducer() throws JMSException {
254        jmsProducer = getSession().createProducer(null);
255        if (client.getDeliveryMode().equalsIgnoreCase(JmsProducerProperties.DELIVERY_MODE_PERSISTENT)) {
256            LOG.info("Creating producer to possible multiple destinations with persistent delivery.");
257            jmsProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
258        } else if (client.getDeliveryMode().equalsIgnoreCase(JmsProducerProperties.DELIVERY_MODE_NON_PERSISTENT)) {
259            LOG.info("Creating producer to possible multiple destinations with non-persistent delivery.");
260            jmsProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
261        } else {
262            LOG.warn("Unknown deliveryMode value. Defaulting to non-persistent.");
263            jmsProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
264        }
265        return jmsProducer;
266    }
267
268    public MessageProducer createJmsProducer(Destination dest) throws JMSException {
269        jmsProducer = getSession().createProducer(dest);
270        if (client.getDeliveryMode().equalsIgnoreCase(JmsProducerProperties.DELIVERY_MODE_PERSISTENT)) {
271            LOG.info("Creating producer to: " + dest.toString() + " with persistent delivery.");
272            jmsProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
273        } else if (client.getDeliveryMode().equalsIgnoreCase(JmsProducerProperties.DELIVERY_MODE_NON_PERSISTENT)) {
274            LOG.info("Creating  producer to: " + dest.toString() + " with non-persistent delivery.");
275            jmsProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
276        } else {
277            LOG.warn("Unknown deliveryMode value. Defaulting to non-persistent.");
278            jmsProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
279        }
280        return jmsProducer;
281    }
282
283    public MessageProducer getJmsProducer() {
284        return jmsProducer;
285    }
286
287    public TextMessage createJmsTextMessage() throws JMSException {
288        if (client.getMsgFileName() != null) {
289            return loadJmsMessage();
290        } else {
291          return createJmsTextMessage(client.getMessageSize());
292        }
293    }
294
295    public TextMessage createJmsTextMessage(int size) throws JMSException {
296        jmsTextMessage = getSession().createTextMessage(buildText("", size));
297
298        // support for adding message headers
299        Set<String> headerKeys = this.client.getHeaderKeys();
300        for (String key : headerKeys) {
301            jmsTextMessage.setObjectProperty(key, this.client.getHeaderValue(key));
302        }
303
304        return jmsTextMessage;
305    }
306
307    public TextMessage createJmsTextMessage(String text) throws JMSException {
308        jmsTextMessage = getSession().createTextMessage(buildText(text, client.getMessageSize()));
309        return jmsTextMessage;
310    }
311
312    public TextMessage getJmsTextMessage() {
313        return jmsTextMessage;
314    }
315
316    @Override
317    public JmsClientProperties getClient() {
318        return client;
319    }
320
321    @Override
322    public void setClient(JmsClientProperties clientProps) {
323        client = (JmsProducerProperties)clientProps;
324    }
325
326    @Override
327    protected Destination createTemporaryDestination(String destName) throws JMSException {
328        String simpleName = getSimpleName(destName);
329        byte destinationType = getDestinationType(destName);
330
331        // when we produce to temp destinations, we publish to them as
332        // though they were normal queues or topics
333        if (destinationType == ActiveMQDestination.TEMP_QUEUE_TYPE) {
334            LOG.info("Creating queue: {}", destName);
335            return getSession().createQueue(simpleName);
336        } else if (destinationType == ActiveMQDestination.TEMP_TOPIC_TYPE) {
337            LOG.info("Creating topic: {}", destName);
338            return getSession().createTopic(simpleName);
339        } else {
340            throw new IllegalArgumentException("Unrecognized destination type: " + destinationType);
341        }
342    }
343
344    protected String buildText(String text, int size) {
345        byte[] data = new byte[size - text.length()];
346        Arrays.fill(data, (byte) 0);
347        return text + new String(data);
348    }
349
350    protected void sleep() {
351        if (client.getSendDelay() > 0) {
352            try {
353                LOG.trace("Sleeping for " + client.getSendDelay() + " milliseconds");
354                Thread.sleep(client.getSendDelay());
355            } catch (java.lang.InterruptedException ex) {
356                LOG.warn(ex.getMessage());
357            }
358        }
359    }
360
361    /**
362     * loads the message to be sent from the specified TextFile
363     */
364    protected TextMessage loadJmsMessage() throws JMSException {
365        try {
366            // couple of sanity checks upfront
367            if (client.getMsgFileName() == null) {
368                throw new JMSException("Invalid filename specified.");
369            }
370
371            File f = new File(client.getMsgFileName());
372            if (f.isDirectory()) {
373                throw new JMSException("Cannot load from " +
374                        client.getMsgFileName() +
375                        " as it is a directory not a text file.");
376            }
377
378            // try to load file
379            BufferedReader br = new BufferedReader(new FileReader(f));
380            StringBuffer payload = new StringBuffer();
381            String tmp = null;
382            while ((tmp = br.readLine()) != null) {
383                payload.append(tmp);
384            }
385            br.close();
386            jmsTextMessage = getSession().createTextMessage(payload.toString());
387            return jmsTextMessage;
388        } catch (FileNotFoundException ex) {
389            throw new JMSException(ex.getMessage());
390        } catch (IOException iox) {
391            throw new JMSException(iox.getMessage());
392        }
393    }
394}