001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.tool;
018
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.ConnectionMetaData;
import javax.jms.JMSException;

import org.apache.activemq.tool.properties.AbstractObjectProperties;
import org.apache.activemq.tool.properties.JmsClientProperties;
import org.apache.activemq.tool.properties.JmsClientSystemProperties;
import org.apache.activemq.tool.properties.JmsFactoryProperties;
import org.apache.activemq.tool.properties.ReflectionUtil;
import org.apache.activemq.tool.reports.PerformanceReportWriter;
import org.apache.activemq.tool.reports.VerbosePerfReportWriter;
import org.apache.activemq.tool.reports.XmlFilePerfReportWriter;
import org.apache.activemq.tool.sampler.CpuSamplerTask;
import org.apache.activemq.tool.sampler.PerformanceSampler;
import org.apache.activemq.tool.sampler.ThroughputSamplerTask;
import org.apache.activemq.tool.spi.SPIConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
047
048public abstract class AbstractJmsClientSystem extends AbstractObjectProperties {
049    private static final Logger LOG = LoggerFactory.getLogger(AbstractJmsClientSystem.class);
050
051    protected ThreadGroup clientThreadGroup;
052    protected ConnectionFactory jmsConnFactory;
053
054    // Properties
055    protected JmsFactoryProperties factory = new JmsFactoryProperties();
056    protected ThroughputSamplerTask tpSampler = new ThroughputSamplerTask();
057
058    private int clientDestIndex;
059    private int clientDestCount;
060
061    public void runSystemTest() throws JMSException {
062        // Create connection factory
063        jmsConnFactory = loadJmsFactory(getSysTest().getSpiClass(), factory.getFactorySettings());
064
065        setProviderMetaData(jmsConnFactory.createConnection().getMetaData(), getJmsClientProperties());
066
067        // Create performance sampler
068        PerformanceReportWriter writer = createPerfWriter();
069        writer.openReportWriter();
070        writer.writeProperties("jvmSettings", System.getProperties());
071        writer.writeProperties("testSystemSettings", ReflectionUtil.retrieveObjectProperties(getSysTest()));
072        writer.writeProperties("jmsFactorySettings", ReflectionUtil.retrieveObjectProperties(jmsConnFactory));
073        writer.writeProperties("jmsClientSettings", ReflectionUtil.retrieveObjectProperties(getJmsClientProperties()));
074
075        // set up performance samplers indicated by the user
076        List<PerformanceSampler> samplers = new ArrayList<>();
077
078        Set<String> requestedSamplers = getSysTest().getSamplersSet();
079        if (requestedSamplers.contains(JmsClientSystemProperties.SAMPLER_TP)) {
080            writer.writeProperties("tpSamplerSettings", ReflectionUtil.retrieveObjectProperties(tpSampler));
081            samplers.add(tpSampler);
082        }
083
084        if (requestedSamplers.contains(JmsClientSystemProperties.SAMPLER_CPU)) {
085            CpuSamplerTask cpuSampler = new CpuSamplerTask();
086            writer.writeProperties("cpuSamplerSettings", ReflectionUtil.retrieveObjectProperties(cpuSampler));
087
088            try {
089                cpuSampler.createPlugin();
090                samplers.add(cpuSampler);
091            } catch (IOException e) {
092                LOG.warn("Unable to start CPU sampler plugin. Reason: " + e.getMessage());
093            }
094        }
095
096        // spawn client threads
097        clientThreadGroup = new ThreadGroup(getSysTest().getClientPrefix() + " Thread Group");
098
099        int numClients = getSysTest().getNumClients();
100        final CountDownLatch clientCompletionLatch = new CountDownLatch(numClients);
101        for (int i = 0; i < numClients; i++) {
102            distributeDestinations(getSysTest().getDestDistro(), i, numClients, getSysTest().getTotalDests());
103
104            final String clientName = getSysTest().getClientPrefix() + i;
105            final int clientDestIndex = this.clientDestIndex;
106            final int clientDestCount = this.clientDestCount;
107            Thread t = new Thread(clientThreadGroup, new Runnable() {
108                @Override
109                public void run() {
110                    runJmsClient(clientName, clientDestIndex, clientDestCount);
111                    LOG.info("Client completed");
112                    clientCompletionLatch.countDown();
113                }
114            });
115            t.setName(getSysTest().getClientPrefix() + i + " Thread");
116            t.start();
117        }
118
119        // start the samplers
120        final CountDownLatch samplerCompletionLatch = new CountDownLatch(requestedSamplers.size());
121        for (PerformanceSampler sampler : samplers) {
122            sampler.setPerfReportWriter(writer);
123            sampler.startSampler(samplerCompletionLatch, getClientRunBasis(), getClientRunDuration());
124        }
125
126        try {
127            // wait for the clients to finish
128            clientCompletionLatch.await();
129            LOG.debug("All clients completed");
130        } catch (InterruptedException e) {
131            e.printStackTrace();
132        } finally {
133            // if count-based, ramp-down time is not relevant, shut the samplers down
134            if (getClientRunBasis() == ClientRunBasis.count) {
135                for (PerformanceSampler sampler : samplers) {
136                    sampler.finishSampling();
137                }
138            }
139
140            try {
141                LOG.debug("Waiting for samplers to shut down");
142                samplerCompletionLatch.await();
143                LOG.debug("All samplers completed");
144            } catch (InterruptedException e) {
145                e.printStackTrace();
146            } finally {
147                writer.closeReportWriter();
148            }
149        }
150    }
151
152    protected abstract ClientRunBasis getClientRunBasis();
153
154    protected abstract long getClientRunDuration();
155
156    public ThroughputSamplerTask getTpSampler() {
157        return tpSampler;
158    }
159
160    public JmsFactoryProperties getFactory() {
161        return factory;
162    }
163
164    public void setFactory(JmsFactoryProperties factory) {
165        this.factory = factory;
166    }
167
168    public abstract JmsClientSystemProperties getSysTest();
169
170    public abstract void setSysTest(JmsClientSystemProperties sysTestProps);
171
172    public abstract JmsClientProperties getJmsClientProperties();
173
174    protected PerformanceReportWriter createPerfWriter() {
175        if (getSysTest().getReportType().equalsIgnoreCase(JmsClientSystemProperties.REPORT_XML_FILE)) {
176            String reportName;
177
178            if ((reportName = getSysTest().getReportName()) == null) {
179                reportName = getSysTest().getClientPrefix() + "_" + "numClients" + getSysTest().getNumClients() + "_" + "numDests" + getSysTest().getTotalDests() + "_" + getSysTest().getDestDistro();
180            }
181            return new XmlFilePerfReportWriter(getSysTest().getReportDir(), reportName);
182        } else if (getSysTest().getReportType().equalsIgnoreCase(JmsClientSystemProperties.REPORT_VERBOSE)) {
183            return new VerbosePerfReportWriter();
184        } else {
185            // Use verbose if unknown report type
186            return new VerbosePerfReportWriter();
187        }
188    }
189
190    protected void distributeDestinations(String distroType, int clientIndex, int numClients, int numDests) {
191        if (distroType.equalsIgnoreCase(JmsClientSystemProperties.DEST_DISTRO_ALL)) {
192            clientDestCount = numDests;
193            clientDestIndex = 0;
194        } else if (distroType.equalsIgnoreCase(JmsClientSystemProperties.DEST_DISTRO_EQUAL)) {
195            int destPerClient = numDests / numClients;
196            // There are equal or more destinations per client
197            if (destPerClient > 0) {
198                clientDestCount = destPerClient;
199                clientDestIndex = destPerClient * clientIndex;
200                // If there are more clients than destinations, share
201                // destinations per client
202            } else {
203                clientDestCount = 1; // At most one destination per client
204                clientDestIndex = clientIndex % numDests;
205            }
206        } else if (distroType.equalsIgnoreCase(JmsClientSystemProperties.DEST_DISTRO_DIVIDE)) {
207            int destPerClient = numDests / numClients;
208            // There are equal or more destinations per client
209            if (destPerClient > 0) {
210                int remain = numDests % numClients;
211                int nextIndex;
212                if (clientIndex < remain) {
213                    destPerClient++;
214                    nextIndex = clientIndex * destPerClient;
215                } else {
216                    nextIndex = (clientIndex * destPerClient) + remain;
217                }
218
219                clientDestCount = destPerClient;
220                clientDestIndex = nextIndex;
221
222                // If there are more clients than destinations, share
223                // destinations per client
224            } else {
225                clientDestCount = 1; // At most one destination per client
226                clientDestIndex = clientIndex % numDests;
227            }
228
229            // Send to all for unknown behavior
230        } else {
231            LOG.warn("Unknown destination distribution type: " + distroType);
232            clientDestCount = numDests;
233            clientDestIndex = 0;
234        }
235    }
236
237    protected ConnectionFactory loadJmsFactory(String spiClass, Properties factorySettings) throws JMSException {
238        try {
239            Class<?> spi = Class.forName(spiClass);
240            SPIConnectionFactory spiFactory = (SPIConnectionFactory)spi.newInstance();
241            ConnectionFactory jmsFactory = spiFactory.createConnectionFactory(factorySettings);
242            LOG.info("Created: " + jmsFactory.getClass().getName() + " using SPIConnectionFactory: " + spiFactory.getClass().getName());
243            return jmsFactory;
244        } catch (Exception e) {
245            e.printStackTrace();
246            throw new JMSException(e.getMessage());
247        }
248    }
249
250    protected void setProviderMetaData(ConnectionMetaData metaData, JmsClientProperties props) throws JMSException {
251        props.setJmsProvider(metaData.getJMSProviderName() + "-" + metaData.getProviderVersion());
252        props.setJmsVersion(metaData.getJMSVersion());
253
254        String jmsProperties = "";
255        Enumeration<?> jmsProps = metaData.getJMSXPropertyNames();
256        while (jmsProps.hasMoreElements()) {
257            jmsProperties += jmsProps.nextElement().toString() + ",";
258        }
259        if (jmsProperties.length() > 0) {
260            // Remove the last comma
261            jmsProperties = jmsProperties.substring(0, jmsProperties.length() - 1);
262        }
263        props.setJmsProperties(jmsProperties);
264    }
265
266    protected abstract void runJmsClient(String clientName, int clientDestIndex, int clientDestCount);
267
268    protected static Properties parseStringArgs(String[] args) {
269        File configFile = null;
270        Properties props = new Properties();
271
272        if (args == null || args.length == 0) {
273            return props; // Empty properties
274        }
275
276        for (int i = 0; i < args.length; i++) {
277            String arg = args[i];
278            if (arg.startsWith("-D") || arg.startsWith("-d")) {
279                arg = arg.substring(2);
280            }
281            int index = arg.indexOf("=");
282            String key = arg.substring(0, index);
283            String val = arg.substring(index + 1);
284
285            if (key.equalsIgnoreCase("sysTest.propsConfigFile")) {
286                if (!val.endsWith(".properties")) {
287                    val += ".properties";
288                }
289                configFile = new File(val);
290            }
291            props.setProperty(key, val);
292        }
293
294        Properties fileProps = new Properties();
295        try {
296            if (configFile != null) {
297                LOG.info("Loading properties file: " + configFile.getAbsolutePath());
298                fileProps.load(new FileInputStream(configFile));
299            }
300        } catch (IOException e) {
301            e.printStackTrace();
302        }
303        // Overwrite file settings with command line settings
304        fileProps.putAll(props);
305        return fileProps;
306    }
307}