Why can't my consumer application consume messages in a new thread?

Solution Verified - Updated -

Issue

I am developing AMQ applications. My consumer application must be multi-threaded: one main thread initializes the connectionFactory and creates the connection, session, and consumer, but does not start the connection yet. A business requirement is that another thread sets a flag (ActivemqQueueConsumerAsyn.flag) to signal that the consumer application is now ready to pull messages from the designated queue. Until the flag is set to true, the consumer must not process any messages.
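The hand-off described above can be sketched with JDK classes only. This is a minimal illustration, not the code from this application: ReadinessGateSketch, markReady, and awaitReady are hypothetical names, and a CountDownLatch stands in for the static boolean flag.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical names throughout; no JMS or ActiveMQ classes are involved.
class ReadinessGateSketch {

    // Opens exactly once; stands in for the polled static boolean flag.
    private final CountDownLatch ready = new CountDownLatch(1);

    /** Called by the thread that decides the application is ready. */
    void markReady() {
        ready.countDown();
    }

    /**
     * Blocks the calling thread until markReady() has run or the timeout
     * expires. Returns true if the gate is open.
     */
    boolean awaitReady(long timeoutMillis) {
        try {
            return ready.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        ReadinessGateSketch gate = new ReadinessGateSketch();
        // A second thread opens the gate, as the "other thread" would.
        Thread signaller = new Thread(gate::markReady, "signaller");
        signaller.start();
        System.out.println("gate opened: " + gate.awaitReady(5000));
    }
}
```

A gate like this only works if the thread that calls markReady() is free to run. A thread that is waiting in awaitReady() cannot also be the one that opens the gate.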

In my application, I found that when messages are already in the queue, the consumer application never processes any of them; it stays stuck in the sleep loop forever.

My sample consumer source code snippet:

  • ConsumerMain.java:
public class ConsumerMain {
    public static void main(String[] args) throws InterruptedException, JMSException {
        PropertyConfigurator.configure(LOG4J_PATH);
        ActivemqQueueConsumerAsyn consumer = new ActivemqQueueConsumerAsyn();
        logger.info("consumer B is listening");
        consumer.recive();
        logger.info("consumer B finishes listening");
    }
}
  • ActivemqQueueConsumerAsyn.java:
public class ActivemqQueueConsumerAsyn implements MessageListener {
    //custom flag to identify if the messages are ready to process
    // volatile so that a write from another thread is visible to the listener
    public static volatile boolean flag = false;
    public void initialize() throws JMSException {
        ActiveMQConnectionFactory connectFactory = new ActiveMQConnectionFactory(MqConstants.BROKER_URL);
        connectFactory.setAlwaysSessionAsync(false);
        Connection connection = connectFactory.createConnection(MqConstants.BROKER_USERNAME, MqConstants.BROKER_PASSWORD);
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue(MqConstants.BROKER_QUEUE_NAME);
        consumer = session.createConsumer(destination);
        consumer.setMessageListener(this);
        // Note: the connection is started in a new thread
        TestT1 testT1 = new TestT1(connection);
        testT1.start();
    }

    @Override
    public void onMessage(Message arg0) {
        logger.info("onMessage  is started ");
        try {

            if (arg0 instanceof TextMessage) {
                logger.info("sleep 10s. The message is not ack yet");
                Thread.sleep(10000);
                TextMessage txtMsg = (TextMessage) arg0;
                logger.info("consumer() async recive:" + txtMsg.getText());
            }
            // Loop forever while the flag is false
            while (ActivemqQueueConsumerAsyn.flag == false) {
                Thread.sleep(20000);
                logger.info("sleep 20s. The message is not ack yet");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
  • TestT1.java:
public class TestT1 extends Thread{
    public TestT1(Connection connection) {
        super();
        this.connection = connection;
    }
    private Connection connection;
    @Override
    public void run() {
        try {
            connection.start();
            // Set the flag to true only after the connection is started
            ActivemqQueueConsumerAsyn.flag=true;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

The stack trace shows:

"Thread-2" #14 prio=5 os_prio=0 tid=0x000000001f6bc800 nid=0x41e0 waiting on condition [0x000000002018e000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
    at java.lang.Thread.sleep(Native Method)
    at com.activemq.queuedemo.consumer.ActivemqQueueConsumerAsyn.onMessage(ActivemqQueueConsumerAsyn.java:56)
    at org.apache.activemq.ActiveMQMessageConsumer.dispatch(ActiveMQMessageConsumer.java:1385)
    - locked <0x000000076c2c18d8> (a java.lang.Object)
    at org.apache.activemq.ActiveMQMessageConsumer.iterate(ActiveMQMessageConsumer.java:1556)
    at org.apache.activemq.ActiveMQSessionExecutor.iterate(ActiveMQSessionExecutor.java:191)
    at org.apache.activemq.ActiveMQSessionExecutor.wakeup(ActiveMQSessionExecutor.java:111)
    at org.apache.activemq.ActiveMQMessageConsumer.start(ActiveMQMessageConsumer.java:1531)
    at org.apache.activemq.ActiveMQSession.start(ActiveMQSession.java:1847)
    at org.apache.activemq.ActiveMQConnection.start(ActiveMQConnection.java:528)
    at com.activemq.queuedemo.consumer.TestT1.run(TestT1.java:15)

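The stack trace shows that connection.start() and onMessage() run in the same new thread (Thread-2): onMessage() is invoked from inside connection.start(). Because the flag is set only after connection.start() returns, and onMessage() does not return until the flag is true, the thread blocks itself. The flag is never set and the listener keeps sleeping.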
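The effect of dispatching on the caller's thread can be reproduced without a broker. The following is a stdlib-only simulation under stated assumptions: DispatchThreadDemo, its start() method, and the dispatchOnCallerThread switch are hypothetical stand-ins, not ActiveMQ classes. ActiveMQ's alwaysSessionAsync factory option, which the sample sets to false, is documented as controlling whether a separate thread dispatches messages for a session; mapping that option onto the switch used here is an assumption of this sketch.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class DispatchThreadDemo {

    interface Listener {
        void onMessage(String text) throws InterruptedException;
    }

    // Hypothetical stand-in for starting a connection that already has one
    // message queued for the listener.
    static void start(Listener listener, boolean dispatchOnCallerThread)
            throws InterruptedException {
        if (dispatchOnCallerThread) {
            // Same shape as the stack trace: the listener runs inside start().
            listener.onMessage("pending message");
        } else {
            // Dispatch on a separate thread so that start() can return.
            Thread dispatcher = new Thread(() -> {
                try {
                    listener.onMessage("pending message");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "dispatcher");
            dispatcher.setDaemon(true);
            dispatcher.start();
        }
    }

    /** Returns true if the flag was set within waitMillis. */
    static boolean flagGetsSet(boolean dispatchOnCallerThread, long waitMillis) {
        AtomicBoolean flag = new AtomicBoolean(false);
        // Same gate as in onMessage(): sleep until the flag becomes true.
        Listener gated = text -> {
            while (!flag.get()) {
                Thread.sleep(20);
            }
        };
        Thread starter = new Thread(() -> {
            try {
                start(gated, dispatchOnCallerThread);
                flag.set(true); // only reached once start() has returned
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "starter");
        starter.setDaemon(true);
        starter.start();
        try {
            starter.join(waitMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        boolean result = flag.get();
        starter.interrupt(); // release a starter that is still blocked
        return result;
    }

    public static void main(String[] args) {
        System.out.println("caller-thread dispatch, flag set: " + flagGetsSet(true, 500));
        System.out.println("separate-thread dispatch, flag set: " + flagGetsSet(false, 500));
    }
}
```

In this simulation the flag is set only when the listener runs on a thread other than the one that calls start(). Whether the same holds for a particular client configuration should be checked against the ActiveMQ documentation.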

Sometimes the log also shows the following warning:

2019-12-12 10:23:48,639  WARN ActiveMQSessionExecutor:73 | Received a message on a connection which is not yet started. Have you forgotten to call Connection.start()? Connection: ActiveMQConnection {id=ID:terry-pc-52682-1576117424729-1:1,clientId=ID:terry-pc-52682-1576117424729-0:1,started=false} Received: MessageDispatch {commandId = 0, responseRequired = false, consumerId = ID:terry-pc-52682-1576117424729-1:1:1:1, destination = queue://queue-test-a, message = ActiveMQTextMessage {commandId = 6, responseRequired = true, messageId = ID:terry-pc-52673-1576117250524-1:1:1:1:2, originalDestination = null, originalTransactionId = null, producerId = ID:terry-pc-52673-1576117250524-1:1:1:1, destination = queue://queue-test-a, transactionId = null, expiration = 0, timestamp = 1576117252698, arrival = 0, brokerInTime = 1576117252698, brokerOutTime = 1576117428627, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@10b23ce6, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = Hello,world,mq test!!!1111}, redeliveryCounter = 0}

Environment

  • Red Hat AMQ
    • 6.x
