Delay in consuming messages from topic

Latest response

Issue:

Hi,

We have topic with durable subscribers. We could publish and consume the messages , however we notice some weird behavior while consuming the messages from the topic , there is a minute long delays while consuming message for particular subscriber.

We haven't used any message listener or MDBs for the topic, because, we don't have to consume the message as soon as message available on topic for particular subscriber.

The requirement is to publish the message to subscriber and read all the messages for particular subscriber using message selector later some point of time.

Here is the code for subscription,publishing and consuming messages from Topic. Please let me know if am missing anything.

 

Environment:

JBoss EAP 5.x

 

Code:

{code}

public void subscribeUser(String userName) {
    TopicConnection topicConnection = null;
    TopicSession topicSession = null;
    TopicSubscriber topicSubscriber = null;
    TopicPublisher topicPublisher = null;
    try {
      topicConnection = connectionFactory.createTopicConnection(notificationQUser, notificationQPwd);
      topicConnection.setClientID("SomeId");
      topicSession = topicConnection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
      Topic topic = topicSession.createTopic(topicName);
      topicSubscriber = getTopicSubscriber(topicSession,topic, userName);
      topicPublisher = topicSession.createPublisher(topic);
    } catch (JMSException e) {
      throw new RuntimeException("Error while subscribing to Topic", e);
    } finally {
      closeTopicResources(topicSubscriber, topicPublisher, topicSession, topicConnection);
    }
  }

 public void publishMessage(CustomMessage customMessage, Long timeToLive, String userProperty) {
    TopicConnection topicConnection = null;
    TopicSession topicSession = null;
    TopicPublisher topicPublisher = null;
    try {
      topicConnection = connectionFactory.createTopicConnection();
      topicSession = topicConnection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
      Topic topic = topicSession.createTopic(topicName);
      topicPublisher = topicSession.createPublisher(topic);
      ObjectMessage message = topicSession.createObjectMessage(customMessage);
      message.setStringProperty("user", userProperty);
      topicPublisher.publish(message, DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY, timeToLive);
    } catch (JMSException e) {
      throw new RuntimeException("Error Sending Message", e);
    } finally {
      closeTopicResources(null, topicPublisher, topicSession, topicConnection);
    }
  }

 public Collection<CustomMessage> findAllMessages(String userId, int maxResults) {
    Collection<CustomMessage> out = new ArrayList<CustomMessage>();
    TopicConnection topicConnection = null;
    TopicSession topicSession = null;
    TopicSubscriber topicSubscriber = null;
    try {
      topicConnection = connectionFactory.createTopicConnection(notificationQUser, notificationQPwd);
      topicConnection.setClientID("someId");
      topicSession = topicConnection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
      Topic topic = topicSession.createTopic(topicName);
      topicSubscriber = getTopicSubscriber(topicSession, topic, userId);
      topicConnection.start();
      Message msg = null;
      int count = 0;
      do {
        msg = topicSubscriber.receiveNoWait();
        if (msg instanceof ObjectMessage) {
          ObjectMessage om = (ObjectMessage) msg;
          if (om.getObject() instanceof CustomMessage) {
            CustomMessage model = (CustomMessage) om.getObject();
            out.add(model);
          } else {
            log.error(String.format("Skipping unexpected message payload of type %s", om.getObject().getClass().getSimpleName()));
          }
        } else if (msg != null) {
          log.error(String.format("Skipping unexpected message type %s", msg.getClass().getSimpleName()));
        }
        
      } while (msg != null && out.size() <= maxResults);
    } catch (JMSException e) {
      throw new RuntimeException("Error retrieving  Messages", e);
    } finally {
      closeTopicResources(topicSubscriber, null, topicSession, topicConnection);
    }
    return out;
  }

 private String getSubscriptionName(String userName) {
    return String.format("user:%s", userName);
  }

  private TopicSubscriber getTopicSubscriber(TopicSession topicSession, Topic topic, String userName) throws JMSException {
    return topicSession.createDurableSubscriber(topic, getSubscriptionName(userName), String.format("user = 'ALL' OR user = '%s'", userName), false);
  }

{code}

Conguration:

In destination-service.xml

<mbean code="org.jboss.jms.server.destination.TopicService" name="jboss.messaging.destination:service=Topic,name=UserNotificationTopic" xmbean-dd="xmdesc/Topic-xmbean.xml">
        <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
        <depends>jboss.messaging:service=PostOffice</depends>
    </mbean>

 

In jms-ds.xml, added new connection factory

    <tx-connection-factory>
        <jndi-name>NotificationJmsXA</jndi-name>
        <xa-transaction />
        <rar-name>jms-ra.rar</rar-name>
        <connection-definition>org.jboss.resource.adapter.jms.JmsConnectionFactory</connection-definition>
        <config-property name="SessionDefaultType" type="java.lang.String">javax.jms.Topic</config-property>
        <config-property name="JmsProviderAdapterJNDI" type="java.lang.String">java:/DefaultJMSProvider</config-property>
        <config-property name="Strict" type="java.lang.Boolean">false</config-property>
        <security-domain-and-application>JmsXARealm</security-domain-and-application>
        <max-pool-size>60</max-pool-size>
    </tx-connection-factory>

Responses