16.2. JMS

The Java Message Service (JMS) is an API that allows Java Enterprise components to communicate with each other asynchronously and reliably.
Operations on the runtime engine and tasks can be done through the JMS API exposed by Business Central. However, it is not possible to manage deployments or the knowledge base via this JMS API.
Unlike the REST API, it is possible to send a batch of commands to the JMS API that will all be processed in one request after which the responses to the commands will be collected and return in one response message.

16.2.1. JMS Queue Setup

When you deploy Business Central on the WebSphere application server or EAP server, it automatically creates 3 queues:
  • jms/queue/KIE.SESSION
  • jms/queue/KIE.TASK
  • jms/queue/KIE.RESPONSE
The KIE.SESSION and KIE.TASK queues should be used to send request messages to the JMS API. Command response messages will be then placed on the KIE.RESPONSE queues. Command request messages that involve starting and managing business processes should be sent to the KIE.SESSION and command request messages that involve managing human tasks, should be sent to the KIE.TASK queue.
Although there are 2 different input queues, KIE.SESSION and KIE.TASK, this is only in order to provide multiple input queues so as to optimize processing: command request messages will be processed in the same manner regardless of which queue they're sent to. However, in some cases, users may send many more requests involving human tasks than requests involving business processes, but then not want the processing of business process-related request messages to be delayed by the human task messages. By sending the appropriate command request messages to the appropriate queues, this problem can be avoided.
The term "command request message" used above refers to a JMS byte message that contains a serialized JaxbCommandsRequest object. At the moment, only XML serialization (as opposed to, JSON or protobuf, for example) is supported.

16.2.2. Serialization issues

Sometimes, users may wish to pass instances of their own classes as parameters to commands sent in a REST request or JMS message. In order to do this, there are a number of requirements.
  1. The user-defined class satisfy the following in order to be property serialized and deserialized by the JMS or REST API:
    • The user-defined class must be correctly annotated with JAXB annotations, including the following:
      • The user-defined class must be annotated with a javax.xml.bind.annotation.XmlRootElement annotation with a non-empty name value
      • All fields or getter/setter methods must be annotated with a javax.xml.bind.annotation.XmlElement or javax.xml.bind.annotation.XmlAttribute annotations.
      Furthermore, the following usage of JAXB annotations is recommended:
      • Annotate the user-defined class with a javax.xml.bind.annotation.XmlAccessorType annotation specifying that fields should be used, (javax.xml.bind.annotation.XmlAccessType.FIELD). This also means that you should annotate the fields (instead of the getter or setter methods) with @XmlElement or @XmlAttribute annotations.
      • Fields annotated with @XmlElement or @XmlAttribute annotations should also be annotated with javax.xml.bind.annotation.XmlSchemaType annotations specifying the type of the field, even if the fields contain primitive values.
      • Use objects to store primitive values. For example, use the java.lang.Integer class for storing an integer value, and not the int class. This way it will always be obvious if the field is storing a value.
    • The user-defined class definition must implement a no-arg constructor.
    • Any fields in the user-defined class must either be object primitives (such as a Long or String) or otherwise be objects that satisfy the first 2 requirements in this list (correct usage of JAXB annotations and a no-arg constructor).
  2. The class definition must be included in the deployment jar of the deployment that the JMS message content is meant for.

    Note

    If you create your class definitions from an XSD schema, you may end up creating classes that inconsistently (among classes) refer to a namespace. This inconsistent use of a namespace can end up preventing a these class instance from being correctly deserialized when received as a parameter in a command on the server side.
    For example, you may create a class that is used in a BPMN2 process, and add an instance of this class as a parameter when starting the process. While sending the command/operation request (via the Remote (client) Java API) will succeed, the parameter will not be correctly deserialized on the server side, leading the process to eventually throw an exception about an unexpected type for the class.
  3. The sender must set a deploymentId string property on the JMS bytes message to the name of the deploymentId. This property is necessary in order to be able to load the proper classes from the deployment itself before deserializing the message on the server side.

Note

While submitting an instance of a user-defined class is possible via both the JMS and REST API's, retrieving an instance of the process variable is only possible via the REST API.

16.2.3. Example JMS Usage

The following is an example that shows how to use the JMS API. The numbers (callouts) along the side of the example refer to notes below that explain particular parts of the example. It's supplied for those advanced users that do not wish to use the JBoss BPM Suite Remote Java API.
The JBoss BPM Suite Remote Java API, described here, will otherwise take care of all of the logic shown below.
// normal java imports skipped

import org.drools.core.command.runtime.process.StartProcessCommand;
import org.jbpm.services.task.commands.GetTaskAssignedAsPotentialOwnerCommand;
import org.kie.api.command.Command;
import org.kie.api.runtime.process.ProcessInstance;
import org.kie.api.task.model.TaskSummary;
// 1 
import org.kie.services.client.api.command.exception.RemoteCommunicationException;
import org.kie.services.client.serialization.JaxbSerializationProvider;
import org.kie.services.client.serialization.SerializationConstants;
import org.kie.services.client.serialization.SerializationException;
import org.kie.services.client.serialization.jaxb.impl.JaxbCommandResponse;
import org.kie.services.client.serialization.jaxb.impl.JaxbCommandsRequest;
import org.kie.services.client.serialization.jaxb.impl.JaxbCommandsResponse;
import org.kie.services.client.serialization.jaxb.rest.JaxbExceptionResponse;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DocumentationJmsExamples {

  protected static final Logger logger = LoggerFactory.getLogger(DocumentationJmsExamples.class);

  public void sendAndReceiveJmsMessage() {

    String USER = "charlie";
    String PASSWORD = "ch0c0licious";

    String DEPLOYMENT_ID = "test-project";
    String PROCESS_ID_1 = "oompa-processing";
    URL serverUrl;
    try {
      serverUrl = new URL("http://localhost:8080/business-central/");
    } catch (MalformedURLException murle) {
      logger.error("Malformed URL for the server instance!", murle); 
      return;
    }

    // Create JaxbCommandsRequest instance and add commands
    Command<?> cmd = new StartProcessCommand(PROCESS_ID_1);
    int oompaProcessingResultIndex = 0;
    
    //5
    
    JaxbCommandsRequest req = new JaxbCommandsRequest(DEPLOYMENT_ID, cmd);
    
    //2
    
    req.getCommands().add(new GetTaskAssignedAsPotentialOwnerCommand(USER, "en-UK"));
    int loompaMonitoringResultIndex = 1;

		//5

    // Get JNDI context from server
    InitialContext context = getRemoteJbossInitialContext(serverUrl, USER, PASSWORD);
      
    // Create JMS connection 
    ConnectionFactory connectionFactory;
    try {
      connectionFactory = (ConnectionFactory) context.lookup("jms/RemoteConnectionFactory");
    } catch (NamingException ne) {
      throw new RuntimeException("Unable to lookup JMS connection factory.", ne);
    }

    // Setup queues
    Queue sendQueue, responseQueue;
    try {
      sendQueue = (Queue) context.lookup("jms/queue/KIE.SESSION");
      responseQueue = (Queue) context.lookup("jms/queue/KIE.RESPONSE");
    } catch (NamingException ne) {
      throw new RuntimeException("Unable to lookup send or response queue", ne);
    }

    // Send command request
    Long processInstanceId = null; // needed if you're doing an operation on a PER_PROCESS_INSTANCE deployment
    String humanTaskUser = USER;
    JaxbCommandsResponse cmdResponse = sendJmsCommands(
        DEPLOYMENT_ID, processInstanceId, humanTaskUser, req, 
        connectionFactory, sendQueue, responseQueue, 
        USER, PASSWORD, 5);

    // Retrieve results
    ProcessInstance oompaProcInst = null;
    List<TaskSummary> charliesTasks = null;
    
    //6
    
    for (JaxbCommandResponse<?> response : cmdResponse.getResponses()) {
      if (response instanceof JaxbExceptionResponse) {
        // something went wrong on the server side
        JaxbExceptionResponse exceptionResponse = (JaxbExceptionResponse) response;
        throw new RuntimeException(exceptionResponse.getMessage());
      }

			//5
		
      if (response.getIndex() == oompaProcessingResultIndex) {
        oompaProcInst = (ProcessInstance) response.getResult();
        
      //6
      
      } else if (response.getIndex() == loompaMonitoringResultIndex) {
    
    	//5  
    	
        charliesTasks = (List<TaskSummary>) response.getResult(); 
      
      //6

      }
    }
  }

  private JaxbCommandsResponse sendJmsCommands(String deploymentId, Long processInstanceId, String user, JaxbCommandsRequest req,
      ConnectionFactory factory, Queue sendQueue, Queue responseQueue, String jmsUser, String jmsPassword, int timeout) {
    req.setProcessInstanceId(processInstanceId);
    req.setUser(user);

    Connection connection = null;
    Session session = null;
    String corrId = UUID.randomUUID().toString();
    String selector = "JMSCorrelationID = '" + corrId + "'";
    JaxbCommandsResponse cmdResponses = null;
    try {

      // setup
      MessageProducer producer;
      MessageConsumer consumer;
      try {
        if (jmsPassword != null) {
          connection = factory.createConnection(jmsUser, jmsPassword);
        } else {
          connection = factory.createConnection();
        }
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        producer = session.createProducer(sendQueue);
        consumer = session.createConsumer(responseQueue, selector);

        connection.start();
      } catch (JMSException jmse) {
        throw new RemoteCommunicationException("Unable to setup a JMS connection.", jmse);
      }

      JaxbSerializationProvider serializationProvider = new JaxbSerializationProvider();
      // if necessary, add user-created classes here:
      // xmlSerializer.addJaxbClasses(MyType.class, AnotherJaxbAnnotatedType.class);

      // Create msg
      BytesMessage msg;
      try {
        msg = session.createBytesMessage();

			//3
			
        // set properties
        msg.setJMSCorrelationID(corrId);
        
      //3
      
        msg.setIntProperty(SerializationConstants.SERIALIZATION_TYPE_PROPERTY_NAME, JaxbSerializationProvider.JMS_SERIALIZATION_TYPE);
        
      //3  
      
        Collection<Class<?>> extraJaxbClasses = serializationProvider.getExtraJaxbClasses();
        if (!extraJaxbClasses.isEmpty()) {
          String extraJaxbClassesPropertyValue = JaxbSerializationProvider
              .classSetToCommaSeperatedString(extraJaxbClasses);
          msg.setStringProperty(SerializationConstants.EXTRA_JAXB_CLASSES_PROPERTY_NAME, extraJaxbClassesPropertyValue);
          msg.setStringProperty(SerializationConstants.DEPLOYMENT_ID_PROPERTY_NAME, deploymentId);
        }

        // serialize request
        String xmlStr = serializationProvider.serialize(req);
        msg.writeUTF(xmlStr);
        
      //3  
        
      } catch (JMSException jmse) {
        throw new RemoteCommunicationException("Unable to create and fill a JMS message.", jmse);
      } catch (SerializationException se) {
        throw new RemoteCommunicationException("Unable to deserialze JMS message.", se.getCause());
      }

      // send
      try {
        producer.send(msg);
      } catch (JMSException jmse) {
        throw new RemoteCommunicationException("Unable to send a JMS message.", jmse);
      }

      // receive
      Message response;
      
      //4
      
      try {
        response = consumer.receive(timeout);
      } catch (JMSException jmse) {
        throw new RemoteCommunicationException("Unable to receive or retrieve the JMS response.", jmse);
      }

      if (response == null) {
        logger.warn("Response is empty, leaving");
        return null;
      }
      // extract response
      assert response != null : "Response is empty.";
      try {
        String xmlStr = ((BytesMessage) response).readUTF();
        cmdResponses = (JaxbCommandsResponse) serializationProvider.deserialize(xmlStr);
      } catch (JMSException jmse) {
        throw new RemoteCommunicationException("Unable to extract " + JaxbCommandsResponse.class.getSimpleName()
            + " instance from JMS response.", jmse);
      } catch (SerializationException se) {
        throw new RemoteCommunicationException("Unable to extract " + JaxbCommandsResponse.class.getSimpleName()
            + " instance from JMS response.", se.getCause());
      }
      assert cmdResponses != null : "Jaxb Cmd Response was null!";
    } finally {
      if (connection != null) {
        try {
          connection.close();
          session.close();
        } catch (JMSException jmse) {
          logger.warn("Unable to close connection or session!", jmse);
        }
      }
    }
    return cmdResponses;
  }

  private InitialContext getRemoteJbossInitialContext(URL url, String user, String password) { 
    Properties initialProps = new Properties();
    initialProps.setProperty(InitialContext.INITIAL_CONTEXT_FACTORY, "org.jboss.naming.remote.client.InitialContextFactory");
    String jbossServerHostName = url.getHost();
    initialProps.setProperty(InitialContext.PROVIDER_URL, "remote://"+ jbossServerHostName + ":4447");
    initialProps.setProperty(InitialContext.SECURITY_PRINCIPAL, user);
    initialProps.setProperty(InitialContext.SECURITY_CREDENTIALS, password);

    for (Object keyObj : initialProps.keySet()) {
      String key = (String) keyObj;
      System.setProperty(key, (String) initialProps.get(key));
    }
    try {
      return new InitialContext(initialProps);
    } catch (NamingException e) {
      throw new RemoteCommunicationException("Unable to create " + InitialContext.class.getSimpleName(), e);
    }
  }
}
1. These classes can all be found in the kie-services-client and the kie-services-jaxb JAR.
2. The JaxbCommandsRequest instance is the "holder" object in which you can place all of the commands you want to execute in a particular request. By using the JaxbCommandsRequest.getCommands() method, you can retrieve the list of commands in order to add more commands to the request.
A deployment id is required for command request messages that deal with business processes. Command request messages that only contain human task-related commands do not require a deployment id.
3. Note that the JMS message sent to the remote JMS API must be constructed as follows:
  • It must be a JMS byte message.
  • It must have a filled JMS Correlation ID property.
  • It must have an int property with the name of "serialization" set to an acceptable value (only 0 at the moment).
  • It must contain a serialized instance of a JaxbCommandsRequest, added to the message as a UTF string
4. The same serialization mechanism used to serialize the request message will be used to serialize the response message.
5. In order to match the response to a command, to the initial command, use the index field of the returned JaxbCommandResponse instances. This index field will match the index of the initial command. Because not all commands will return a result, it's possible to send 3 commands with a command request message, and then receive a command response message that only includes one JaxbCommandResponse message with an index value of 1. That 1 then identifies it as the response to the second command.
6. Since many of the results returned by various commands are not serializable, the jBPM JMS Remote API converts these results into JAXB equivalents, all of which implement the JaxbCommandResponse interface. The JaxbCommandResponse.getResult() method then returns the JAXB equivalent to the actual result, which will conform to the interface of the result.
For example, in the code above, the StartProcessCommand returns a ProcessInstance. In order to return this object to the requester, the ProcessInstance is converted to a JaxbProcessInstanceResponse and then added as a JaxbCommandResponse to the command response message. The same applies to the List<TaskSummary> that's returned by the GetTaskAssignedAsPotentialOwnerCommand.
However, not all methods that can be called on a normal ProcessInstance can be called on the JaxbProcessInstanceResponse because the JaxbProcessInstanceResponse is simply a representation of a ProcessInstance object. This applies to various other command response as well. In particular, methods which require an active (backing) KieSession, such as ProcessInstance.getProess() or ProcessInstance.signalEvent(String type, Object event) will throw an UnsupportedOperationException.