Chapter 5. Design and Development

5.1. HACEP Integration and Source Examples

This reference architecture includes an example application that is designed, deployed, and tested herein. The application consists of four modules: the integration code with a RESTful interface, the supporting data model, the rules package, and a runner application used to populate the Event Channel and exercise the application.

This application is referred to as hacep-integration, and its source code is available as part of the downloaded attachment accompanying this paper. While a complete copy of the example application is provided with this reference architecture, this section walks the reader through every step of design and development, allowing the reader to replicate the original effort and recreate every component of the application.

The HACEP source code itself contains two examples of HACEP integration: one intended for use via the command line, and the other an example of EAP integration. While the structure of the latter was referenced heavily in creating the integration application detailed below, the two differ in scope. The provided source example ties into a larger set of modules intended to demonstrate and exhibit the HACEP framework and its features, whereas the application built herein shows only the minimal effort required to integrate HACEP into your own application or service code, covering the steps from start to finish without the need to filter out extraneous parts or code.

5.2. Project Setup

This reference architecture assumes that the previous installation and configuration steps have been followed and the environment is set up. It is also assumed that the HACEP source code has been copied out of the accompanying download for reference. To support the JBoss Maven repositories necessary for working with the project, modify or create a settings.xml file in your local machine's .m2 directory, typically located in the user's home directory. An example settings.xml is included in the HACEP source code under example-maven-settings, and instructions for downloading and setting up the mentioned repositories can be found in the project README within the source code.
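A minimal sketch of such a profile follows, assuming use of the hosted Red Hat GA repository; the profile and repository ids here are hypothetical, and the example-maven-settings file remains the authoritative reference:

<profile>
    <id>jboss-ga-repository</id>
    <repositories>
        <repository>
            <id>redhat-ga</id>
            <url>https://maven.repository.redhat.com/ga/</url>
            <releases><enabled>true</enabled></releases>
            <snapshots><enabled>false</enabled></snapshots>
        </repository>
    </repositories>
    <pluginRepositories>
        <pluginRepository>
            <id>redhat-ga-plugins</id>
            <url>https://maven.repository.redhat.com/ga/</url>
        </pluginRepository>
    </pluginRepositories>
</profile>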

5.3. Running the CLI HACEP Example

Within the hacep-examples directory of the source code resides a hacep-playground example, which can be used to quickly preview the functionality of the HACEP framework and to become familiar with the various commands present within the more complex hacep-eap-playground example. To run a four-node example from one machine, run the following commands in separate terminals, then take the various instances offline and back online to see the clustering in action.

mvn -P run -DnodeName=node1 -Djava.net.preferIPv4Stack=true -Djgroups.bind_addr=localhost -Dgrid.buffer=5000 -Dqueue.url=tcp://localhost:61616 -Dqueue.security=true -Dqueue.usr=admin -Dqueue.pwd=admin

mvn -P run -DnodeName=node2 -Djava.net.preferIPv4Stack=true -Djgroups.bind_addr=localhost -Dgrid.buffer=5000 -Dqueue.url=tcp://localhost:61616 -Dqueue.security=true -Dqueue.usr=admin -Dqueue.pwd=admin

mvn -P run -DnodeName=node3 -Djava.net.preferIPv4Stack=true -Djgroups.bind_addr=localhost -Dgrid.buffer=5000 -Dqueue.url=tcp://localhost:61616 -Dqueue.security=true -Dqueue.usr=admin -Dqueue.pwd=admin

mvn -P run -DnodeName=node4 -Djava.net.preferIPv4Stack=true -Djgroups.bind_addr=localhost -Dgrid.buffer=5000 -Dqueue.url=tcp://localhost:61616 -Dqueue.security=true -Dqueue.usr=admin -Dqueue.pwd=admin

Once running, each instance gives you access to the HACEP CLI prompt. To start, use the help command to view the available commands and their usage. For example, info shows a small amount of information about the active HACEP cluster, as seen below:

---------------------------------------
                HACEP CLI
---------------------------------------

> help
Commands:
address
		Address of this cluster node
all <cache>
		List all values.
change <user>
		change <user> <oldpassword> to <newpassword>
get <cache> <key> <player>
		Get an object from the <cache>.
help
		List of commands.
info [<cache>]
		General information or specific information on cache.
local <cache>
		List all local valuesFromKeys.
login <user> <password>
		Login the <user>
logout <user>
		Logout the <user>
primary <cache>
		List all local valuesFromKeys for which this node is primary.
quit|exit|q|x
		Exit the shell.
replica <cache>
		List all local values for which this node is a replica.
> info
Cache Manager Status: RUNNING
Cache Manager Address: lab1-55621
Coordinator address: lab1-20909
Is Coordinator: false
Cluster Name: HACEP
Cluster Size: 2
Member list: [lab1-20909, lab1-55621]
Caches: [fact, session]

5.4. Running the EAP HACEP Example

The other example application included with the HACEP source code demonstrates EAP integration. Since an environment is already established for building our own integration, testing this example is as simple as building the .war file via mvn clean package in the root directory of the HACEP codebase. Next, modify the JNDI name within the standalone.xml resource adapter definition to read java:/HACEPConnectionFactory rather than java:/HacepConnectionFactory (note the difference in capitalization). Then copy the .war file out of the hacep-eap-playground/target directory into the [root_eap_install]/standalone/deployments/ directory on the intended HACEP nodes and start the standalone server.
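For reference, the attribute in question lives on the resource adapter's connection-definition element; the fragment below is a sketch, and the class and pool names shown are typical A-MQ resource adapter values that should match whatever was configured during installation:

<connection-definition class-name="org.apache.activemq.ra.ActiveMQManagedConnectionFactory"
                       jndi-name="java:/HACEPConnectionFactory"
                       enabled="true"
                       pool-name="HACEPConnectionFactory">
    ...
</connection-definition>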

Once up and running, a similar set of commands is available via a RESTful interface. The format for accessing them via browser is [ip_addr]:8080/[war_name]/execute/info, as seen below:

Figure 5.1. EAP Playground Example Info Screen

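The same endpoint can also be exercised from the command line; for example, assuming the war is deployed under its module name:

curl http://[ip_addr]:8080/hacep-eap-playground/execute/info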

The HACEP source code also includes a module called hacep-perf-client that contains a runnable Java class for populating the Event Channel, making it possible to monitor the HACEP nodes in action. While the servers are running, navigate to the perf-client's target directory, then execute the class using the following format:

java -Dduration=480 -Dconcurrent.players=10 -Ddelay.range=5 -Devent.interval=1 -Dtest.preload=true -Dtest.messages=20000 -Dbroker.host="[AMQ_MASTER_NODE_IP]:61616" -Dbroker.authentication=true -Dbroker.usr=admin -Dbroker.pwd=admin -Dorg.apache.activemq.SERIALIZABLE_PACKAGES="*" -cp hacep-perf-client-1.0-SNAPSHOT.jar it.redhat.hacep.client.App

While executing, a multitude of events are published to the Event Channel by parallel threads and consumed by the various HACEP nodes, potentially triggering rules against a video game POJO model included within the source code. These actions and rules should produce output within the server logs. While running, it is also possible to take a HACEP node offline and back online to witness the logs of the rebalancing process in action. When satisfied with the demonstration, stop the jar process, un-deploy the EAP Playground, and restore the JNDI name within standalone.xml to its value prior to setting up this example.

5.5. Integrating HACEP into an Application

The example HACEP integration application built herein consists of four modules. The first, the integration-app module, is the main part of the application, extending various HACEP configurations and housing the RESTful interface we'll use to monitor the HACEP instances. The second module, integration-model, houses the POJO models used in both the main code and the rules module. Next, the integration-rules module holds the BRMS rules and KIE configuration used to establish the CEP sessions. Lastly, a module called purchase-publisher acts as a runner application that feeds events into the Event Channel in order to exercise our HACEP-infused application.

5.5.1. Parent Project Configuration

The parent project, which houses the various modules mentioned in the previous section, is called hacep-integration. This parent project does little more than orchestrate the dependencies of the inner modules and control packaging and various other Maven commands. We start by initializing a new Maven Project in JBoss Developer Studio with the following information and clicking Finish:

Figure 5.2. Parent Pom Information


With the parent project now in place, we can edit the pom.xml file to lay out the required project structure. Begin by adding the modules, license, and properties to the pom file:

<modules>
    <module>integration-app</module>
    <module>integration-rules</module>
    <module>integration-model</module>
    <module>purchase-publisher</module>
</modules>
 
<licenses>
    <license>
        <name>Apache License, Version 2.0</name>
        <distribution>repo</distribution>
        <url>http://www.apache.org/licenses/LICENSE-2.0.html</url>
    </license>
</licenses>
 
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
 
    <maven.compiler.target>1.8</maven.compiler.target>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.plugin.version>3.3</maven.compiler.plugin.version>
    <maven.resources.plugin.version>2.6</maven.resources.plugin.version>
    <maven.war.plugin.version>2.6</maven.war.plugin.version>
    <maven.jar.plugin.version>2.6</maven.jar.plugin.version>
    <maven.surefire.plugin.version>2.19.1</maven.surefire.plugin.version>
    <maven.dependency.plugin.version>2.10</maven.dependency.plugin.version>
    <maven.exec.plugin.version>1.3.2</maven.exec.plugin.version>
 
    <hacep.version>1.0-SNAPSHOT</hacep.version>
    <junit.version>4.12</junit.version>
    <mockito.version>1.10.19</mockito.version>
    <log4j.version>1.2.17</log4j.version>
    <slf4j.version>1.7.12</slf4j.version>
</properties>

These sections define the child modules to be included, the Apache license under which the software falls, and various version variables we'll need throughout the pom configuration. Next, add a default profile referencing the JBoss repositories previously mentioned:

<profiles>
    <profile>
    <id>supported-GA</id>
    <activation>
        <activeByDefault>true</activeByDefault>
    </activation>
    <properties>
        <version.org.infinispan>8.3.0.Final-redhat-1</version.org.infinispan>
        <version.org.jboss.fuse>6.2.1.redhat-084</version.org.jboss.fuse>
        <version.org.jboss.bom.brms>6.3.0.GA-redhat-3</version.org.jboss.bom.brms>
        <version.org.jboss.bom.eap>6.4.5.GA</version.org.jboss.bom.eap>
    </properties>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.jboss.bom.eap</groupId>
                <artifactId>jboss-javaee-6.0-with-transactions</artifactId>
                <version>${version.org.jboss.bom.eap}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
 
            <dependency>
                <groupId>org.infinispan</groupId>
                <artifactId>infinispan-bom</artifactId>
                <version>${version.org.infinispan}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
 
            <dependency>
                <groupId>org.jboss.bom.brms</groupId>
                <artifactId>jboss-brms-bpmsuite-bom</artifactId>
                <version>${version.org.jboss.bom.brms}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>

            <dependency>
                <groupId>org.jboss.fuse.bom</groupId>
                <artifactId>jboss-fuse-parent</artifactId>
                <version>${version.org.jboss.fuse}</version>
                <type>pom</type>
                <scope>import</scope>
                <exclusions>
                    <exclusion>
                        <groupId>com.google.guava</groupId>
                        <artifactId>guava</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
        </dependencies>
    </dependencyManagement>
    </profile>
</profiles>

Beneath the profiles, add the dependencyManagement and dependencies sections which will orchestrate child module dependency versions, inclusions, and Java EE API usage:

<dependencyManagement>
    <dependencies>
 
        <!-- HACEP Framework Dependencies -->
        <dependency>
            <groupId>it.redhat.jdg</groupId>
            <artifactId>hacep-core</artifactId>
            <version>${hacep.version}</version>
        </dependency>
        <dependency>
            <groupId>it.redhat.jdg</groupId>
            <artifactId>hacep-core-model</artifactId>
            <version>${hacep.version}</version>
        </dependency>
        <dependency>
            <groupId>it.redhat.jdg</groupId>
            <artifactId>hacep-core-camel</artifactId>
            <version>${hacep.version}</version>
        </dependency>
 
        <!-- HACEP Integration Dependencies -->
        <dependency>
            <groupId>com.redhat.refarch.hacep</groupId>
            <artifactId>integration-model</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>com.redhat.refarch.hacep</groupId>
            <artifactId>integration-rules</artifactId>
            <version>${project.version}</version>
        </dependency>
 
        <!-- Logging Dependencies -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
 
        <!-- Test Dependencies -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-all</artifactId>
            <version>${mockito.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
 
<dependencies>
 
    <!-- CDI Welding Dependencies -->
    <dependency>
        <groupId>javax.enterprise</groupId>
        <artifactId>cdi-api</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>javax.inject</groupId>
        <artifactId>javax.inject</artifactId>
        <scope>provided</scope>
    </dependency>

    <!-- JavaEE API Dependencies -->
    <dependency>
        <groupId>org.jboss.spec.javax.jms</groupId>
        <artifactId>jboss-jms-api_1.1_spec</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.jboss.spec.javax.transaction</groupId>
        <artifactId>jboss-transaction-api_1.1_spec</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.jboss.spec.javax.ejb</groupId>
        <artifactId>jboss-ejb-api_3.1_spec</artifactId>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.jboss.spec.javax.ws.rs</groupId>
        <artifactId>jboss-jaxrs-api_1.1_spec</artifactId>
        <scope>provided</scope>
    </dependency>
</dependencies>

Lastly, define the build section responsible for configuring the Maven plugins used:

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>${maven.compiler.plugin.version}</version>
            <configuration>
                <source>${maven.compiler.source}</source>
                <target>${maven.compiler.target}</target>
                <maxmem>256M</maxmem>
                <showDeprecation>true</showDeprecation>
            </configuration>
        </plugin>

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-resources-plugin</artifactId>
            <version>${maven.resources.plugin.version}</version>
        </plugin>
    </plugins>
    <pluginManagement>
        <plugins>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>${maven.exec.plugin.version}</version>
            </plugin>
            <plugin>
                <groupId>org.kie</groupId>
                <artifactId>kie-maven-plugin</artifactId>
                <extensions>true</extensions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-war-plugin</artifactId>
                <version>${maven.war.plugin.version}</version>
                <configuration>
                    <failOnMissingWebXml>false</failOnMissingWebXml>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>${maven.surefire.plugin.version}</version>
                <configuration>
                    <argLine>-Djava.net.preferIPv4Stack=true</argLine>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>

                <version>${maven.dependency.plugin.version}</version>
                <executions>
                    <execution>
                        <id>copy-dependencies</id>
                        <phase>package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                            <overWriteReleases>false</overWriteReleases>
                            <overWriteSnapshots>false</overWriteSnapshots>
                            <overWriteIfNewer>true</overWriteIfNewer>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>${maven.jar.plugin.version}</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-resources-plugin</artifactId>
                <version>${maven.resources.plugin.version}</version>
                <configuration>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </pluginManagement>
 
</build>

This concludes the definition and configuration of the parent project itself. At this time, stub out the four child modules inside the IDE. Rather than going through each child module's pom file in turn, please refer to the hacep-integration folder and child module subfolders within the accompanying attachment to locate and copy the contents of each pom file.

5.5.2. The integration-model Module

This module, responsible for housing the shared POJOs and other Java objects needed by both the rules and main integration modules, consists of a skeleton beans.xml file (necessary to trigger CDI across the application) and only three other classes. Both the integration-model and integration-app modules will need a copy of this beans.xml file placed in the src/main/resources/META-INF directory, using the content below:

<beans xmlns="http://java.sun.com/xml/ns/javaee"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/beans_1_0.xsd">
</beans>

The other three classes within the integration-model module will fall into two packages:

  • com.redhat.refarch.hacep.model
  • com.redhat.refarch.hacep.model.channels

The model package will contain two classes, Purchase and PurchaseKey. Purchase serves as the event POJO fed to the HACEP nodes via the Event Channel, while PurchaseKey is used to group events into the relevant rules sessions and node ownerships. Define the Purchase class as an implementation of the Fact interface from the HACEP core code as follows:

import it.redhat.hacep.model.Fact;
import it.redhat.hacep.model.Key;

import java.time.Instant;
import java.util.Date;
import java.util.Objects;

public class Purchase implements Fact {

    private static final long serialVersionUID = 7517352753296362943L;

    protected long id;

    protected Long customerId;

    protected Date timestamp;

    protected Double amount;

    public Purchase(long id, Long customerId, Date timestamp, Double amount) {
        this.id = id;
        this.customerId = customerId;
        this.timestamp = timestamp;
        this.amount = amount;
    }

    @Override
    public Instant getInstant() {
        return timestamp.toInstant();
    }

    @Override
    public Key extractKey() {
        return new PurchaseKey(String.valueOf(id), String.valueOf(customerId));
    }

    public long getId() {
        return id;
    }

    public void setId(long id) {
        this.id = id;
    }

    public Long getCustomerId() {
        return customerId;
    }

    public void setCustomerId(Long customerId) {
        this.customerId = customerId;
    }

    public Date getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Date timestamp) {
        this.timestamp = timestamp;
    }

    public Double getAmount() {
        return amount;
    }

    public void setAmount(Double amount) {
        this.amount = amount;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Purchase)) return false;
        Purchase purchase = (Purchase) o;
        return id == purchase.id &&
                Objects.equals(customerId, purchase.customerId) &&
                Objects.equals(timestamp, purchase.timestamp) &&
                Objects.equals(amount, purchase.amount);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, customerId, timestamp, amount);
    }

    @Override
    public String toString() {
        return "Purchase{" +
                "id=" + id +
                ", customerId=" + customerId +
                ", timestamp=" + timestamp +
                ", amount=" + amount +
                '}';
    }
}

Now define the PurchaseKey as seen below:

import it.redhat.hacep.model.Key;

import java.util.Objects;

public class PurchaseKey extends Key<String> {

    private String id;

    public PurchaseKey(String id, String customer) {
        super(customer);
        this.id = id;
    }

    public String getId() {
        return id;
    }

    @Override
    public String toString() {
        return id + "::" + getGroup();
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof PurchaseKey)) return false;
        if (!super.equals(o)) return false;
        PurchaseKey purchaseKey = (PurchaseKey) o;
        return Objects.equals(id, purchaseKey.id);
    }

    @Override
    public int hashCode() {
        return Objects.hash(super.hashCode(), id);
    }
}

These two POJOs make up the only shared model needed for our simple integration. Lastly, we'll create a class called SysoutChannel in the channels package mentioned above, which serves as a pipeline for output from within rules sessions and can be altered to conform to various logging needs. For our purposes, information sent to the sysout channel is simply dumped to the console.

import org.kie.api.runtime.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SysoutChannel implements Channel {

    public static final String CHANNEL_ID = "sysout";
    private static final Logger LOGGER = LoggerFactory.getLogger(SysoutChannel.class);

    @Override
    public void send(Object object) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("=============================>" + object);
        }
    }
}

These three classes, plus the beans.xml and pom files created above, complete the integration-model module.

5.5.3. The integration-rules Module

This module, like the integration-model module, is fairly simple and straightforward. As with any BRMS integration, we'll define a kmodule.xml in the META-INF directory that describes the rules and sessions to be used (a sketch of this file appears at the end of this section). We'll also create two DRL rules files and one Java class containing a custom implementation of a rules accumulate function. Start by defining the two rules files in the src/main/resources/rules directory as follows:

  • purchase-audit.drl: a simple base rule file that handles the necessary function imports and event declarations.
package it.redhat.hacep.playground.rules.reward.catalog;

import com.redhat.refarch.hacep.model.Purchase;

import accumulate com.redhat.refarch.hacep.rules.functions.ConsecutiveDaysAccumulateFunction consecutivedays;

declare Purchase
    @role( event )
    @timestamp( timestamp.getTime() )
    @expires( 60d )
end

  • purchase-audit-rule1.drl: a secondary rule file defining two rules: one simply echoes whenever a Purchase fact enters the session; the other accumulates the purchases made by a customer, reporting how many purchases each repeat customer has made over a 30-day period along with the sum total of that customer's purchases during the timeframe.
package it.redhat.hacep.playground.rules.reward.catalog;

import com.redhat.refarch.hacep.model.Purchase

rule "purchase seen"
when
    $purchase : Purchase ( )
then
    channels["sysout"].send("Purchase rec'd for customer " + $purchase.getCustomerId() + " for amt " + $purchase.getAmount());
end

rule "report multiple purchases by a customer over 30-day window"
when
    $purchase : Purchase ( $customerId : customerId ) over window:length(1)
    $purchaseCount : Number( intValue > 1 )
    				 from accumulate ($iter : Purchase ( $purchase.customerId == customerId ) over window:time( 30d ),
    				 count( $iter ) )
    $purchaseSum : Number( )
                     from accumulate ( Purchase ( $purchase.customerId == customerId, $purchaseAmount : amount ) over window:time( 30d ),
                     sum( $purchaseAmount ) )
then
	channels["sysout"].send("repeat customer " + $purchase.getCustomerId() + " has made " + $purchaseCount + " purchases over last 30 days for a sum of " + $purchaseSum);
end

Lastly, define the custom accumulate implementation in src/main/java under the com.redhat.refarch.hacep.rules.functions package as follows:

import org.kie.api.runtime.rule.AccumulateFunction;

import java.io.*;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class ConsecutiveDaysAccumulateFunction implements AccumulateFunction {

    @Override
    public void accumulate(Serializable context, Object value) {
        ConsecutiveDaysBuckets buckets = (ConsecutiveDaysBuckets) context;
        int days = getDays(value);
        if (buckets.buckets.get(days) == null) {
            buckets.buckets.put(days, 1);
        } else {
            buckets.buckets.put(days, buckets.buckets.get(days) + 1);
        }
    }

    @Override
    public Serializable createContext() {
        return new ConsecutiveDaysBuckets();
    }

    @Override
    public Object getResult(Serializable context) throws Exception {
        ConsecutiveDaysBuckets buckets = (ConsecutiveDaysBuckets) context;
        return buckets.buckets.size();
    }

    @Override
    public void init(Serializable context) throws Exception {
        ConsecutiveDaysBuckets buckets = (ConsecutiveDaysBuckets) context;
        buckets.buckets = new HashMap<>();
    }

    @Override
    public void reverse(Serializable context, Object value) throws Exception {
        ConsecutiveDaysBuckets buckets = (ConsecutiveDaysBuckets) context;
        int days = getDays(value);
        if (buckets.buckets.get(days) == null) {
            //ignore, shouldn't happen
        } else if (buckets.buckets.get(days) == 1) {
            buckets.buckets.remove(days);
        } else {
            buckets.buckets.put(days, buckets.buckets.get(days) - 1);
        }
    }

    @Override
    public boolean supportsReverse() {
        return true;
    }

    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        // TODO Auto-generated method stub

    }

    @Override
    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        // TODO Auto-generated method stub

    }

    @Override
    public Class<?> getResultType() {
        return Number.class;
    }

    public static class ConsecutiveDaysBuckets implements Externalizable {

        public Map<Integer, Integer> buckets;

        @Override
        public void writeExternal(ObjectOutput out) throws IOException {
            out.writeInt(buckets.size());
            for (int key : buckets.keySet()) {
                out.writeInt(key);
                out.writeInt(buckets.get(key));
            }
        }

        @Override
        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
            buckets = new HashMap<>();
            int size = in.readInt();
            for (int i = 0; i < size; i++) {
                buckets.put(in.readInt(), in.readInt());
            }
        }
    }

    private int getDays(Object value) {
        BigDecimal data = new BigDecimal(((Number) value).longValue());
        int days = data.divide(new BigDecimal(TimeUnit.DAYS.toMillis(1)), 2, RoundingMode.HALF_UP).intValue();
        return days;
    }
}

These four files make up the entirety of the integration-rules module. Feel free to cross-reference the source code within the accompanying attachment for both structure and content accuracy as required.
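The kmodule.xml itself can be copied from the attachment. As a minimal sketch, a version consistent with the kbase and ksession names referenced later by DroolsConfigurationImpl (hacep-rules and hacep-sessions) might look like the following; the packages attribute and the stream/pseudo-clock settings are assumptions based on the rules directory used above and on HACEP's replay of events into CEP sessions:

<kmodule xmlns="http://www.drools.org/xsd/kmodule">
    <kbase name="hacep-rules" packages="rules" equalsBehavior="equality" eventProcessingMode="stream">
        <ksession name="hacep-sessions" type="stateful" default="true" clockType="pseudo"/>
    </kbase>
</kmodule>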

5.5.4. The purchase-publisher Module

Next, define the classes within the helper module, which assist in creating and submitting events to the HACEP-integrated application to simulate input from real use case scenarios. In a production system, such a tool wouldn't be used outside of testing, but it fulfills the need for event creation, allowing observation and administration of the HACEP framework in real time. Add the following two classes to the purchase-publisher module under src/main/java in the com.redhat.refarch.hacep.publisher package:

  • PurchaseProducer: class responsible for generating customer purchase events for submission to the Event Channel broker.
import com.redhat.refarch.hacep.model.Purchase;

import javax.jms.*;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Date;
import java.util.concurrent.ThreadLocalRandom;

public class PurchaseProducer {

    private final Connection connection;
    private final Session session;

    private String queueName;
    private long customerId;
    ConnectionFactory connectionFactory;

    private static final Double PURCHASE_MIN = 10.0;
    private static final Double PURCHASE_MAX = 500.0;


    public PurchaseProducer(ConnectionFactory connectionFactory, String queueName, long customerId) {

        this.customerId = customerId;
        this.connectionFactory = connectionFactory;
        this.queueName = queueName;
        try {
            connection = connectionFactory.createConnection();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }
    }

    public void produce(Integer id, Long timestamp) {

        try {

            Queue destination = session.createQueue(queueName);
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            Purchase fact = new Purchase(id, customerId, new Date(timestamp),
                    round(ThreadLocalRandom.current().nextDouble(PURCHASE_MIN, PURCHASE_MAX)));

            System.out.println("sending purchase for customer " + customerId + " for amount of " + fact.getAmount());

            ObjectMessage message = session.createObjectMessage(fact);
            message.setStringProperty("JMSXGroupID", String.format("P%05d", customerId));
            message.setIntProperty("JMSXGroupSeq", id);

            producer.send(message);

        } catch (Exception e) {
            System.out.println("Caught: " + e);
        }
    }

    public static double round(double value) {
        BigDecimal bd = new BigDecimal(value);
        bd = bd.setScale(2, RoundingMode.HALF_UP);
        return bd.doubleValue();
    }
}
  • Runner: executable class responsible for establishing a connection to the Event Channel broker and orchestrating parallel thread submissions of Purchase content via PurchaseProducer. Note that there are some configurable options within the runner that should be modified to match your needs and environment.
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.pool.PooledConnectionFactory;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class Runner {

    private ScheduledExecutorService scheduler = null;

    private final String BROKER_URL = "failover:(tcp://amq1:61616,tcp://amq2:61616,tcp://amq3:61616)";
    private final String QUEUE_NAME = "facts";
    private final String BROKER_USERNAME = "admin";
    private final String BROKER_PASSWORD = "admin";

    private final Integer CONCURRENT_PURCHASERS = 5;
    private final Integer DURATION = 15;
    private final Integer EVENT_DELAY = 15;
    private final Integer EVENT_INTERVAL = 5;
    private final Integer MAX_ACTIVE_SESSIONS = 250;
    private final Integer POOL_SIZE = 8;


    public Runner() {
        scheduler = Executors.newScheduledThreadPool(CONCURRENT_PURCHASERS);
    }

    public static void main(String[] args) throws Exception {
        new Runner().produce();
    }

    private void produce() throws InterruptedException {

        ActiveMQConnectionFactory amqConnectionFactory;

        amqConnectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
        amqConnectionFactory.setUserName(BROKER_USERNAME);
        amqConnectionFactory.setPassword(BROKER_PASSWORD);

        PooledConnectionFactory connectionFactory = new PooledConnectionFactory(amqConnectionFactory);
        connectionFactory.setMaxConnections(POOL_SIZE);
        connectionFactory.setMaximumActiveSessionPerConnection(MAX_ACTIVE_SESSIONS);

        for (int i = 0; i < CONCURRENT_PURCHASERS; i++) {

            int delay = (int) (Math.random() * EVENT_DELAY);

            final ScheduledFuture<?> playerHandle = scheduler
                    .scheduleAtFixedRate(
                            new PurchaseRunner(
                                    new PurchaseProducer(connectionFactory, QUEUE_NAME, i)),
                                        delay,
                                        EVENT_INTERVAL,
                                        TimeUnit.SECONDS);

            scheduler.schedule(() -> playerHandle.cancel(true), DURATION, TimeUnit.MINUTES);
        }
    }

    private class PurchaseRunner implements Runnable {

        private final PurchaseProducer purchaseProducer;
        private int id = 10000;

        public PurchaseRunner(PurchaseProducer purchaseProducer) {
            this.purchaseProducer = purchaseProducer;
        }

        @Override
        public void run() {
            purchaseProducer.produce(id++, System.currentTimeMillis());
        }
    }
}

5.5.5. The integration-app Module

Lastly, define the various components that make up the main module of the integration. This module houses several HACEP-specific configurations, the HACEP integration itself, and a RESTful interface for monitoring the application in action.

5.5.5.1. Resources

This main module will contain a log4j.properties file underneath the src/main/resources directory and a jboss-deployment-structure.xml file underneath the src/main/resources/WEB-INF directory. These two files are fairly self-explanatory, so reference the accompanying attachment source code to copy the content needed for them; a sketch of the logging configuration follows below. A copy of the beans.xml file should already be present within META-INF from previous steps.
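As a minimal sketch of the log4j.properties file, assuming a single console appender at INFO level (the version in the attachment may differ):

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p [%c{1}] %m%n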

5.5.5.2. HACEP Configuration Files

The HACEP core code defines two interfaces which must be implemented in order to configure the framework. Within src/main/java add a com.redhat.refarch.hacep.configuration package with the following two classes:

  • DroolsConfigurationImpl: responsible for defining the rule package and session names, as well as the channels utilized within the rules.
import com.redhat.refarch.hacep.model.channels.SysoutChannel;
import it.redhat.hacep.configuration.DroolsConfiguration;
import it.redhat.hacep.drools.channels.NullChannel;
import org.kie.api.KieBase;
import org.kie.api.KieServices;
import org.kie.api.runtime.Channel;
import org.kie.api.runtime.KieContainer;
import org.kie.api.runtime.KieSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.enterprise.context.ApplicationScoped;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@ApplicationScoped
public class DroolsConfigurationImpl implements DroolsConfiguration {

    private static final Logger logger = LoggerFactory.getLogger(DroolsConfigurationImpl.class);

    private final Map<String, Channel> channels = new ConcurrentHashMap<>();
    private final Map<String, Channel> replayChannels = new ConcurrentHashMap<>();
    private final KieContainer kieContainer;
    private final KieBase kieBase;
    private static final String KSESSION_RULES = "hacep-sessions";
    private static final String KBASE_NAME = "hacep-rules";

    public DroolsConfigurationImpl() {
        KieServices kieServices = KieServices.Factory.get();
        kieContainer = kieServices.getKieClasspathContainer();
        kieBase = kieContainer.getKieBase(KBASE_NAME);
        channels.put(SysoutChannel.CHANNEL_ID, new SysoutChannel());
        replayChannels.put(SysoutChannel.CHANNEL_ID, new NullChannel());
        if (logger.isInfoEnabled()) {
            logger.info("[Kie Container] successfully initialized.");
        }
    }

    @Override
    public KieSession getKieSession() {
        return kieContainer.newKieSession(KSESSION_RULES);
    }

    @Override
    public KieBase getKieBase() {
        return kieBase;
    }

    @Override
    public Map<String, Channel> getChannels() {
        return channels;
    }

    @Override
    public Map<String, Channel> getReplayChannels() {
        return replayChannels;
    }

    @Override
    public int getMaxBufferSize() {
        try {
            return Integer.valueOf(System.getProperty("grid.buffer", "1000"));
        } catch (IllegalArgumentException e) {
            return 1000;
        }
    }
}

  • JmsConfigurationImpl: responsible for referencing the Event Channel broker's JNDI connection and for configuring the number of concurrent consumers each HACEP node uses to take input from the broker.
import it.redhat.hacep.configuration.JmsConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Resource;
import javax.enterprise.context.ApplicationScoped;
import javax.jms.ConnectionFactory;

@ApplicationScoped
public class JmsConfigurationImpl implements JmsConfiguration {
    private static final Logger LOGGER = LoggerFactory.getLogger(JmsConfigurationImpl.class);

    @Resource(lookup = "java:/HacepConnectionFactory")
    private ConnectionFactory connectionFactory;

    @Override
    public ConnectionFactory getConnectionFactory() {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("Provide connection factory [%s]", connectionFactory));
        }
        return connectionFactory;
    }

    @Override
    public String getQueueName() {
        try {
            return System.getProperty("queue.name", "facts");
        } catch (IllegalArgumentException e) {
            return "facts";
        }
    }

    @Override
    public int getMaxConsumers() {
        try {
            return Integer.valueOf(System.getProperty("queue.consumers", "5"));
        } catch (IllegalArgumentException e) {
            return 5;
        }
    }
}
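Both configuration classes fall back to sensible defaults but honor overrides supplied as system properties (grid.buffer, queue.name, queue.consumers). As a sketch, assuming these are passed straight through to the JVM when starting the EAP node:

./standalone.sh -Dgrid.buffer=5000 -Dqueue.name=facts -Dqueue.consumers=5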

5.5.5.3. HACEP Instantiation & RESTful Interface

Within the com.redhat.refarch.hacep package, add the following two classes:

  • Application: solely responsible for referencing and, thereby, instantiating the HACEP framework core code.
import it.redhat.hacep.configuration.HACEPApplication;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.ejb.Singleton;
import javax.ejb.Startup;
import javax.inject.Inject;

@Startup
@Singleton
public class Application {

    @Inject
    private HACEPApplication hacepApplication;

    @PostConstruct
    public void start() {
        hacepApplication.start();
    }

    @PreDestroy
    public void stop() {
        hacepApplication.stop();
    }
}
  • ApplicationRestService: defines a simplistic RESTful interface, which in a production application would be a natural point of further expansion for interacting with the HACEP application. Two commands are exposed here: info, which provides basic information about the state of the node, the cluster, and the caches within the system; and help, which provides a list of accepted commands. Both are reached via the [IP_ADDR]:8080/execute/[command] URL format.
import com.redhat.refarch.hacep.rest.RestInterface;
import com.redhat.refarch.hacep.rest.commands.InterfaceRequest;

import javax.inject.Inject;
import javax.ws.rs.*;
import javax.ws.rs.core.Application;
import javax.ws.rs.core.Response;
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;

@ApplicationPath("/")
@Path("/")
public class ApplicationRestService extends Application {

    @Inject
    private RestInterface restInterface;

    @GET
    @Path("/execute/{command}")
    @Produces("application/json")
    public Response executeCommand(@PathParam("command") String commandName, @QueryParam("params") String params) {
        try {
            Optional<InterfaceRequest> command = restInterface.findByName(commandName);
            if (command.isPresent()) {
                if (params != null) {
                    command.get().execute(restInterface, Arrays.asList(params.split(",")).iterator());
                } else {
                    command.get().execute(restInterface, Collections.emptyIterator());
                }
            } else {
                restInterface.printUsage();
            }
            return Response.ok(restInterface.getContent()).build();
        } finally {
            restInterface.clear();
        }
    }

    @GET
    @Path("/help")
    @Produces("application/json")
    public Response help() {
        restInterface.printUsage();
        return Response.ok(restInterface.toString()).build();
    }

    @GET
    @Path("/help/{command}")
    @Produces("application/json")
    public Response helpOnCommand(@PathParam("command") String commandName) {
        Optional<InterfaceRequest> command = restInterface.findByName(commandName);
        if (command.isPresent()) {
            command.get().usage(restInterface);
            return Response.ok(restInterface.toString()).build();
        }
        return help();
    }
}

5.5.5.4. RESTful Interface Request Definitions

Within the integration-app module, the RESTful interface exposes a flexible and extensible set of commands for fetching and displaying information about the state of the application and its resources. These request commands follow a common interface so that they can be injected as one list and easily iterated from within the ApplicationRestService RESTful endpoints. The interface is as follows:

import com.redhat.refarch.hacep.rest.RestInterface;

import java.util.Iterator;

public interface InterfaceRequest {

    String command();

    boolean execute(RestInterface console, Iterator<String> args) throws IllegalArgumentException;

    void usage(RestInterface console);
}

An example of one such interface implementation is InfoInterfaceRequest:

import com.redhat.refarch.hacep.dto.NodeType;
import com.redhat.refarch.hacep.dto.SessionDataObjectInformation;
import com.redhat.refarch.hacep.rest.RestInterface;
import com.redhat.refarch.hacep.util.JDGUtility;
import it.redhat.hacep.configuration.HACEPApplication;
import it.redhat.hacep.configuration.annotations.HACEPSessionCache;
import it.redhat.hacep.model.Key;
import org.infinispan.Cache;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.remoting.transport.Address;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import java.util.*;

public class InfoInterfaceRequest implements InterfaceRequest {

    private static final String COMMAND_NAME = "info";

    private final HACEPApplication application;

    @Inject
    private JDGUtility jdgUtility;

    @Inject
    @HACEPSessionCache
    private Cache<Key, Object> sessionCache;

    @Inject
    public InfoInterfaceRequest(HACEPApplication application) {
        this.application = application;
    }

    @Override
    public String command() {
        return COMMAND_NAME;
    }

    @Override
    public boolean execute(RestInterface console, Iterator<String> args) throws IllegalArgumentException {
        try {
            DefaultCacheManager cacheManager = application.getCacheManager();
            console.println(generalInfo());
            for (String cacheName : cacheManager.getCacheNames()) {
                console.println("\n");
                console.println(buildInfo(cacheManager.getCache(cacheName)));
            }

            console.println("\n\nSession cache objects:");
            Map<Address, List<SessionDataObjectInformation>> sessions = new HashMap<>();
            for (Map.Entry<Key, List<Address>> entry : jdgUtility.getKeysAddresses(sessionCache).entrySet()) {
                List<Address> addresses = entry.getValue() != null ? entry.getValue() : Collections.emptyList();
                for (int i = 0; i < addresses.size(); i++) {
                    boolean isPrimary = (i == 0);
                    sessions.compute(addresses.get(i), (a, l) -> {
                        if (l == null) {
                            l = new ArrayList<>();
                        }
                        SessionDataObjectInformation object = new SessionDataObjectInformation(entry.getKey().toString(), isPrimary ? NodeType.PRIMARY : NodeType.REPLICA);
                        l.add(object);
                        return l;
                    });
                }
            }

            console.print(sessions);

        } catch (NoSuchElementException e) {
            console.println(generalInfo());
        }
        return true;

    }

    private String generalInfo() {
        DefaultCacheManager cacheManager = application.getCacheManager();
        StringBuilder info = new StringBuilder();
        info.append("Cache Manager Status: ").append(cacheManager.getStatus()).append("\n");
        info.append("Cache Manager Address: ").append(cacheManager.getAddress()).append("\n");
        info.append("Coordinator address: ").append(cacheManager.getCoordinator()).append("\n");
        info.append("Is Coordinator: ").append(cacheManager.isCoordinator()).append("\n");
        info.append("Cluster Name: ").append(cacheManager.getClusterName()).append("\n");
        info.append("Cluster Size: ").append(cacheManager.getClusterSize()).append("\n");
        info.append("Member list: ").append(cacheManager.getMembers()).append("\n");
        info.append("Caches: ").append(cacheManager.getCacheNames()).append("\n");
        return info.toString();
    }

    private String buildInfo(Cache<Key, Object> cache) {

        StringBuilder info = new StringBuilder();

        info.append("Cache: ").append(cache).append("\n");
        info.append("Cache Mode: ").append(cache.getCacheConfiguration().clustering().cacheModeString()).append("\n");
        info.append("Cache Size: ").append(cache.size()).append("\n");
        info.append("Cache Status: ").append(cache.getStatus()).append("\n");
        info.append("Number of Owners: ").append(cache.getAdvancedCache().getDistributionManager().getConsistentHash().getNumOwners()).append("\n");
        info.append("Number of Segments: ").append(cache.getAdvancedCache().getDistributionManager().getConsistentHash().getNumSegments()).append("\n");

        return info.toString();
    }

    @Override
    public void usage(RestInterface console) {
        console.println(COMMAND_NAME);
        console.println("\t\tGeneral information on caches.");
    }
}

For further examples of potential commands to add to the RESTful interface, reference and copy the HelpInterfaceRequest class from the hacep-integration source code within the accompanying attachment and also review the it.redhat.hacep.playground.console.commands package within the hacep-eap-playground module of the HACEP source code.

5.5.5.5. Utility Classes for Diagnostics of HACEP Caches and Sessions

The com.redhat.refarch.hacep.dto package will contain a simple enum and object that carry information about the HACEP caches and sessions when the info command is invoked. Their content is as follows:

  • NodeType
public enum NodeType {
    PRIMARY, REPLICA;
}

  • SessionDataObjectInformation
public class SessionDataObjectInformation {

    private String name;
    private NodeType nodeType;

    public SessionDataObjectInformation() {
    }

    public SessionDataObjectInformation(String name, NodeType nodeType) {
        this.name = name;
        this.nodeType = nodeType;
    }

    public String getName() {
        return name;
    }

    public NodeType getNodeType() {
        return nodeType;
    }
}

5.6. Packaging & Deploying the Integration Application

Once the code is complete, packaging and deployment of the application follow. Start by navigating to the application's root hacep-integration directory and issuing the mvn clean package command to build the war and jar files of the child modules where applicable.

Once the build completes successfully, the next step is deploying the integration-app.war file by copying it from the integration-app/target directory to the standalone/deployments directory of each HACEP EAP node you wish to run the application on. Upon deployment, server logs should reflect the various steps of CDI wiring, connection to the A-MQ Event Channel, Infinispan JGroups cluster coordination, and initialization of the BRMS rules sessions.
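As a sketch of these two steps from the project root, assuming EAP_HOME points at a node's installation (the exact war file name depends on the module's build configuration):

mvn clean package
cp integration-app/target/integration-app.war $EAP_HOME/standalone/deployments/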

5.7. Executing the Purchase Publisher for Real-Time Observation

Once the publisher Runner class has been customized, Maven packaging has completed, and the integration application has been deployed to the various HACEP nodes, the purchase-publisher module can be used to generate and feed events to the Event Channel. Within the project source folder, navigate to the hacep-integration/purchase-publisher/target directory, which should now contain an executable jar file, and issue the following command to commence event generation:

java -Dorg.apache.activemq.SERIALIZABLE_PACKAGES="*" -cp purchase-publisher-1.0-SNAPSHOT.jar com.redhat.refarch.hacep.publisher.Runner

5.8. Summation

The modules of the hacep-integration project described above make up the entirety of the simple HACEP integration application, discounting the purchase-publisher module, which, as previously mentioned, serves a proof-of-concept purpose but doesn't necessarily pertain to production functionality. The model and rules modules serve as utility modules supplying the required rules definitions and object models, while the integration-app module acts as the HACEP integration and configuration point and exposes simple monitoring functionality via a RESTful interface, demonstrating how interaction with and monitoring of such an application could be achieved in HACEP-integrated systems.