001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.tool; 018 019import java.io.BufferedReader; 020import java.io.File; 021import java.io.FileNotFoundException; 022import java.io.FileReader; 023import java.io.IOException; 024import java.util.Arrays; 025import java.util.Set; 026 027import javax.jms.ConnectionFactory; 028import javax.jms.DeliveryMode; 029import javax.jms.Destination; 030import javax.jms.JMSException; 031import javax.jms.MessageProducer; 032import javax.jms.TextMessage; 033 034import org.apache.activemq.command.ActiveMQDestination; 035import org.apache.activemq.tool.properties.JmsClientProperties; 036import org.apache.activemq.tool.properties.JmsProducerProperties; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040public class JmsProducerClient extends AbstractJmsMeasurableClient { 041 private static final Logger LOG = LoggerFactory.getLogger(JmsProducerClient.class); 042 043 protected JmsProducerProperties client; 044 protected MessageProducer jmsProducer; 045 protected TextMessage jmsTextMessage; 046 047 public JmsProducerClient(ConnectionFactory factory) { 048 this(new JmsProducerProperties(), factory); 049 } 050 051 public JmsProducerClient(JmsProducerProperties clientProps, ConnectionFactory factory) { 052 super(factory); 053 this.client = clientProps; 054 } 055 056 public void sendMessages() throws JMSException { 057 // Send a specific number of messages 058 if (client.getSendType().equalsIgnoreCase(JmsProducerProperties.COUNT_BASED_SENDING)) { 059 long sendCount = client.getSendCount(); 060 sendCountBasedMessages(sendCount); 061 // Send messages for a specific duration 062 } else { 063 long sendDuration = client.getSendDuration(); 064 sendTimeBasedMessages(sendDuration); 065 } 066 } 067 068 public void sendMessages(int destCount) throws JMSException { 069 this.destCount = destCount; 070 sendMessages(); 071 } 072 073 public void sendMessages(int destIndex, int destCount) throws JMSException { 074 this.destIndex = destIndex; 075 sendMessages(destCount); 076 } 077 078 public void sendCountBasedMessages(long messageCount) throws JMSException { 079 // Parse through different ways to send messages 080 // Avoided putting the condition inside the loop to prevent effect on performance 081 Destination[] dest = createDestinations(destCount); 082 083 // Create a producer, if none is created. 084 if (getJmsProducer() == null) { 085 if (dest.length == 1) { 086 createJmsProducer(dest[0]); 087 } else { 088 createJmsProducer(); 089 } 090 } 091 try { 092 getConnection().start(); 093 if (client.getMsgFileName() != null) { 094 LOG.info("Starting to publish " + 095 messageCount + 096 " messages from file " + 097 client.getMsgFileName() 098 ); 099 } else { 100 LOG.info("Starting to publish " + 101 messageCount + 102 " messages of size " + 103 client.getMessageSize() + 104 " byte(s)." 105 ); 106 } 107 108 // Send one type of message only, avoiding the creation of different messages on sending 109 if (!client.isCreateNewMsg()) { 110 // Create only a single message 111 createJmsTextMessage(); 112 113 // Send to more than one actual destination 114 if (dest.length > 1) { 115 for (int i = 0; i < messageCount; i++) { 116 for (int j = 0; j < dest.length; j++) { 117 getJmsProducer().send(dest[j], getJmsTextMessage()); 118 incThroughput(); 119 sleep(); 120 commitTxIfNecessary(); 121 } 122 } 123 // Send to only one actual destination 124 } else { 125 for (int i = 0; i < messageCount; i++) { 126 getJmsProducer().send(getJmsTextMessage()); 127 incThroughput(); 128 sleep(); 129 commitTxIfNecessary(); 130 } 131 } 132 133 // Send different type of messages using indexing to identify each one. 134 // Message size will vary. Definitely slower, since messages properties 135 // will be set individually each send. 136 } else { 137 // Send to more than one actual destination 138 if (dest.length > 1) { 139 for (int i = 0; i < messageCount; i++) { 140 for (int j = 0; j < dest.length; j++) { 141 getJmsProducer().send(dest[j], createJmsTextMessage("Text Message [" + i + "]")); 142 incThroughput(); 143 sleep(); 144 commitTxIfNecessary(); 145 } 146 } 147 148 // Send to only one actual destination 149 } else { 150 for (int i = 0; i < messageCount; i++) { 151 getJmsProducer().send(createJmsTextMessage("Text Message [" + i + "]")); 152 incThroughput(); 153 sleep(); 154 commitTxIfNecessary(); 155 } 156 } 157 } 158 } finally { 159 LOG.info("Finished sending"); 160 getConnection().close(); 161 } 162 } 163 164 public void sendTimeBasedMessages(long duration) throws JMSException { 165 long endTime = System.currentTimeMillis() + duration; 166 // Parse through different ways to send messages 167 // Avoided putting the condition inside the loop to prevent effect on performance 168 169 Destination[] dest = createDestinations(destCount); 170 171 // Create a producer, if none is created. 172 if (getJmsProducer() == null) { 173 if (dest.length == 1) { 174 createJmsProducer(dest[0]); 175 } else { 176 createJmsProducer(); 177 } 178 } 179 180 try { 181 getConnection().start(); 182 if (client.getMsgFileName() != null) { 183 LOG.info("Starting to publish messages from file " + 184 client.getMsgFileName() + 185 " for " + 186 duration + 187 " ms"); 188 } else { 189 LOG.info("Starting to publish " + 190 client.getMessageSize() + 191 " byte(s) messages for " + 192 duration + 193 " ms"); 194 } 195 // Send one type of message only, avoiding the creation of different messages on sending 196 if (!client.isCreateNewMsg()) { 197 // Create only a single message 198 createJmsTextMessage(); 199 200 // Send to more than one actual destination 201 if (dest.length > 1) { 202 while (System.currentTimeMillis() < endTime) { 203 for (int j = 0; j < dest.length; j++) { 204 getJmsProducer().send(dest[j], getJmsTextMessage()); 205 incThroughput(); 206 sleep(); 207 commitTxIfNecessary(); 208 } 209 } 210 // Send to only one actual destination 211 } else { 212 while (System.currentTimeMillis() < endTime) { 213 getJmsProducer().send(getJmsTextMessage()); 214 incThroughput(); 215 sleep(); 216 commitTxIfNecessary(); 217 } 218 } 219 220 // Send different type of messages using indexing to identify each one. 221 // Message size will vary. Definitely slower, since messages properties 222 // will be set individually each send. 223 } else { 224 // Send to more than one actual destination 225 long count = 1; 226 if (dest.length > 1) { 227 while (System.currentTimeMillis() < endTime) { 228 for (int j = 0; j < dest.length; j++) { 229 getJmsProducer().send(dest[j], createJmsTextMessage("Text Message [" + count++ + "]")); 230 incThroughput(); 231 sleep(); 232 commitTxIfNecessary(); 233 } 234 } 235 236 // Send to only one actual destination 237 } else { 238 while (System.currentTimeMillis() < endTime) { 239 240 getJmsProducer().send(createJmsTextMessage("Text Message [" + count++ + "]")); 241 incThroughput(); 242 sleep(); 243 commitTxIfNecessary(); 244 } 245 } 246 } 247 } finally { 248 LOG.info("Finished sending"); 249 getConnection().close(); 250 } 251 } 252 253 public MessageProducer createJmsProducer() throws JMSException { 254 jmsProducer = getSession().createProducer(null); 255 if (client.getDeliveryMode().equalsIgnoreCase(JmsProducerProperties.DELIVERY_MODE_PERSISTENT)) { 256 LOG.info("Creating producer to possible multiple destinations with persistent delivery."); 257 jmsProducer.setDeliveryMode(DeliveryMode.PERSISTENT); 258 } else if (client.getDeliveryMode().equalsIgnoreCase(JmsProducerProperties.DELIVERY_MODE_NON_PERSISTENT)) { 259 LOG.info("Creating producer to possible multiple destinations with non-persistent delivery."); 260 jmsProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 261 } else { 262 LOG.warn("Unknown deliveryMode value. Defaulting to non-persistent."); 263 jmsProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 264 } 265 return jmsProducer; 266 } 267 268 public MessageProducer createJmsProducer(Destination dest) throws JMSException { 269 jmsProducer = getSession().createProducer(dest); 270 if (client.getDeliveryMode().equalsIgnoreCase(JmsProducerProperties.DELIVERY_MODE_PERSISTENT)) { 271 LOG.info("Creating producer to: " + dest.toString() + " with persistent delivery."); 272 jmsProducer.setDeliveryMode(DeliveryMode.PERSISTENT); 273 } else if (client.getDeliveryMode().equalsIgnoreCase(JmsProducerProperties.DELIVERY_MODE_NON_PERSISTENT)) { 274 LOG.info("Creating producer to: " + dest.toString() + " with non-persistent delivery."); 275 jmsProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 276 } else { 277 LOG.warn("Unknown deliveryMode value. Defaulting to non-persistent."); 278 jmsProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 279 } 280 return jmsProducer; 281 } 282 283 public MessageProducer getJmsProducer() { 284 return jmsProducer; 285 } 286 287 public TextMessage createJmsTextMessage() throws JMSException { 288 if (client.getMsgFileName() != null) { 289 return loadJmsMessage(); 290 } else { 291 return createJmsTextMessage(client.getMessageSize()); 292 } 293 } 294 295 public TextMessage createJmsTextMessage(int size) throws JMSException { 296 jmsTextMessage = getSession().createTextMessage(buildText("", size)); 297 298 // support for adding message headers 299 Set<String> headerKeys = this.client.getHeaderKeys(); 300 for (String key : headerKeys) { 301 jmsTextMessage.setObjectProperty(key, this.client.getHeaderValue(key)); 302 } 303 304 return jmsTextMessage; 305 } 306 307 public TextMessage createJmsTextMessage(String text) throws JMSException { 308 jmsTextMessage = getSession().createTextMessage(buildText(text, client.getMessageSize())); 309 return jmsTextMessage; 310 } 311 312 public TextMessage getJmsTextMessage() { 313 return jmsTextMessage; 314 } 315 316 @Override 317 public JmsClientProperties getClient() { 318 return client; 319 } 320 321 @Override 322 public void setClient(JmsClientProperties clientProps) { 323 client = (JmsProducerProperties)clientProps; 324 } 325 326 @Override 327 protected Destination createTemporaryDestination(String destName) throws JMSException { 328 String simpleName = getSimpleName(destName); 329 byte destinationType = getDestinationType(destName); 330 331 // when we produce to temp destinations, we publish to them as 332 // though they were normal queues or topics 333 if (destinationType == ActiveMQDestination.TEMP_QUEUE_TYPE) { 334 LOG.info("Creating queue: {}", destName); 335 return getSession().createQueue(simpleName); 336 } else if (destinationType == ActiveMQDestination.TEMP_TOPIC_TYPE) { 337 LOG.info("Creating topic: {}", destName); 338 return getSession().createTopic(simpleName); 339 } else { 340 throw new IllegalArgumentException("Unrecognized destination type: " + destinationType); 341 } 342 } 343 344 protected String buildText(String text, int size) { 345 byte[] data = new byte[size - text.length()]; 346 Arrays.fill(data, (byte) 0); 347 return text + new String(data); 348 } 349 350 protected void sleep() { 351 if (client.getSendDelay() > 0) { 352 try { 353 LOG.trace("Sleeping for " + client.getSendDelay() + " milliseconds"); 354 Thread.sleep(client.getSendDelay()); 355 } catch (java.lang.InterruptedException ex) { 356 LOG.warn(ex.getMessage()); 357 } 358 } 359 } 360 361 /** 362 * loads the message to be sent from the specified TextFile 363 */ 364 protected TextMessage loadJmsMessage() throws JMSException { 365 try { 366 // couple of sanity checks upfront 367 if (client.getMsgFileName() == null) { 368 throw new JMSException("Invalid filename specified."); 369 } 370 371 File f = new File(client.getMsgFileName()); 372 if (f.isDirectory()) { 373 throw new JMSException("Cannot load from " + 374 client.getMsgFileName() + 375 " as it is a directory not a text file."); 376 } 377 378 // try to load file 379 BufferedReader br = new BufferedReader(new FileReader(f)); 380 StringBuffer payload = new StringBuffer(); 381 String tmp = null; 382 while ((tmp = br.readLine()) != null) { 383 payload.append(tmp); 384 } 385 br.close(); 386 jmsTextMessage = getSession().createTextMessage(payload.toString()); 387 return jmsTextMessage; 388 } catch (FileNotFoundException ex) { 389 throw new JMSException(ex.getMessage()); 390 } catch (IOException iox) { 391 throw new JMSException(iox.getMessage()); 392 } 393 } 394}