Delay in consuming messages from topic
Issue:
Hi,
We have a topic with durable subscribers. We can publish and consume messages, but we notice some odd behavior while consuming from the topic: there are minute-long delays when consuming messages for a particular subscriber.
We haven't used a message listener or MDBs for the topic, because we don't need to consume a message as soon as it is available on the topic for a particular subscriber.
The requirement is to publish messages to subscribers and, at some later point, read all the messages for a particular subscriber using a message selector.
Here is the code for subscribing, publishing, and consuming messages from the topic. Please let me know if I am missing anything.
Environment:
JBoss EAP 5.x
Code:
{code}
public void subscribeUser(String userName) {
    TopicConnection topicConnection = null;
    TopicSession topicSession = null;
    TopicSubscriber topicSubscriber = null;
    TopicPublisher topicPublisher = null;
    try {
        topicConnection = connectionFactory.createTopicConnection(notificationQUser, notificationQPwd);
        topicConnection.setClientID("SomeId");
        topicSession = topicConnection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
        Topic topic = topicSession.createTopic(topicName);
        topicSubscriber = getTopicSubscriber(topicSession, topic, userName);
        topicPublisher = topicSession.createPublisher(topic);
    } catch (JMSException e) {
        throw new RuntimeException("Error while subscribing to Topic", e);
    } finally {
        closeTopicResources(topicSubscriber, topicPublisher, topicSession, topicConnection);
    }
}
public void publishMessage(CustomMessage customMessage, Long timeToLive, String userProperty) {
    TopicConnection topicConnection = null;
    TopicSession topicSession = null;
    TopicPublisher topicPublisher = null;
    try {
        topicConnection = connectionFactory.createTopicConnection();
        topicSession = topicConnection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
        Topic topic = topicSession.createTopic(topicName);
        topicPublisher = topicSession.createPublisher(topic);
        ObjectMessage message = topicSession.createObjectMessage(customMessage);
        message.setStringProperty("user", userProperty);
        topicPublisher.publish(message, DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY, timeToLive);
    } catch (JMSException e) {
        throw new RuntimeException("Error Sending Message", e);
    } finally {
        closeTopicResources(null, topicPublisher, topicSession, topicConnection);
    }
}
public Collection<CustomMessage> findAllMessages(String userId, int maxResults) {
    Collection<CustomMessage> out = new ArrayList<CustomMessage>();
    TopicConnection topicConnection = null;
    TopicSession topicSession = null;
    TopicSubscriber topicSubscriber = null;
    try {
        topicConnection = connectionFactory.createTopicConnection(notificationQUser, notificationQPwd);
        topicConnection.setClientID("someId");
        topicSession = topicConnection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
        Topic topic = topicSession.createTopic(topicName);
        topicSubscriber = getTopicSubscriber(topicSession, topic, userId);
        topicConnection.start();
        Message msg = null;
        do {
            // Returns null immediately if no message is currently available to this consumer.
            msg = topicSubscriber.receiveNoWait();
            if (msg instanceof ObjectMessage) {
                ObjectMessage om = (ObjectMessage) msg;
                if (om.getObject() instanceof CustomMessage) {
                    CustomMessage model = (CustomMessage) om.getObject();
                    out.add(model);
                } else {
                    log.error(String.format("Skipping unexpected message payload of type %s", om.getObject().getClass().getSimpleName()));
                }
            } else if (msg != null) {
                log.error(String.format("Skipping unexpected message type %s", msg.getClass().getSimpleName()));
            }
        // Use < rather than <= so that at most maxResults messages are collected.
        } while (msg != null && out.size() < maxResults);
    } catch (JMSException e) {
        throw new RuntimeException("Error retrieving Messages", e);
    } finally {
        closeTopicResources(topicSubscriber, null, topicSession, topicConnection);
    }
    return out;
}
private String getSubscriptionName(String userName) {
    return String.format("user:%s", userName);
}
private TopicSubscriber getTopicSubscriber(TopicSession topicSession, Topic topic, String userName) throws JMSException {
    return topicSession.createDurableSubscriber(topic, getSubscriptionName(userName), String.format("user = 'ALL' OR user = '%s'", userName), false);
}
{code}
Configuration:
In destination-service.xml
<mbean code="org.jboss.jms.server.destination.TopicService" name="jboss.messaging.destination:service=Topic,name=UserNotificationTopic" xmbean-dd="xmdesc/Topic-xmbean.xml">
    <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
    <depends>jboss.messaging:service=PostOffice</depends>
</mbean>
In jms-ds.xml, added a new connection factory:
<tx-connection-factory>
    <jndi-name>NotificationJmsXA</jndi-name>
    <xa-transaction />
    <rar-name>jms-ra.rar</rar-name>
    <connection-definition>org.jboss.resource.adapter.jms.JmsConnectionFactory</connection-definition>
    <config-property name="SessionDefaultType" type="java.lang.String">javax.jms.Topic</config-property>
    <config-property name="JmsProviderAdapterJNDI" type="java.lang.String">java:/DefaultJMSProvider</config-property>
    <config-property name="Strict" type="java.lang.Boolean">false</config-property>
    <security-domain-and-application>JmsXARealm</security-domain-and-application>
    <max-pool-size>60</max-pool-size>
</tx-connection-factory>
Responses
Hi,
You say "there is a minute long delays while consuming message for particular subscriber". What do you mean by "particular subscriber" in this context?
Hi,
As far as I can see, what you are doing sounds fine. To get to the bottom of this, I would suggest you open a support case, if possible with a simple test case.
It would also be interesting to know what results you get if you change .receiveNoWait() to something like .receive(300). See the following wiki page for a discussion of .receiveNoWait() behavior:
http://community.jboss.org/wiki/JBMReceiveNoWait
I don't think I can really add much to what is explained in the wiki, but the important part is that receiveNoWait is not guaranteed to return a message even if, in theory, there should be one waiting.
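To make the receive(300) suggestion concrete, here is a minimal sketch of a drain loop that polls with a short timeout instead of not waiting at all. It deliberately does not depend on the JMS API so that it runs on a plain JDK (Java 8 or later): TimedSource is a hypothetical stand-in for a consumer's receive(long timeout) call, and TimedDrain, drain, and the 300 ms value are illustrative names and numbers, not anything from JBoss Messaging. With the real API the call would also throw JMSException.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

final class TimedDrain {

    /** Hypothetical stand-in for a JMS consumer's receive(long timeout); returns null on timeout. */
    interface TimedSource<T> {
        T receive(long timeoutMillis);
    }

    /**
     * Collects up to maxResults messages, waiting at most timeoutMillis for each one.
     * A null result is treated as "nothing more available right now" and ends the loop.
     */
    static <T> List<T> drain(TimedSource<T> source, int maxResults, long timeoutMillis) {
        List<T> out = new ArrayList<T>();
        while (out.size() < maxResults) {
            T msg = source.receive(timeoutMillis);
            if (msg == null) {
                break;
            }
            out.add(msg);
        }
        return out;
    }

    public static void main(String[] args) {
        // Fake source backed by an in-memory queue; it ignores the timeout.
        Queue<String> pending = new ArrayDeque<String>(Arrays.asList("a", "b", "c"));
        List<String> got = drain(timeout -> pending.poll(), 2, 300L);
        System.out.println(got); // prints [a, b]
    }
}
```

The trade-off is that the final, empty poll always costs the full timeout, so the value should be small enough for the caller to tolerate but long enough for the provider to hand buffered messages to the consumer.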
With regard to the time to live, the JMS spec simply states that messages should not be delivered after the TTL is exceeded. It does not require them to be actively removed from queues before delivery.
With JBoss Messaging, if you tried to consume those messages, they would be expired at the point of delivery and would not be received by the consumer.
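The idea of expiring at the point of delivery can be illustrated with a toy in-memory queue. This is only a simulation under my own assumptions, not how JBoss Messaging is implemented: LazyExpiryQueue and its methods are hypothetical names, and time is passed in explicitly so the behavior is easy to follow. As in JMS, a time to live of 0 is taken to mean the message never expires.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class LazyExpiryQueue {

    private static final class Entry {
        final String body;
        final long expiresAtMillis; // 0 means "never expires"

        Entry(String body, long expiresAtMillis) {
            this.body = body;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Deque<Entry> entries = new ArrayDeque<Entry>();

    /** Stores the message; nothing is checked or removed at publish time. */
    void publish(String body, long nowMillis, long timeToLiveMillis) {
        long expiresAt = (timeToLiveMillis == 0) ? 0 : nowMillis + timeToLiveMillis;
        entries.addLast(new Entry(body, expiresAt));
    }

    /** Number of messages still held, including any that have already expired. */
    int storedCount() {
        return entries.size();
    }

    /** Returns the next live message, silently discarding expired ones on the way; null if none. */
    String receive(long nowMillis) {
        Entry e;
        while ((e = entries.pollFirst()) != null) {
            if (e.expiresAtMillis == 0 || nowMillis < e.expiresAtMillis) {
                return e.body;
            }
            // Expired: dropped only now, at the point of delivery.
        }
        return null;
    }

    public static void main(String[] args) {
        LazyExpiryQueue queue = new LazyExpiryQueue();
        queue.publish("short-lived", 0L, 100L);
        queue.publish("no-expiry", 0L, 0L);
        System.out.println(queue.storedCount()); // prints 2: the expired message is still stored
        System.out.println(queue.receive(500L)); // prints no-expiry
        System.out.println(queue.storedCount()); // prints 0
    }
}
```

So a message count taken before anyone consumes can still include messages whose TTL has already passed.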