Select Your Language

Infrastructure and Management

Cloud Computing

Storage

Runtimes

Integration and Automation

  • Comments
  • Delay in consuming messages from topic

    Posted on

    Issue:

    Hi,

    We have topic with durable subscribers. We could publish and consume the messages , however we notice some weird behavior while consuming the messages from the topic , there is a minute long delays while consuming message for particular subscriber.

    We haven't used any message listener or MDBs for the topic, because, we don't have to consume the message as soon as message available on topic for particular subscriber.

    The requirement is to publish the message to subscriber and read all the messages for particular subscriber using message selector later some point of time.

    Here is the code for subscription,publishing and consuming messages from Topic. Please let me know if am missing anything.

     

    Environment:

    JBoss EAP 5.x

     

    Code:

    {code}

    public void subscribeUser(String userName) {
        TopicConnection topicConnection = null;
        TopicSession topicSession = null;
        TopicSubscriber topicSubscriber = null;
        TopicPublisher topicPublisher = null;
        try {
          topicConnection = connectionFactory.createTopicConnection(notificationQUser, notificationQPwd);
          topicConnection.setClientID("SomeId");
          topicSession = topicConnection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
          Topic topic = topicSession.createTopic(topicName);
          topicSubscriber = getTopicSubscriber(topicSession,topic, userName);
          topicPublisher = topicSession.createPublisher(topic);
        } catch (JMSException e) {
          throw new RuntimeException("Error while subscribing to Topic", e);
        } finally {
          closeTopicResources(topicSubscriber, topicPublisher, topicSession, topicConnection);
        }
      }

     public void publishMessage(CustomMessage customMessage, Long timeToLive, String userProperty) {
        TopicConnection topicConnection = null;
        TopicSession topicSession = null;
        TopicPublisher topicPublisher = null;
        try {
          topicConnection = connectionFactory.createTopicConnection();
          topicSession = topicConnection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
          Topic topic = topicSession.createTopic(topicName);
          topicPublisher = topicSession.createPublisher(topic);
          ObjectMessage message = topicSession.createObjectMessage(customMessage);
          message.setStringProperty("user", userProperty);
          topicPublisher.publish(message, DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY, timeToLive);
        } catch (JMSException e) {
          throw new RuntimeException("Error Sending Message", e);
        } finally {
          closeTopicResources(null, topicPublisher, topicSession, topicConnection);
        }
      }

     public Collection findAllMessages(String userId, int maxResults) {
        Collection out = new ArrayList();
        TopicConnection topicConnection = null;
        TopicSession topicSession = null;
        TopicSubscriber topicSubscriber = null;
        try {
          topicConnection = connectionFactory.createTopicConnection(notificationQUser, notificationQPwd);
          topicConnection.setClientID("someId");
          topicSession = topicConnection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
          Topic topic = topicSession.createTopic(topicName);
          topicSubscriber = getTopicSubscriber(topicSession, topic, userId);
          topicConnection.start();
          Message msg = null;
          int count = 0;
          do {
            msg = topicSubscriber.receiveNoWait();
            if (msg instanceof ObjectMessage) {
              ObjectMessage om = (ObjectMessage) msg;
              if (om.getObject() instanceof CustomMessage) {
                CustomMessage model = (CustomMessage) om.getObject();
                out.add(model);
              } else {
                log.error(String.format("Skipping unexpected message payload of type %s", om.getObject().getClass().getSimpleName()));
              }
            } else if (msg != null) {
              log.error(String.format("Skipping unexpected message type %s", msg.getClass().getSimpleName()));
            }
            
          } while (msg != null && out.size() = maxResults);
        } catch (JMSException e) {
          throw new RuntimeException("Error retrieving  Messages", e);
        } finally {
          closeTopicResources(topicSubscriber, null, topicSession, topicConnection);
        }
        return out;
      }

     private String getSubscriptionName(String userName) {
        return String.format("user:%s", userName);
      }

      private TopicSubscriber getTopicSubscriber(TopicSession topicSession, Topic topic, String userName) throws JMSException {
        return topicSession.createDurableSubscriber(topic, getSubscriptionName(userName), String.format("user = 'ALL' OR user = '%s'", userName), false);
      }

    {code}

    Conguration:

    In destination-service.xml


            jboss.messaging:service=ServerPeer
            jboss.messaging:service=PostOffice
        

     

    In jms-ds.xml, added new connection factory

       
            NotificationJmsXA
           
            jms-ra.rar
            org.jboss.resource.adapter.jms.JmsConnectionFactory
            javax.jms.Topic
            java:/DefaultJMSProvider
            false
            JmsXARealm
            60
        

    by

    points

    Responses

    Red Hat LinkedIn YouTube Facebook X, formerly Twitter

    Quick Links

    Help

    Site Info

    Related Sites

    © 2025 Red Hat