Show Table of Contents
13.5. JMS
The Java Message Service (JMS) is an API that allows Java Enterprise components to communicate with each other asynchronously and reliably.
Operations on the runtime engine and tasks can be done via the JMS API exposed by the jBPM console and KIE workbench. However, it's not possible to manage deployments or the knowledge base via this JMS API.
Unlike the REST API, it is possible to send a batch of commands to the JMS API that will all be processed in one request after which the responses to the commands will be collected and return in one response message.
13.5.1. JMS Queue Setup
When the Workbench is deployed on the JBoss AS or EAP server, it automatically creates 3 queues:
The
jms/queue/KIE.SESSIONjms/queue/KIE.TASKjms/queue/KIE.RESPONSE
KIE.SESSION and KIE.TASK queues should be used to send request messages to the JMS API. Command response messages will be then placed on the KIE.RESPONSE queues. Command request messages that involve starting and managing business processes should be sent to the KIE.SESSION and command request messages that involve managing human tasks, should be sent to the KIE.TASK queue.
Although there are 2 different input queues,
KIE.SESSION and KIE.TASK, this is only in order to provide multiple input queues so as to optimize processing: command request messages will be processed in the same manner regardless of which queue they're sent to. However, in some cases, users may send many more requests involving human tasks than requests involving business processes, but then not want the processing of business process-related request messages to be delayed by the human task messages. By sending the appropriate command request messages to the appropriate queues, this problem can be avoided.
The term "command request message" used above refers to a JMS byte message that contains a serialized
JaxbCommandsRequest object. At the moment, only XML serialization (as opposed to, JSON or protobuf, for example) is supported.
13.5.2. Serialization issues
Sometimes, users may wish to pass instances of their own classes as parameters to commands sent in a REST request or JMS message. In order to do this, there are a number of requirements.
- The user-defined class satisfy the following in order to be property serialized and deserialized by the JMS or REST API:
- The user-defined class must be correctly annotated with JAXB annotations, including the following:
- The user-defined class must be annotated with a
javax.xml.bind.annotation.XmlRootElementannotation with a non-emptynamevalue - All fields or getter/setter methods must be annotated with a
javax.xml.bind.annotation.XmlElementorjavax.xml.bind.annotation.XmlAttributeannotations.
Furthermore, the following usage of JAXB annotations is recommended:- Annotate the user-defined class with a
javax.xml.bind.annotation.XmlAccessorTypeannotation specifying that fields should be used, (javax.xml.bind.annotation.XmlAccessType.FIELD). This also means that you should annotate the fields (instead of the getter or setter methods) with@XmlElementor@XmlAttributeannotations. - Fields annotated with
@XmlElementor@XmlAttributeannotations should also be annotated withjavax.xml.bind.annotation.XmlSchemaTypeannotations specifying the type of the field, even if the fields contain primitive values. - Use objects to store primitive values. For example, use the
java.lang.Integerclass for storing an integer value, and not theintclass. This way it will always be obvious if the field is storing a value.
- The user-defined class definition must implement a no-arg constructor.
- Any fields in the user-defined class must either be object primitives (such as a
LongorString) or otherwise be objects that satisfy the first 2 requiremends in this list (correct usage of JAXB annotations and a no-arg constructor).
- The class definition must be included in the deployment jar of the deployment that the JMS message content is meant for.
- The sender must set a “deploymentId” string property on the JMS bytes message to the name of the deploymentId. This property is necessary in order to be able to load the proper classes from the deployment itself before deserializing the message on the server side.
Note
While submitting an instance of a user-defined class is possible via both the JMS and REST API's, retrieving an instance of the process variable is only possible via the REST API.
13.5.3. Example JMS Usage
The following is an example that shows how to use the JMS API. The numbers ("callouts") along the side of the example refer to notes below that explain particular parts of the example. It's supplied for those advanced users that do not wish to use the BPMS Remote Java API.
The BPMS Remote Java API, described here, will otherwise take care of all of the logic shown below.
// normal java imports skipped import org.drools.core.command.runtime.process.StartProcessCommand; import org.jbpm.services.task.commands.GetTaskAssignedAsPotentialOwnerCommand; import org.kie.api.command.Command; import org.kie.api.runtime.process.ProcessInstance; import org.kie.api.task.model.TaskSummary; import org.kie.services.client.api.command.exception.RemoteCommunicationException;import org.kie.services.client.serialization.JaxbSerializationProvider; import org.kie.services.client.serialization.SerializationConstants; import org.kie.services.client.serialization.SerializationException; import org.kie.services.client.serialization.jaxb.impl.JaxbCommandResponse; import org.kie.services.client.serialization.jaxb.impl.JaxbCommandsRequest; import org.kie.services.client.serialization.jaxb.impl.JaxbCommandsResponse; import org.kie.services.client.serialization.jaxb.rest.JaxbExceptionResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class DocumentationJmsExamples { protected static final Logger logger = LoggerFactory.getLogger(DocumentationJmsExamples.class); public void sendAndReceiveJmsMessage() { String USER = "charlie"; String PASSWORD = "ch0c0licious"; String DEPLOYMENT_ID = "test-project"; String PROCESS_ID_1 = "oompa-processing"; URL serverUrl; try { serverUrl = new URL("http://localhost:8080/business-central/"); } catch (MalformedURLException murle) { logger.error("Malformed URL for the server instance!", murle); return; } // Create JaxbCommandsRequest instance and add commands Command<?> cmd = new StartProcessCommand(PROCESS_ID_1); int oompaProcessingResultIndex = 0;
JaxbCommandsRequest req = new JaxbCommandsRequest(DEPLOYMENT_ID, cmd);
req.getCommands().add(new GetTaskAssignedAsPotentialOwnerCommand(USER, "en-UK")); int loompaMonitoringResultIndex = 1;
// Get JNDI context from server InitialContext context = getRemoteJbossInitialContext(serverUrl, USER, PASSWORD); // Create JMS connection ConnectionFactory connectionFactory; try { connectionFactory = (ConnectionFactory) context.lookup("jms/RemoteConnectionFactory"); } catch (NamingException ne) { throw new RuntimeException("Unable to lookup JMS connection factory.", ne); } // Setup queues Queue sendQueue, responseQueue; try { sendQueue = (Queue) context.lookup("jms/queue/KIE.SESSION"); responseQueue = (Queue) context.lookup("jms/queue/KIE.RESPONSE"); } catch (NamingException ne) { throw new RuntimeException("Unable to lookup send or response queue", ne); } // Send command request Long processInstanceId = null; // needed if you're doing an operation on a PER_PROCESS_INSTANCE deployment String humanTaskUser = USER; JaxbCommandsResponse cmdResponse = sendJmsCommands( DEPLOYMENT_ID, processInstanceId, humanTaskUser, req, connectionFactory, sendQueue, responseQueue, USER, PASSWORD, 5); // Retrieve results ProcessInstance oompaProcInst = null; List<TaskSummary> charliesTasks = null; for (JaxbCommandResponse<?> response : cmdResponse.getResponses()) {
if (response instanceof JaxbExceptionResponse) { // something went wrong on the server side JaxbExceptionResponse exceptionResponse = (JaxbExceptionResponse) response; throw new RuntimeException(exceptionResponse.getMessage()); } if (response.getIndex() == oompaProcessingResultIndex) {
oompaProcInst = (ProcessInstance) response.getResult();
} else if (response.getIndex() == loompaMonitoringResultIndex) {
charliesTasks = (List<TaskSummary>) response.getResult();
} } } private JaxbCommandsResponse sendJmsCommands(String deploymentId, Long processInstanceId, String user, JaxbCommandsRequest req, ConnectionFactory factory, Queue sendQueue, Queue responseQueue, String jmsUser, String jmsPassword, int timeout) { req.setProcessInstanceId(processInstanceId); req.setUser(user); Connection connection = null; Session session = null; String corrId = UUID.randomUUID().toString(); String selector = "JMSCorrelationID = '" + corrId + "'"; JaxbCommandsResponse cmdResponses = null; try { // setup MessageProducer producer; MessageConsumer consumer; try { if (jmsPassword != null) { connection = factory.createConnection(jmsUser, jmsPassword); } else { connection = factory.createConnection(); } session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(sendQueue); consumer = session.createConsumer(responseQueue, selector); connection.start(); } catch (JMSException jmse) { throw new RemoteCommunicationException("Unable to setup a JMS connection.", jmse); } JaxbSerializationProvider serializationProvider = new JaxbSerializationProvider(); // if necessary, add user-created classes here: // xmlSerializer.addJaxbClasses(MyType.class, AnotherJaxbAnnotatedType.class); // Create msg BytesMessage msg; try { msg = session.createBytesMessage();
// set properties msg.setJMSCorrelationID(corrId);
msg.setIntProperty(SerializationConstants.SERIALIZATION_TYPE_PROPERTY_NAME, JaxbSerializationProvider.JMS_SERIALIZATION_TYPE);
Collection<Class<?>> extraJaxbClasses = serializationProvider.getExtraJaxbClasses(); if (!extraJaxbClasses.isEmpty()) { String extraJaxbClassesPropertyValue = JaxbSerializationProvider .classSetToCommaSeperatedString(extraJaxbClasses); msg.setStringProperty(SerializationConstants.EXTRA_JAXB_CLASSES_PROPERTY_NAME, extraJaxbClassesPropertyValue); msg.setStringProperty(SerializationConstants.DEPLOYMENT_ID_PROPERTY_NAME, deploymentId); } // serialize request String xmlStr = serializationProvider.serialize(req);
msg.writeUTF(xmlStr); } catch (JMSException jmse) { throw new RemoteCommunicationException("Unable to create and fill a JMS message.", jmse); } catch (SerializationException se) { throw new RemoteCommunicationException("Unable to deserialze JMS message.", se.getCause()); } // send try { producer.send(msg); } catch (JMSException jmse) { throw new RemoteCommunicationException("Unable to send a JMS message.", jmse); } // receive Message response; try { response = consumer.receive(timeout); } catch (JMSException jmse) { throw new RemoteCommunicationException("Unable to receive or retrieve the JMS response.", jmse); } if (response == null) { logger.warn("Response is empty, leaving"); return null; } // extract response assert response != null : "Response is empty."; try { String xmlStr = ((BytesMessage) response).readUTF(); cmdResponses = (JaxbCommandsResponse) serializationProvider.deserialize(xmlStr);
} catch (JMSException jmse) { throw new RemoteCommunicationException("Unable to extract " + JaxbCommandsResponse.class.getSimpleName() + " instance from JMS response.", jmse); } catch (SerializationException se) { throw new RemoteCommunicationException("Unable to extract " + JaxbCommandsResponse.class.getSimpleName() + " instance from JMS response.", se.getCause()); } assert cmdResponses != null : "Jaxb Cmd Response was null!"; } finally { if (connection != null) { try { connection.close(); session.close(); } catch (JMSException jmse) { logger.warn("Unable to close connection or session!", jmse); } } } return cmdResponses; } private InitialContext getRemoteJbossInitialContext(URL url, String user, String password) { Properties initialProps = new Properties(); initialProps.setProperty(InitialContext.INITIAL_CONTEXT_FACTORY, "org.jboss.naming.remote.client.InitialContextFactory"); String jbossServerHostName = url.getHost(); initialProps.setProperty(InitialContext.PROVIDER_URL, "remote://"+ jbossServerHostName + ":4447"); initialProps.setProperty(InitialContext.SECURITY_PRINCIPAL, user); initialProps.setProperty(InitialContext.SECURITY_CREDENTIALS, password); for (Object keyObj : initialProps.keySet()) { String key = (String) keyObj; System.setProperty(key, (String) initialProps.get(key)); } try { return new InitialContext(initialProps); } catch (NamingException e) { throw new RemoteCommunicationException("Unable to create " + InitialContext.class.getSimpleName(), e); } } }
|
These classes can all be found in the kie-services-client and the kie-services-jaxb JAR.
|
|
The JaxbCommandsRequest instance is the "holder" object in which you can place all of the commands you want to execute in a particular request. By using the JaxbCommandsRequest.getCommands() method, you can retrieve the list of commands in order to add more commands to the request.
A deployment id is required for command request messages that deal with business processes. Command request messages that only contain human task-related commands do not require a deployment id.
|
|
Note that the JMS message sent to the remote JMS API must be constructed as follows:
|
|
The same serialization mechanism used to serialize the request message will be used to serialize the response message.
|
|
In order to match the response to a command, to the initial command, use the index field of the returned JaxbCommandResponse instances. This index field will match the index of the initial command. Because not all commands will return a result, it's possible to send 3 commands with a command request message, and then receive a command response message that only includes one JaxbCommandResponse message with an index value of 1. That 1 then identifies it as the response to the second command.
|
|
Since many of the results returned by various commands are not serializable, the jBPM JMS Remote API converts these results into JAXB equivalents, all of which implement the JaxbCommandResponse interface. The JaxbCommandResponse.getResult() method then returns the JAXB equivalent to the actual result, which will conform to the interface of the result.
For example, in the code above, the StartProcessCommand returns a ProcessInstance. In order to return this object to the requester, the ProcessInstance is converted to a JaxbProcessInstanceResponse and then added as a JaxbCommandResponse to the command response message. The same applies to the List<TaskSummary> that's returned by the GetTaskAssignedAsPotentialOwnerCommand.
However, not all methods that can be called on a normal ProcessInstance can be called on the JaxbProcessInstanceResponse because the JaxbProcessInstanceResponse is simply a representation of a ProcessInstance object. This applies to various other command response as well. In particular, methods which require an active (backing) KieSession, such as ProcessInstance.getProess() or ProcessInstance.signalEvent(String type, Object event) will throw an UnsupportedOperationException.
|

import org.kie.services.client.serialization.JaxbSerializationProvider;
import org.kie.services.client.serialization.SerializationConstants;
import org.kie.services.client.serialization.SerializationException;
import org.kie.services.client.serialization.jaxb.impl.JaxbCommandResponse;
import org.kie.services.client.serialization.jaxb.impl.JaxbCommandsRequest;
import org.kie.services.client.serialization.jaxb.impl.JaxbCommandsResponse;
import org.kie.services.client.serialization.jaxb.rest.JaxbExceptionResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DocumentationJmsExamples {
protected static final Logger logger = LoggerFactory.getLogger(DocumentationJmsExamples.class);
public void sendAndReceiveJmsMessage() {
String USER = "charlie";
String PASSWORD = "ch0c0licious";
String DEPLOYMENT_ID = "test-project";
String PROCESS_ID_1 = "oompa-processing";
URL serverUrl;
try {
serverUrl = new URL("http://localhost:8080/business-central/");
} catch (MalformedURLException murle) {
logger.error("Malformed URL for the server instance!", murle);
return;
}
// Create JaxbCommandsRequest instance and add commands
Command<?> cmd = new StartProcessCommand(PROCESS_ID_1);
int oompaProcessingResultIndex = 0;
JaxbCommandsRequest req = new JaxbCommandsRequest(DEPLOYMENT_ID, cmd);
req.getCommands().add(new GetTaskAssignedAsPotentialOwnerCommand(USER, "en-UK"));
int loompaMonitoringResultIndex = 1;
if (response instanceof JaxbExceptionResponse) {
// something went wrong on the server side
JaxbExceptionResponse exceptionResponse = (JaxbExceptionResponse) response;
throw new RuntimeException(exceptionResponse.getMessage());
}
if (response.getIndex() == oompaProcessingResultIndex) {
// set properties
msg.setJMSCorrelationID(corrId);
} catch (JMSException jmse) {
throw new RemoteCommunicationException("Unable to extract " + JaxbCommandsResponse.class.getSimpleName()
+ " instance from JMS response.", jmse);
} catch (SerializationException se) {
throw new RemoteCommunicationException("Unable to extract " + JaxbCommandsResponse.class.getSimpleName()
+ " instance from JMS response.", se.getCause());
}
assert cmdResponses != null : "Jaxb Cmd Response was null!";
} finally {
if (connection != null) {
try {
connection.close();
session.close();
} catch (JMSException jmse) {
logger.warn("Unable to close connection or session!", jmse);
}
}
}
return cmdResponses;
}
private InitialContext getRemoteJbossInitialContext(URL url, String user, String password) {
Properties initialProps = new Properties();
initialProps.setProperty(InitialContext.INITIAL_CONTEXT_FACTORY, "org.jboss.naming.remote.client.InitialContextFactory");
String jbossServerHostName = url.getHost();
initialProps.setProperty(InitialContext.PROVIDER_URL, "remote://"+ jbossServerHostName + ":4447");
initialProps.setProperty(InitialContext.SECURITY_PRINCIPAL, user);
initialProps.setProperty(InitialContext.SECURITY_CREDENTIALS, password);
for (Object keyObj : initialProps.keySet()) {
String key = (String) keyObj;
System.setProperty(key, (String) initialProps.get(key));
}
try {
return new InitialContext(initialProps);
} catch (NamingException e) {
throw new RemoteCommunicationException("Unable to create " + InitialContext.class.getSimpleName(), e);
}
}
}
Where did the comment section go?
Red Hat's documentation publication system recently went through an upgrade to enable speedier, more mobile-friendly content. We decided to re-evaluate our commenting platform to ensure that it meets your expectations and serves as an optimal feedback mechanism. During this redesign, we invite your input on providing feedback on Red Hat documentation via the discussion platform.