/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.tool;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.ConnectionMetaData;
import javax.jms.JMSException;

import org.apache.activemq.tool.properties.AbstractObjectProperties;
import org.apache.activemq.tool.properties.JmsClientProperties;
import org.apache.activemq.tool.properties.JmsClientSystemProperties;
import org.apache.activemq.tool.properties.JmsFactoryProperties;
import org.apache.activemq.tool.properties.ReflectionUtil;
import org.apache.activemq.tool.reports.PerformanceReportWriter;
import org.apache.activemq.tool.reports.VerbosePerfReportWriter;
import org.apache.activemq.tool.reports.XmlFilePerfReportWriter;
import org.apache.activemq.tool.sampler.CpuSamplerTask;
import org.apache.activemq.tool.sampler.PerformanceSampler;
import org.apache.activemq.tool.sampler.ThroughputSamplerTask;
import org.apache.activemq.tool.spi.SPIConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for a JMS client system test. It loads a JMS connection factory
 * through an {@link SPIConnectionFactory}, spawns one thread per client,
 * starts the requested performance samplers and writes the collected
 * settings and samples through a {@link PerformanceReportWriter}.
 * <p>
 * Subclasses supply the test settings ({@link #getSysTest()},
 * {@link #getJmsClientProperties()}) and the per-client work
 * ({@link #runJmsClient(String, int, int)}).
 */
public abstract class AbstractJmsClientSystem extends AbstractObjectProperties {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractJmsClientSystem.class);

    protected ThreadGroup clientThreadGroup;
    protected ConnectionFactory jmsConnFactory;

    // Properties
    protected JmsFactoryProperties factory = new JmsFactoryProperties();
    protected ThroughputSamplerTask tpSampler = new ThroughputSamplerTask();

    // Destination range computed by distributeDestinations() for the client
    // that is currently being set up in runSystemTest().
    private int clientDestIndex;
    private int clientDestCount;

    /**
     * Runs the complete system test: creates the connection factory, records
     * the provider meta data and all settings in the report, starts the
     * client threads and the samplers, then blocks until both the clients and
     * the samplers have finished and finally closes the report writer.
     * <p>
     * If the calling thread is interrupted while waiting, the cleanup still
     * runs and the interrupt status is restored before this method returns.
     *
     * @throws JMSException if the connection factory cannot be created or the
     *         provider meta data cannot be read
     */
    public void runSystemTest() throws JMSException {
        // Create connection factory
        jmsConnFactory = loadJmsFactory(getSysTest().getSpiClass(), factory.getFactorySettings());

        // The connection is only needed to read the provider meta data, so
        // close it right away instead of leaking it for the rest of the run.
        Connection metaDataConnection = jmsConnFactory.createConnection();
        try {
            setProviderMetaData(metaDataConnection.getMetaData(), getJmsClientProperties());
        } finally {
            metaDataConnection.close();
        }

        // Create performance sampler
        PerformanceReportWriter writer = createPerfWriter();
        writer.openReportWriter();
        writer.writeProperties("jvmSettings", System.getProperties());
        writer.writeProperties("testSystemSettings", ReflectionUtil.retrieveObjectProperties(getSysTest()));
        writer.writeProperties("jmsFactorySettings", ReflectionUtil.retrieveObjectProperties(jmsConnFactory));
        writer.writeProperties("jmsClientSettings", ReflectionUtil.retrieveObjectProperties(getJmsClientProperties()));

        // set up performance samplers indicated by the user
        List<PerformanceSampler> samplers = new ArrayList<>();

        Set<String> requestedSamplers = getSysTest().getSamplersSet();
        if (requestedSamplers.contains(JmsClientSystemProperties.SAMPLER_TP)) {
            writer.writeProperties("tpSamplerSettings", ReflectionUtil.retrieveObjectProperties(tpSampler));
            samplers.add(tpSampler);
        }

        if (requestedSamplers.contains(JmsClientSystemProperties.SAMPLER_CPU)) {
            CpuSamplerTask cpuSampler = new CpuSamplerTask();
            writer.writeProperties("cpuSamplerSettings", ReflectionUtil.retrieveObjectProperties(cpuSampler));

            try {
                cpuSampler.createPlugin();
                samplers.add(cpuSampler);
            } catch (IOException e) {
                LOG.warn("Unable to start CPU sampler plugin. Reason: {}", e.getMessage());
            }
        }

        // spawn client threads
        clientThreadGroup = new ThreadGroup(getSysTest().getClientPrefix() + " Thread Group");

        int numClients = getSysTest().getNumClients();
        final CountDownLatch clientCompletionLatch = new CountDownLatch(numClients);
        for (int i = 0; i < numClients; i++) {
            distributeDestinations(getSysTest().getDestDistro(), i, numClients, getSysTest().getTotalDests());

            // Snapshot the per-client values; the fields are overwritten on
            // the next loop iteration.
            final String clientName = getSysTest().getClientPrefix() + i;
            final int clientDestIndex = this.clientDestIndex;
            final int clientDestCount = this.clientDestCount;
            Thread t = new Thread(clientThreadGroup, new Runnable() {
                @Override
                public void run() {
                    try {
                        runJmsClient(clientName, clientDestIndex, clientDestCount);
                        LOG.info("Client completed");
                    } finally {
                        // Always release the latch, otherwise a client that
                        // fails with an unchecked exception would make
                        // runSystemTest() wait forever.
                        clientCompletionLatch.countDown();
                    }
                }
            });
            t.setName(clientName + " Thread");
            t.start();
        }

        // start the samplers
        // Size the latch by the samplers that are actually started: a
        // requested sampler that failed to initialise (or an unknown sampler
        // name) is never started and therefore could never count down.
        final CountDownLatch samplerCompletionLatch = new CountDownLatch(samplers.size());
        for (PerformanceSampler sampler : samplers) {
            sampler.setPerfReportWriter(writer);
            sampler.startSampler(samplerCompletionLatch, getClientRunBasis(), getClientRunDuration());
        }

        boolean interrupted = false;
        try {
            // wait for the clients to finish
            clientCompletionLatch.await();
            LOG.debug("All clients completed");
        } catch (InterruptedException e) {
            interrupted = true;
            LOG.warn("Interrupted while waiting for the clients to complete", e);
        } finally {
            // if count-based, ramp-down time is not relevant, shut the samplers down
            if (getClientRunBasis() == ClientRunBasis.count) {
                for (PerformanceSampler sampler : samplers) {
                    sampler.finishSampling();
                }
            }

            try {
                LOG.debug("Waiting for samplers to shut down");
                samplerCompletionLatch.await();
                LOG.debug("All samplers completed");
            } catch (InterruptedException e) {
                interrupted = true;
                LOG.warn("Interrupted while waiting for the samplers to shut down", e);
            } finally {
                writer.closeReportWriter();
                // Restore the interrupt status only after the cleanup is done
                // so that the waits above are not cut short by our own flag.
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * @return the basis passed to the samplers and used to decide whether the
     *         samplers are stopped as soon as the clients complete
     */
    protected abstract ClientRunBasis getClientRunBasis();

    /**
     * @return the client run duration handed to each sampler on start
     */
    protected abstract long getClientRunDuration();

    /**
     * @return the throughput sampler used by this test system
     */
    public ThroughputSamplerTask getTpSampler() {
        return tpSampler;
    }

    /**
     * @return the properties used to configure the JMS connection factory
     */
    public JmsFactoryProperties getFactory() {
        return factory;
    }

    /**
     * @param factory the properties used to configure the JMS connection factory
     */
    public void setFactory(JmsFactoryProperties factory) {
        this.factory = factory;
    }

    /**
     * @return the system test settings (clients, destinations, samplers, report)
     */
    public abstract JmsClientSystemProperties getSysTest();

    /**
     * @param sysTestProps the system test settings to use
     */
    public abstract void setSysTest(JmsClientSystemProperties sysTestProps);

    /**
     * @return the JMS client properties; the provider meta data is stored
     *         into this object by {@link #runSystemTest()}
     */
    public abstract JmsClientProperties getJmsClientProperties();

    /**
     * Creates the report writer selected by the configured report type.
     * <p>
     * The XML file report type yields an {@link XmlFilePerfReportWriter};
     * when no report name is configured one is derived from the client
     * prefix, the number of clients, the number of destinations and the
     * destination distribution. Every other report type (including a missing
     * one) yields a {@link VerbosePerfReportWriter}.
     *
     * @return a new, not yet opened report writer
     */
    protected PerformanceReportWriter createPerfWriter() {
        JmsClientSystemProperties sysTest = getSysTest();

        // Constant-first comparison so that a missing report type falls back
        // to the verbose writer instead of failing with a NullPointerException.
        if (JmsClientSystemProperties.REPORT_XML_FILE.equalsIgnoreCase(sysTest.getReportType())) {
            String reportName = sysTest.getReportName();
            if (reportName == null) {
                reportName = sysTest.getClientPrefix() + "_" + "numClients" + sysTest.getNumClients() + "_" + "numDests" + sysTest.getTotalDests() + "_" + sysTest.getDestDistro();
            }
            return new XmlFilePerfReportWriter(sysTest.getReportDir(), reportName);
        }

        // Verbose is both the explicit verbose type and the fallback for
        // unknown report types.
        return new VerbosePerfReportWriter();
    }

    /**
     * Computes which destinations the given client works on and stores the
     * result in {@code clientDestIndex} (first destination) and
     * {@code clientDestCount} (number of destinations).
     * <ul>
     * <li>"all": every client gets all destinations, starting at index 0.</li>
     * <li>"equal": every client gets {@code numDests / numClients}
     * consecutive destinations; any remainder is not assigned.</li>
     * <li>"divide": like "equal", but the first {@code numDests % numClients}
     * clients get one destination more so that all destinations are
     * assigned.</li>
     * <li>For "equal" and "divide", when there are more clients than
     * destinations each client gets exactly one destination, chosen
     * round-robin.</li>
     * <li>Any other (or missing) type is logged and treated like "all".</li>
     * </ul>
     * For "equal" and "divide" both {@code numClients} and {@code numDests}
     * are used as divisors and therefore must not be zero.
     *
     * @param distroType the destination distribution type
     * @param clientIndex zero-based index of the client
     * @param numClients total number of clients
     * @param numDests total number of destinations
     */
    protected void distributeDestinations(String distroType, int clientIndex, int numClients, int numDests) {
        if (JmsClientSystemProperties.DEST_DISTRO_ALL.equalsIgnoreCase(distroType)) {
            clientDestCount = numDests;
            clientDestIndex = 0;
        } else if (JmsClientSystemProperties.DEST_DISTRO_EQUAL.equalsIgnoreCase(distroType)) {
            int destPerClient = numDests / numClients;
            if (destPerClient > 0) {
                // There are equal or more destinations per client
                clientDestCount = destPerClient;
                clientDestIndex = destPerClient * clientIndex;
            } else {
                // More clients than destinations: share the destinations,
                // at most one destination per client
                clientDestCount = 1;
                clientDestIndex = clientIndex % numDests;
            }
        } else if (JmsClientSystemProperties.DEST_DISTRO_DIVIDE.equalsIgnoreCase(distroType)) {
            int destPerClient = numDests / numClients;
            if (destPerClient > 0) {
                // There are equal or more destinations per client; the first
                // 'remain' clients take one extra destination each.
                int remain = numDests % numClients;
                int nextIndex;
                if (clientIndex < remain) {
                    destPerClient++;
                    nextIndex = clientIndex * destPerClient;
                } else {
                    // Skip past the extra destinations taken by the first
                    // 'remain' clients.
                    nextIndex = (clientIndex * destPerClient) + remain;
                }

                clientDestCount = destPerClient;
                clientDestIndex = nextIndex;
            } else {
                // More clients than destinations: share the destinations,
                // at most one destination per client
                clientDestCount = 1;
                clientDestIndex = clientIndex % numDests;
            }
        } else {
            // Send to all for unknown behavior
            LOG.warn("Unknown destination distribution type: {}", distroType);
            clientDestCount = numDests;
            clientDestIndex = 0;
        }
    }

    /**
     * Instantiates the given {@link SPIConnectionFactory} implementation and
     * uses it to create a JMS connection factory.
     *
     * @param spiClass fully qualified name of the SPI implementation class
     * @param factorySettings settings handed to the SPI implementation
     * @return the connection factory created by the SPI implementation
     * @throws JMSException if the class cannot be loaded or instantiated, or
     *         the factory cannot be created; the original failure is
     *         attached as linked exception and as cause
     */
    protected ConnectionFactory loadJmsFactory(String spiClass, Properties factorySettings) throws JMSException {
        try {
            Class<?> spi = Class.forName(spiClass);
            SPIConnectionFactory spiFactory = (SPIConnectionFactory)spi.newInstance();
            ConnectionFactory jmsFactory = spiFactory.createConnectionFactory(factorySettings);
            LOG.info("Created: {} using SPIConnectionFactory: {}", jmsFactory.getClass().getName(), spiFactory.getClass().getName());
            return jmsFactory;
        } catch (Exception e) {
            LOG.error("Unable to create JMS connection factory using SPI class: " + spiClass, e);
            // JMSException has no (message, cause) constructor, so attach the
            // original failure explicitly instead of dropping it.
            JMSException jmsException = new JMSException(e.getMessage());
            jmsException.setLinkedException(e);
            jmsException.initCause(e);
            throw jmsException;
        }
    }

    /**
     * Copies the provider name and version, the JMS version and the
     * comma-separated list of supported JMSX property names from the
     * connection meta data into the client properties.
     *
     * @param metaData meta data of a connection to the JMS provider
     * @param props client properties that receive the values
     * @throws JMSException if the meta data cannot be read
     */
    protected void setProviderMetaData(ConnectionMetaData metaData, JmsClientProperties props) throws JMSException {
        props.setJmsProvider(metaData.getJMSProviderName() + "-" + metaData.getProviderVersion());
        props.setJmsVersion(metaData.getJMSVersion());

        StringBuilder jmsProperties = new StringBuilder();
        Enumeration<?> jmsProps = metaData.getJMSXPropertyNames();
        while (jmsProps.hasMoreElements()) {
            jmsProperties.append(jmsProps.nextElement().toString()).append(',');
        }
        if (jmsProperties.length() > 0) {
            // Remove the last comma
            jmsProperties.setLength(jmsProperties.length() - 1);
        }
        props.setJmsProperties(jmsProperties.toString());
    }

    /**
     * Performs the work of a single client; invoked on the client's own thread.
     *
     * @param clientName name of the client (client prefix plus client index)
     * @param clientDestIndex index of the first destination of this client
     * @param clientDestCount number of destinations of this client
     */
    protected abstract void runJmsClient(String clientName, int clientDestIndex, int clientDestCount);

    /**
     * Converts {@code key=value} command line arguments into properties.
     * <p>
     * A leading {@code -D} or {@code -d} is stripped from each argument.
     * Arguments that are {@code null} or contain no {@code '='} are skipped
     * with a warning. If the key {@code sysTest.propsConfigFile} is given,
     * {@code .properties} is appended to its value when missing and that file
     * is loaded; values given on the command line override values from the
     * file. A file that cannot be read is logged and otherwise ignored.
     *
     * @param args the command line arguments, may be {@code null} or empty
     * @return the merged properties, never {@code null}
     */
    protected static Properties parseStringArgs(String[] args) {
        File configFile = null;
        Properties props = new Properties();

        if (args == null || args.length == 0) {
            return props; // Empty properties
        }

        for (String rawArg : args) {
            if (rawArg == null) {
                LOG.warn("Ignoring null argument");
                continue;
            }

            String arg = rawArg;
            if (arg.startsWith("-D") || arg.startsWith("-d")) {
                arg = arg.substring(2);
            }

            int index = arg.indexOf('=');
            if (index < 0) {
                // Without this guard substring(0, -1) would throw.
                LOG.warn("Ignoring argument that is not in key=value form: {}", rawArg);
                continue;
            }
            String key = arg.substring(0, index);
            String val = arg.substring(index + 1);

            if (key.equalsIgnoreCase("sysTest.propsConfigFile")) {
                if (!val.endsWith(".properties")) {
                    val += ".properties";
                }
                configFile = new File(val);
            }
            props.setProperty(key, val);
        }

        Properties fileProps = new Properties();
        if (configFile != null) {
            LOG.info("Loading properties file: {}", configFile.getAbsolutePath());
            // try-with-resources so the stream is closed even if load() fails
            try (FileInputStream in = new FileInputStream(configFile)) {
                fileProps.load(in);
            } catch (IOException e) {
                LOG.warn("Unable to load properties file: " + configFile.getAbsolutePath(), e);
            }
        }
        // Overwrite file settings with command line settings
        fileProps.putAll(props);
        return fileProps;
    }
}