Delay in consuming messages from topic

Issue:

Hi,

We have a topic with durable subscribers. We can publish and consume messages, but we see odd behavior when consuming from the topic: there are delays of a minute or more when consuming messages for a particular subscriber.

We don't use a message listener or MDBs for the topic, because we don't need to consume a message as soon as it becomes available on the topic for a particular subscriber.

The requirement is to publish messages to subscribers and, at some later point, read all the messages for a particular subscriber using a message selector.

Here is the code for subscribing, publishing, and consuming messages from the topic. Please let me know if I am missing anything.

 

Environment:

JBoss EAP 5.x

 

Code:

{code}

public void subscribeUser(String userName) {
    TopicConnection topicConnection = null;
    TopicSession topicSession = null;
    TopicSubscriber topicSubscriber = null;
    TopicPublisher topicPublisher = null;
    try {
      topicConnection = connectionFactory.createTopicConnection(notificationQUser, notificationQPwd);
      topicConnection.setClientID("SomeId");
      topicSession = topicConnection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
      Topic topic = topicSession.createTopic(topicName);
      topicSubscriber = getTopicSubscriber(topicSession,topic, userName);
      topicPublisher = topicSession.createPublisher(topic);
    } catch (JMSException e) {
      throw new RuntimeException("Error while subscribing to Topic", e);
    } finally {
      closeTopicResources(topicSubscriber, topicPublisher, topicSession, topicConnection);
    }
  }

 public void publishMessage(CustomMessage customMessage, Long timeToLive, String userProperty) {
    TopicConnection topicConnection = null;
    TopicSession topicSession = null;
    TopicPublisher topicPublisher = null;
    try {
      topicConnection = connectionFactory.createTopicConnection();
      topicSession = topicConnection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
      Topic topic = topicSession.createTopic(topicName);
      topicPublisher = topicSession.createPublisher(topic);
      ObjectMessage message = topicSession.createObjectMessage(customMessage);
      message.setStringProperty("user", userProperty);
      topicPublisher.publish(message, DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY, timeToLive);
    } catch (JMSException e) {
      throw new RuntimeException("Error Sending Message", e);
    } finally {
      closeTopicResources(null, topicPublisher, topicSession, topicConnection);
    }
  }

 public Collection<CustomMessage> findAllMessages(String userId, int maxResults) {
    Collection<CustomMessage> out = new ArrayList<CustomMessage>();
    TopicConnection topicConnection = null;
    TopicSession topicSession = null;
    TopicSubscriber topicSubscriber = null;
    try {
      topicConnection = connectionFactory.createTopicConnection(notificationQUser, notificationQPwd);
      topicConnection.setClientID("someId");
      topicSession = topicConnection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
      Topic topic = topicSession.createTopic(topicName);
      topicSubscriber = getTopicSubscriber(topicSession, topic, userId);
      topicConnection.start();
      Message msg = null;
      int count = 0;
      do {
        msg = topicSubscriber.receiveNoWait();
        if (msg instanceof ObjectMessage) {
          ObjectMessage om = (ObjectMessage) msg;
          if (om.getObject() instanceof CustomMessage) {
            CustomMessage model = (CustomMessage) om.getObject();
            out.add(model);
          } else {
            log.error(String.format("Skipping unexpected message payload of type %s", om.getObject().getClass().getSimpleName()));
          }
        } else if (msg != null) {
          log.error(String.format("Skipping unexpected message type %s", msg.getClass().getSimpleName()));
        }
        
      } while (msg != null && out.size() < maxResults);
    } catch (JMSException e) {
      throw new RuntimeException("Error retrieving  Messages", e);
    } finally {
      closeTopicResources(topicSubscriber, null, topicSession, topicConnection);
    }
    return out;
  }

 private String getSubscriptionName(String userName) {
    return String.format("user:%s", userName);
  }

  private TopicSubscriber getTopicSubscriber(TopicSession topicSession, Topic topic, String userName) throws JMSException {
    return topicSession.createDurableSubscriber(topic, getSubscriptionName(userName), String.format("user = 'ALL' OR user = '%s'", userName), false);
  }

{code}
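
One detail in getTopicSubscriber above: the user name is formatted straight into the selector, so a name containing a single quote would produce an invalid selector. JMS selectors use SQL-92 style string literals, where a quote inside a literal is written as two quotes. Below is a minimal sketch of an escaping helper; UserSelectors and its methods are hypothetical names, not part of the code above.

```java
// Hypothetical helper (not in the original post): builds the per-user
// selector with the user name escaped for a JMS selector string literal.
final class UserSelectors {
    private UserSelectors() {}

    /** Wraps a value in single quotes, doubling any single quote inside it. */
    static String quote(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    /** Same selector shape as getTopicSubscriber uses: broadcast messages or this user's. */
    static String forUser(String userName) {
        return "user = 'ALL' OR user = " + quote(userName);
    }
}
```

For names without quotes this yields exactly the selector string used above, so existing durable subscriptions keep the same selector.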

Configuration:

In destination-service.xml

<mbean code="org.jboss.jms.server.destination.TopicService" name="jboss.messaging.destination:service=Topic,name=UserNotificationTopic" xmbean-dd="xmdesc/Topic-xmbean.xml">
        <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
        <depends>jboss.messaging:service=PostOffice</depends>
    </mbean>

 

In jms-ds.xml, added new connection factory

    <tx-connection-factory>
        <jndi-name>NotificationJmsXA</jndi-name>
        <xa-transaction />
        <rar-name>jms-ra.rar</rar-name>
        <connection-definition>org.jboss.resource.adapter.jms.JmsConnectionFactory</connection-definition>
        <config-property name="SessionDefaultType" type="java.lang.String">javax.jms.Topic</config-property>
        <config-property name="JmsProviderAdapterJNDI" type="java.lang.String">java:/DefaultJMSProvider</config-property>
        <config-property name="Strict" type="java.lang.Boolean">false</config-property>
        <security-domain-and-application>JmsXARealm</security-domain-and-application>
        <max-pool-size>60</max-pool-size>
    </tx-connection-factory>

Responses

Hi,

 

You say there are minute-long delays when consuming messages for a "particular subscriber". What do you mean by "particular subscriber" in this context?

Hi Colin,

 

We have a topic with n subscribers (the users of our app). An admin can publish a message to all users subscribed to the topic or to just a few of them. Whenever a user logs in to the application, he or she (the subscriber) should be able to see all the messages available for him or her. To achieve this, I am using a message selector that filters the messages based on the "user" property, as in the code above. Here, the user refers to a particular subscriber. I hope that is clear now.

 

We run a timer in the UI that polls the topic and tries to read the messages for the logged-in user at a fixed interval (say, every 2 minutes). However, the messages are not read even after many calls from the UI. Let me know if I am missing something in the code or config.

Hi,

 

As far as I can see, what you are doing sounds fine. To get to the bottom of this, I would suggest you open a support case, if possible with a simple test case.

It would also be interesting to know what results you get if you change .receiveNoWait() to something like .receive(300). See the following wiki page for a discussion of .receiveNoWait() behavior:

 

http://community.jboss.org/wiki/JBMReceiveNoWait
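
To make the suggestion concrete, here is a rough sketch of the read loop with a timed receive in place of receiveNoWait(). It is only an illustration: TimedDrain and TimedReceiver are hypothetical names, and the JMS call is hidden behind a small interface so the sketch compiles without a broker. In real code the adapter would wrap topicSubscriber.receive(timeoutMillis) and deal with JMSException itself.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: collects messages using a timed receive.
final class TimedDrain {
    private TimedDrain() {}

    /** Stand-in for TopicSubscriber.receive(long): returns null when the timeout elapses. */
    interface TimedReceiver<T> {
        T receive(long timeoutMillis);
    }

    /** Reads until a receive times out (null) or maxResults messages were collected. */
    static <T> List<T> drain(TimedReceiver<T> receiver, long timeoutMillis, int maxResults) {
        List<T> out = new ArrayList<T>();
        while (out.size() < maxResults) {
            T msg = receiver.receive(timeoutMillis);
            if (msg == null) {
                break; // nothing arrived within the timeout
            }
            out.add(msg);
        }
        return out;
    }
}
```

With this shape, each call waits at most timeoutMillis once the subscription has run dry, so the timeout bounds the extra latency of an empty poll.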

Hi Colin,

 

Thanks for the input. receive(1000L*2) seems to work. I am wondering why we need a 2-second timeout to consume the messages. I went through the link you suggested, but I couldn't get a clear picture of receiveNoWait(). Could you please elaborate if you have more details?

 

Also, I have one more issue in the same piece of code. Even though I am setting a time to live (TTL) on the messages, I can still see them on the topic after the TTL has expired. Since I am using a durable subscriber, a message stays on the topic until the subscriber consumes it. Is TTL not applicable to durable subscribers, or am I missing something?

 

Thanks

I don't think I can really add much to what is explained in the wiki, but the important part is that receiveNoWait() is not guaranteed to return a message even if, in theory, there should be one waiting.

With regard to the timetolive, the JMS spec simply states that messages should not be delivered after the TTL is exceeded. It does not require them to be actively removed from queues before delivery.

With JBoss Messaging, if you tried to consume those messages, they would be expired at the point of delivery and would not be received by the consumer.
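
As an illustration of the rule described above, the sketch below shows how the expiration time relates to the TTL passed to publish(): per the JMS spec, JMSExpiration is the send time plus the TTL, and a TTL of zero means the message never expires. The Expiry class is a hypothetical name, and whether a provider treats the exact boundary instant as expired is an assumption here, not something taken from JBoss Messaging.

```java
// Hypothetical sketch of the JMS expiration arithmetic; not provider code.
final class Expiry {
    private Expiry() {}

    /** JMSExpiration as computed on send: 0 means the message never expires. */
    static long expirationFor(long sendTimeMillis, long timeToLiveMillis) {
        return timeToLiveMillis == 0 ? 0 : sendTimeMillis + timeToLiveMillis;
    }

    /** True if a message with this JMSExpiration should be dropped at delivery time. */
    static boolean isExpired(long jmsExpiration, long nowMillis) {
        // Assumption: the boundary instant itself counts as expired.
        return jmsExpiration != 0 && nowMillis >= jmsExpiration;
    }
}
```

The check runs when delivery is attempted, which is why an expired message can still appear in the topic's message count until a consumer asks for it.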

Hi Colin,

Thanks for clarifying TTL. I have one more issue, with creating durable subscribers: I get the exception below if a TopicSession has already created a durable subscriber on the topic using the same unique subscription name.

javax.jms.IllegalStateException: Cannot create a subscriber on the durable subscription since it already has subscriber(s)
    at org.jboss.jms.server.endpoint.ServerSessionEndpoint.createConsumerDelegateInternal(ServerSessionEndpoint.java:1989)
    at org.jboss.jms.server.endpoint.ServerSessionEndpoint.createConsumerDelegate(ServerSessionEndpoint.java:260)
    at org.jboss.jms.server.endpoint.advised.SessionAdvised.org$jboss$jms$server$endpoint$advised$SessionAdvised$createConsumerDelegate$aop(SessionAdvised.java:94)
    at org.jboss.jms.server.endpoint.advised.SessionAdvised$createConsumerDelegate_8721389917985689973.invokeTarget(SessionAdvised$createConsumerDelegate_8721389917985689973.java)

Our client reads the messages for a subscriber from the UI, and a user may use the app in several browsers at the same time, which triggers this exception. I understand that JMS uses the unique subscription name to distinguish durable subscribers, so the same name cannot be used by two client calls at the same time.

 

Is there a way to check whether a durable subscriber has already been created and is active on the topic before creating a new one, to avoid this exception? Does the topic's behavior change in any way if we catch the exception and proceed?

 

Thanks

You can probably get a list of all of the durable subscriptions using JMX but I don't think it would help in this situation. I would just capture the exception and proceed with a new unique name.
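
A rough sketch of what "capture the exception and proceed with a new unique name" could look like. Everything here is hypothetical: SubscriptionNames and SubscriberFactory are made-up names, and java.lang.IllegalStateException stands in for javax.jms.IllegalStateException so the sketch compiles without the JMS API. A real factory would call topicSession.createDurableSubscriber(...) and translate the JMS exception.

```java
import java.util.UUID;

// Hypothetical sketch: retry once under a fresh name when the preferred
// durable subscription name is already in use.
final class SubscriptionNames {
    private SubscriptionNames() {}

    /** Stand-in for the code that creates a durable subscriber under a given name. */
    interface SubscriberFactory<S> {
        S create(String subscriptionName); // throws IllegalStateException if the name is in use
    }

    static <S> S createWithFallback(SubscriberFactory<S> factory, String baseName) {
        try {
            return factory.create(baseName);
        } catch (IllegalStateException nameInUse) {
            // Fall back to a name that no other client can be using.
            return factory.create(baseName + ":" + UUID.randomUUID());
        }
    }
}
```

Note that a new name means a new durable subscription, which only retains messages published after it was created, so the fallback subscriber will not see the backlog held under the original name.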