001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.tool; 018 019import java.util.ArrayList; 020import java.util.List; 021 022import javax.jms.*; 023 024import org.apache.activemq.command.ActiveMQDestination; 025import org.apache.activemq.tool.properties.JmsClientProperties; 026import org.slf4j.Logger; 027import org.slf4j.LoggerFactory; 028 029public abstract class AbstractJmsClient { 030 031 private static final Logger LOG = LoggerFactory.getLogger(AbstractJmsClient.class); 032 033 private static final String QUEUE_SCHEME = "queue://"; 034 private static final String TOPIC_SCHEME = "topic://"; 035 private static final String TEMP_QUEUE_SCHEME = "temp-queue://"; 036 private static final String TEMP_TOPIC_SCHEME = "temp-topic://"; 037 public static final String DESTINATION_SEPARATOR = ","; 038 039 protected ConnectionFactory factory; 040 protected Connection jmsConnection; 041 protected Session jmsSession; 042 043 protected int destCount = 1; 044 protected int destIndex; 045 protected String clientName = ""; 046 047 private int internalTxCounter = 0; 048 049 public AbstractJmsClient(ConnectionFactory factory) { 050 this.factory = factory; 051 } 052 053 public abstract JmsClientProperties getClient(); 054 055 public abstract void setClient(JmsClientProperties client); 056 057 public ConnectionFactory getFactory() { 058 return factory; 059 } 060 061 public void setFactory(ConnectionFactory factory) { 062 this.factory = factory; 063 } 064 065 public int getDestCount() { 066 return destCount; 067 } 068 069 public void setDestCount(int destCount) { 070 this.destCount = destCount; 071 } 072 073 public int getDestIndex() { 074 return destIndex; 075 } 076 077 public void setDestIndex(int destIndex) { 078 this.destIndex = destIndex; 079 } 080 081 public String getClientName() { 082 return clientName; 083 } 084 085 public void setClientName(String clientName) { 086 this.clientName = clientName; 087 } 088 089 public Connection getConnection() throws JMSException { 090 if (jmsConnection == null) { 091 jmsConnection = factory.createConnection(); 092 jmsConnection.setClientID(getClientName()); 093 LOG.info("Creating JMS Connection: Provider=" + getClient().getJmsProvider() + ", JMS Spec=" + getClient().getJmsVersion()); 094 } 095 return jmsConnection; 096 } 097 098 public Session getSession() throws JMSException { 099 if (jmsSession == null) { 100 int ackMode; 101 if (getClient().getSessAckMode().equalsIgnoreCase(JmsClientProperties.SESSION_AUTO_ACKNOWLEDGE)) { 102 ackMode = Session.AUTO_ACKNOWLEDGE; 103 } else if (getClient().getSessAckMode().equalsIgnoreCase(JmsClientProperties.SESSION_CLIENT_ACKNOWLEDGE)) { 104 ackMode = Session.CLIENT_ACKNOWLEDGE; 105 } else if (getClient().getSessAckMode().equalsIgnoreCase(JmsClientProperties.SESSION_DUPS_OK_ACKNOWLEDGE)) { 106 ackMode = Session.DUPS_OK_ACKNOWLEDGE; 107 } else if (getClient().getSessAckMode().equalsIgnoreCase(JmsClientProperties.SESSION_TRANSACTED)) { 108 ackMode = Session.SESSION_TRANSACTED; 109 } else { 110 ackMode = Session.AUTO_ACKNOWLEDGE; 111 } 112 jmsSession = getConnection().createSession(getClient().isSessTransacted(), ackMode); 113 } 114 return jmsSession; 115 } 116 117 public Destination[] createDestinations(int destCount) throws JMSException { 118 final String destName = getClient().getDestName(); 119 ArrayList<Destination> destinations = new ArrayList<>(); 120 if (destName.contains(DESTINATION_SEPARATOR)) { 121 if (getClient().isDestComposite() && (destCount == 1)) { 122 // user was explicit about which destinations to make composite 123 String[] simpleNames = mapToSimpleNames(destName.split(DESTINATION_SEPARATOR)); 124 String joinedSimpleNames = join(simpleNames, DESTINATION_SEPARATOR); 125 126 // use the type of the 1st destination for the Destination instance 127 byte destinationType = getDestinationType(destName); 128 destinations.add(createCompositeDestination(destinationType, joinedSimpleNames, 1)); 129 } else { 130 LOG.info("User requested multiple destinations, splitting: {}", destName); 131 // either composite with multiple destinations to be suffixed 132 // or multiple non-composite destinations 133 String[] destinationNames = destName.split(DESTINATION_SEPARATOR); 134 for (String splitDestName : destinationNames) { 135 addDestinations(destinations, splitDestName, destCount); 136 } 137 } 138 } else { 139 addDestinations(destinations, destName, destCount); 140 } 141 return destinations.toArray(new Destination[] {}); 142 } 143 144 private String join(String[] stings, String separator) { 145 StringBuffer sb = new StringBuffer(); 146 for (int i = 0; i < stings.length; i++) { 147 if (i > 0) { 148 sb.append(separator); 149 } 150 sb.append(stings[i]); 151 } 152 return sb.toString(); 153 } 154 155 private void addDestinations(List<Destination> destinations, String destName, int destCount) throws JMSException { 156 boolean destComposite = getClient().isDestComposite(); 157 if ((destComposite) && (destCount > 1)) { 158 destinations.add(createCompositeDestination(destName, destCount)); 159 } else { 160 for (int i = 0; i < destCount; i++) { 161 destinations.add(createDestination(withDestinationSuffix(destName, i, destCount))); 162 } 163 } 164 } 165 166 private String withDestinationSuffix(String name, int destIndex, int destCount) { 167 return (destCount == 1) ? name : name + "." + destIndex; 168 } 169 170 protected Destination createCompositeDestination(String destName, int destCount) throws JMSException { 171 return createCompositeDestination(getDestinationType(destName), destName, destCount); 172 } 173 174 protected Destination createCompositeDestination(byte destinationType, String destName, int destCount) throws JMSException { 175 String simpleName = getSimpleName(destName); 176 177 String compDestName = ""; 178 for (int i = 0; i < destCount; i++) { 179 if (i > 0) { 180 compDestName += ","; 181 } 182 compDestName += withDestinationSuffix(simpleName, i, destCount); 183 } 184 185 LOG.info("Creating composite destination: {}", compDestName); 186 Destination destination; 187 Session session = getSession(); 188 if (destinationType == ActiveMQDestination.TOPIC_TYPE) { 189 destination = session.createTopic(compDestName); 190 } else if (destinationType == ActiveMQDestination.QUEUE_TYPE) { 191 destination = session.createQueue(compDestName); 192 } else { 193 throw new UnsupportedOperationException( 194 "Cannot create composite destinations using temporary queues or topics."); 195 } 196 assert (destination != null); 197 return destination; 198 } 199 200 private String[] mapToSimpleNames(String[] destNames) { 201 assert (destNames != null); 202 String[] simpleNames = new String[destNames.length]; 203 for (int i = 0; i < destNames.length; i++) { 204 simpleNames[i] = getSimpleName(destNames[i]); 205 } 206 return simpleNames; 207 } 208 209 protected String getSimpleName(String destName) { 210 String simpleName; 211 if (destName.startsWith(QUEUE_SCHEME)) { 212 simpleName = destName.substring(QUEUE_SCHEME.length()); 213 } else if (destName.startsWith(TOPIC_SCHEME)) { 214 simpleName = destName.substring(TOPIC_SCHEME.length()); 215 } else if (destName.startsWith(TEMP_QUEUE_SCHEME)) { 216 simpleName = destName.substring(TEMP_QUEUE_SCHEME.length()); 217 } else if (destName.startsWith(TEMP_TOPIC_SCHEME)) { 218 simpleName = destName.substring(TEMP_TOPIC_SCHEME.length()); 219 } else { 220 simpleName = destName; 221 } 222 return simpleName; 223 } 224 225 protected byte getDestinationType(String destName) { 226 assert (destName != null); 227 if (destName.startsWith(QUEUE_SCHEME)) { 228 return ActiveMQDestination.QUEUE_TYPE; 229 } else if (destName.startsWith(TEMP_QUEUE_SCHEME)) { 230 return ActiveMQDestination.TEMP_QUEUE_TYPE; 231 } else if (destName.startsWith(TEMP_TOPIC_SCHEME)) { 232 return ActiveMQDestination.TEMP_TOPIC_TYPE; 233 } else { 234 return ActiveMQDestination.TOPIC_TYPE; 235 } 236 } 237 238 protected Destination createDestination(String destName) throws JMSException { 239 String simpleName = getSimpleName(destName); 240 byte destinationType = getDestinationType(destName); 241 242 if (destinationType == ActiveMQDestination.QUEUE_TYPE) { 243 LOG.info("Creating queue: {}", destName); 244 return getSession().createQueue(simpleName); 245 } else if (destinationType == ActiveMQDestination.TOPIC_TYPE) { 246 LOG.info("Creating topic: {}", destName); 247 return getSession().createTopic(simpleName); 248 } else { 249 return createTemporaryDestination(destName); 250 } 251 } 252 253 protected Destination createTemporaryDestination(String destName) throws JMSException { 254 byte destinationType = getDestinationType(destName); 255 256 if (destinationType == ActiveMQDestination.TEMP_QUEUE_TYPE) { 257 LOG.warn("Creating temporary queue. Requested name ({}) ignored.", destName); 258 TemporaryQueue temporaryQueue = getSession().createTemporaryQueue(); 259 LOG.info("Temporary queue created: {}", temporaryQueue.getQueueName()); 260 return temporaryQueue; 261 } else if (destinationType == ActiveMQDestination.TEMP_TOPIC_TYPE) { 262 LOG.warn("Creating temporary topic. Requested name ({}) ignored.", destName); 263 TemporaryTopic temporaryTopic = getSession().createTemporaryTopic(); 264 LOG.info("Temporary topic created: {}", temporaryTopic.getTopicName()); 265 return temporaryTopic; 266 } else { 267 throw new IllegalArgumentException("Unrecognized destination type: " + destinationType); 268 } 269 } 270 271 /** 272 * Helper method that checks if session is 273 * transacted and whether to commit the tx based on commitAfterXMsgs 274 * property. 275 * 276 * @return true if transaction was committed. 277 * @throws JMSException in case the call to JMS Session.commit() fails. 278 */ 279 public boolean commitTxIfNecessary() throws JMSException { 280 internalTxCounter++; 281 if (getClient().isSessTransacted()) { 282 if ((internalTxCounter % getClient().getCommitAfterXMsgs()) == 0) { 283 LOG.debug("Committing transaction."); 284 internalTxCounter = 0; 285 getSession().commit(); 286 return true; 287 } 288 } 289 return false; 290 } 291}