001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.tool;
018
019import java.util.ArrayList;
020import java.util.List;
021
022import javax.jms.JMSException;
023import javax.jms.Message;
024import javax.jms.MessageListener;
025
026import org.slf4j.Logger;
027import org.slf4j.LoggerFactory;
028
029/**
030 * A simple container of messages for performing testing and rendezvous style
031 * code. You can use this class a {@link MessageListener} and then make
032 * assertions about how many messages it has received allowing a certain maximum
033 * amount of time to ensure that the test does not hang forever.
034 * <p/>
035 * Also you can chain these instances together with the
036 * {@link #setParent(MessageListener)} method so that you can aggregate the
037 * total number of messages consumed across a number of consumers.
038 *
039 * 
040 */
041public class MemMessageIdList implements MessageListener {
042
043    protected static final Logger LOG = LoggerFactory.getLogger(MemMessageIdList.class);
044
045    private List<String> messageIds = new ArrayList<String>();
046    private Object semaphore;
047    private boolean verbose;
048    private MessageListener parent;
049    private long maximumDuration = 15000L;
050
051    public MemMessageIdList() {
052        this(new Object());
053    }
054
055    public MemMessageIdList(Object semaphore) {
056        this.semaphore = semaphore;
057    }
058
059    public boolean equals(Object that) {
060        if (that instanceof MemMessageIdList) {
061            MemMessageIdList thatListMem = (MemMessageIdList) that;
062            return getMessageIds().equals(thatListMem.getMessageIds());
063        }
064        return false;
065    }
066
067    public int hashCode() {
068        synchronized (semaphore) {
069            return messageIds.hashCode() + 1;
070        }
071    }
072
073    public String toString() {
074        synchronized (semaphore) {
075            return messageIds.toString();
076        }
077    }
078
079    /**
080     * @return all the messages on the list so far, clearing the buffer
081     */
082    public List<String> flushMessages() {
083        synchronized (semaphore) {
084            List<String> answer = new ArrayList<String>(messageIds);
085            messageIds.clear();
086            return answer;
087        }
088    }
089
090    public synchronized List<String> getMessageIds() {
091        synchronized (semaphore) {
092            return new ArrayList<String>(messageIds);
093        }
094    }
095
096    public void onMessage(Message message) {
097        String id = null;
098        try {
099            id = message.getJMSMessageID();
100            synchronized (semaphore) {
101                messageIds.add(id);
102                semaphore.notifyAll();
103            }
104            if (verbose) {
105                LOG.info("Received message: " + message);
106            }
107        } catch (JMSException e) {
108            e.printStackTrace();
109        }
110        if (parent != null) {
111            parent.onMessage(message);
112        }
113    }
114
115    public int getMessageCount() {
116        synchronized (semaphore) {
117            return messageIds.size();
118        }
119    }
120
121    public void waitForMessagesToArrive(int messageCount) {
122        LOG.info("Waiting for " + messageCount + " message(s) to arrive");
123
124        long start = System.currentTimeMillis();
125
126        for (int i = 0; i < messageCount; i++) {
127            try {
128                if (hasReceivedMessages(messageCount)) {
129                    break;
130                }
131                long duration = System.currentTimeMillis() - start;
132                if (duration >= maximumDuration) {
133                    break;
134                }
135                synchronized (semaphore) {
136                    semaphore.wait(maximumDuration - duration);
137                }
138            } catch (InterruptedException e) {
139                LOG.info("Caught: " + e);
140            }
141        }
142        long end = System.currentTimeMillis() - start;
143
144        LOG.info("End of wait for " + end + " millis and received: " + getMessageCount() + " messages");
145    }
146
147
148    public boolean hasReceivedMessage() {
149        return getMessageCount() == 0;
150    }
151
152    public boolean hasReceivedMessages(int messageCount) {
153        return getMessageCount() >= messageCount;
154    }
155
156    public boolean isVerbose() {
157        return verbose;
158    }
159
160    public void setVerbose(boolean verbose) {
161        this.verbose = verbose;
162    }
163
164    public MessageListener getParent() {
165        return parent;
166    }
167
168    /**
169     * Allows a parent listener to be specified such as to aggregate messages
170     * consumed across consumers
171     */
172    public void setParent(MessageListener parent) {
173        this.parent = parent;
174    }
175
176}