Chapter 2. Highly-Available & Scalable Complex Event Processing with Red Hat JBoss BRMS
HACEP (pronounced hä-sep) is a scalable and highly-available architecture framework for enterprise-level usage of the complex event processing abilities in
Red Hat JBoss BRMS (Business Rules Management System). HACEP combines
Red Hat JBoss Data Grid (JDG),
Apache Camel capabilities as delivered in
Red Hat JBoss BRMS, and consumption of events from a Java Message Service (JMS) broker such as
Red Hat JBoss A-MQ in a unique way to obtain horizontal scalability and failover.
HACEP is a generic solution for imposing a partitioning criterion while introducing no further constraints or limitations on capabilities. The framework is designed around a few fundamental patterns:
- sharding: horizontal partitioning, or splitting of data and workload into separated nodes
- data affinity: colocation of data and calculations related to said data together on a single owner node
- event sourcing: capturing every change of state within the application as an event object, and storing those events in the order they are received and applied for the lifespan of the application itself, allowing them to be queried or used to rebuild current or past states
Out of the box, BRMS CEP functionality doesn’t account for high availability or horizontal scaling. Introducing the HACEP framework to an enterprise architectural stack fulfills these functional needs: it can easily scale from 2 to hundreds of nodes and back again, even dynamically at runtime. The clusters themselves are inherently highly available when provided with the minimum of 2 nodes per cluster. Beyond scaling and fault tolerance, other features gained from usage of the framework include:
- survival in cases of multiple node failures
- in-memory read/write performance for extreme throughput
- dynamic CEP rules recompilation
- multiple disk storage options
- minimal system footprint
- rolling upgrade support
- plain JVM and Red Hat JBoss Enterprise Application Platform (EAP) support
2.1.2. Statistical Use Case
As mentioned before, CEP can be summarized as the process of detecting and selecting interesting events from within an event cloud, finding their relationships, and inferring new data based on the findings. In more practical terms, consider a system with users who interact with an application several times a day for a given consecutive number of days. As an example, listed below are a few possible statistical statements of interest involving a hypothetical gaming application:
- User performs an action T times
- User performs an action T times for D consecutive days
- User places X actions/items in D days
- User wins/loses more than N times
- User wins/loses a cumulative S amount
Each and every event fed to the system is potentially a point of interest, so all events must be captured, considered, and stored away for fault tolerance. Network traffic considerations aside, a single system using
JBoss BRMS sessions to handle such a workload would quickly need to scale session memory to an unreasonable point. In terms of numbers, such a system could have a very large workload to deal with in an isolated fashion. To better visualize the need to compensate for the significant number of events and facts involved, consider a probable workload such as the one given below:
- 10 million events per day
- 1 million registered users
- approximate 30-day sliding time window
- 8 thousand concurrent users per day
- 90 thousand unique users over a 30-day period
- approximately 200 bytes per event
Given that JBoss BRMS CEP functionality doesn’t provide for scalability out of the box, at approximately 200 bytes per event and 10 million events per day over a period of 30 days, the raw event facts alone reach 60GB, and that figure doesn’t account for the significant amount of information other than event facts that a BRMS session would be responsible for. At a minimum, such a system would call for 128GB of heap space in order to store everything required within a single rules session.
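The sizing claim above can be checked with some quick arithmetic (figures taken directly from the workload list; "GB" here means decimal gigabytes, 10^9 bytes):

```java
// Back-of-the-envelope check of the raw event-fact sizing above.
public class EventMemoryEstimate {
    static final long EVENTS_PER_DAY = 10_000_000L;
    static final int WINDOW_DAYS = 30;
    static final long BYTES_PER_EVENT = 200L;

    /** Raw size of the event facts alone over the sliding window, in bytes. */
    public static long rawFactBytes() {
        return EVENTS_PER_DAY * WINDOW_DAYS * BYTES_PER_EVENT;
    }

    public static void main(String[] args) {
        System.out.println(rawFactBytes() / 1_000_000_000L + " GB"); // 60 GB
    }
}
```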
Introducing HACEP’s capabilities for scaling and partitioning via data affinity dramatically decreases the memory requirements while also introducing asynchronous session processing. A 4-node HACEP cluster with 8GB of heap per node can easily deal with the 8 thousand concurrent users and the 30-day time window. If around 10% of concurrent users were generating an event every 3 seconds, the rules sessions within the HACEP nodes would be approximately 5MB each, which, alongside the heap allotment, is a drastic improvement in memory allocation alone. Paired with gains in performance and fault tolerance, the benefits of HACEP become self-evident.
2.2. Architectural Design
At a high level, an enterprise system designed for usage of HACEP generally consists of two major components, the Event Channel, responsible for feeding the HACEP cluster with inbound event objects, and the HACEP cluster itself, which is responsible for processing said events.
Figure 2.1. High-Level HACEP Architecture
2.2.1. Event Channel
The event producer is external to the HACEP framework; however, the framework does set forth a few assumptions and recommendations about the source system. The event source must be JMS-compliant and include JMSXGroupID metadata with its published events. With data affinity and scaling in mind, it’s highly recommended that a JMS server like
JBoss A-MQ be used with message grouping enabled. This grouping enables multiple consumers on the same queue to process, in FIFO order, messages tagged with the aforementioned JMSXGroupID. It also facilitates concurrency, as multiple consumers can process different message groups in parallel, each group identified by its unique group ID attribute.
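The grouping semantics can be sketched as follows. This is an illustration of the behavior, not A-MQ's implementation: the first message of a group pins that group to one consumer, so messages sharing a JMSXGroupID are processed in FIFO order by a single consumer while distinct groups run in parallel. On the producer side, the group is stamped with the standard JMS API, e.g. `message.setStringProperty("JMSXGroupID", "user-" + userId)`.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of JMS message-group dispatch (not A-MQ internals):
// a group is pinned to the consumer that receives its first message, and
// every later message of that group goes to the same consumer.
public class GroupDispatcher {
    private final int consumerCount;
    private final Map<String, Integer> pinnedGroups = new HashMap<>();
    private int nextConsumer = 0;

    public GroupDispatcher(int consumerCount) {
        this.consumerCount = consumerCount;
    }

    /** Returns the consumer that must handle all messages of this group. */
    public int consumerFor(String jmsxGroupId) {
        return pinnedGroups.computeIfAbsent(jmsxGroupId,
                g -> nextConsumer++ % consumerCount);
    }
}
```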
In use cases where the business doesn’t strictly require event ordering, JMS grouping could be seen as a non-viable or overly complex option. In these cases, HACEP offers a reordering component that will internally reorder events on its nodes based on a configurable field of the events. However, it should be noted that utilizing this mechanism instead of JMS grouping will likely introduce some latency due to buffering and gaps between events that must be ordered, thereby impacting overall performance.
In future versions, HACEP is slated to enable an A-MQ-backed event source to use the same grouping algorithm as JBoss Data Grid’s grouping so that inbound messages are consumed directly on the group’s owner node, thus furthering the framework’s data affinity.
Figure 2.2. Event Channel
2.2.2. HACEP Node Structure
Each node within a HACEP cluster is identical. Each consists of a Camel route, a portion of the data relevant to its owned groups held in two JDG caches, and JBoss BRMS code.
Figure 2.3. HACEP Node Cluster
2.2.3. Event Consumption
A HACEP cluster consists of multiple nodes, each responsible for listening to and consuming from the Event Channel event source across multiple concurrent consumers. After receiving an event, the consuming node places the event into a JDG Fact (to become Event in future releases) cache. HACEP’s JDG instances are configured to use a distributed topology, meaning that entries are distributed to a subset of the nodes, with one of those nodes acting as the owner of the event’s related group. In HACEP, this is accomplished by leveraging JDG’s grouping API.
JBoss Data Grid’s grouping API allows a group of entries to be colocated on the same node, instead of the default behavior in which each entry is stored on a node corresponding to a calculated hash code of the entry. By default, JBoss Data Grid takes a hash code of each key when it is stored and maps that key to a hash segment; this allows an algorithm to be used to determine the node that holds the key, so each node in the cluster can locate the key without distributing ownership information. This behavior reduces overhead and improves redundancy, as the ownership information does not need to be replicated should a node fail.
By enabling the grouping API, the hash of the key is ignored when deciding which node to store the entry on. Instead, a hash of the group is used in its place, while the hash of the key is still used internally to prevent performance degradation. Because every node must be able to determine the owners of a key on its own, the group cannot be specified manually; a group must be either intrinsic to the entry (generated by the key class) or extrinsic to the entry (generated by an external function).
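Conceptually, grouped placement can be sketched like this. This is an illustration only, not JDG's implementation (which routes through hash segments and exposes the group via the `@Group` annotation or a `Grouper`); the class and method names below are hypothetical:

```java
// Conceptual sketch: with grouping enabled, the routing hash is computed
// from the group rather than the key, so every entry of a group resolves
// to the same owner node on any member of the cluster.
public class GroupedOwnerLookup {
    private final int nodeCount;

    public GroupedOwnerLookup(int nodeCount) {
        this.nodeCount = nodeCount;
    }

    public int ownerOf(String key, String group) {
        // When a group is present, the key's own hash plays no part in placement.
        String routingValue = (group != null) ? group : key;
        return Math.floorMod(routingValue.hashCode(), nodeCount);
    }
}
```

Because the owner is a pure function of the group and the cluster topology, any node can compute it locally, which is exactly why the group must be derivable from the entry rather than supplied manually.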
More information can be found in the official Red Hat JBoss Data Grid Administration & Configuration Guide.
Once a received event has been placed in the event cache, it will expire after a few milliseconds of idle time. Due to the underlying distributed topology of the grid and cache, the group owner node quickly consumes the event. There’s no need to store such event facts long-term: they’re put into the grid solely to fire a synchronous notification on the node with primary ownership of the related group, and are later maintained separately in-cache as part of the session’s respective event buffer or session snapshot via a specialized session wrapper, the HAKieSession object.
2.2.4. Session Instantiation, Resumption, and State Replication
Once an event has been received by the related group’s owning node, that group owner retrieves the related BRMS session wrapper from the JDG Session cache or, if one is not found, creates one for the related group. Once established, the received event is added to the wrapper object’s event buffer and then injected into the BRMS session itself. Next, the session’s pseudo-clock is adjusted accordingly, the rules are fired, and the modified session is saved back into the JDG Session cache, replicating itself minimally to the other non-primary nodes responsible for the group’s information, utilizing JDG’s
DeltaAware to minimize the amount of network traffic required to do so.
DeltaAware and Delta are interfaces utilized alongside JDG to allow for fine-grained replication. Rather than forcing the application to transfer a full copy of an entire object to each redundant non-primary data owner, the interfaces allow the system to identify only those parts that have been altered, send forward those changes, and allow the recipients to apply the same changes on top of the currently owned copy rather than replacing it completely. In terms of HACEP, rather than replicating the entire event buffer which exists alongside the BRMS session inside the HAKieSession wrapper object, DeltaAware can identify changes made and allow for solely the information regarding the new event information to be forwarded on to secondary nodes.
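The payload savings can be illustrated with a simplified stand-in for the pattern. The real contract lives in JDG's DeltaAware/Delta interfaces; the class below is a hypothetical simplification that only shows the idea: the primary ships just the events appended since the last replication, and secondaries merge that tail onto their local copy:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of the DeltaAware idea (illustrative, not JDG's API):
// replicate only the appended tail of the event buffer, not the whole buffer.
public class EventBuffer {
    private final List<String> events = new ArrayList<>();
    private int replicatedUpTo = 0; // how much the secondaries have already seen

    public void append(String event) {
        events.add(event);
    }

    /** Primary side: the "delta" is only what was appended since the last call. */
    public List<String> delta() {
        List<String> tail = new ArrayList<>(events.subList(replicatedUpTo, events.size()));
        replicatedUpTo = events.size();
        return tail;
    }

    /** Secondary side: apply a received delta on top of the local copy. */
    public void merge(List<String> delta) {
        events.addAll(delta);
    }

    public int size() {
        return events.size();
    }
}
```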
2.2.5. Fault Tolerance
In the event that nodes are removed from or added to a HACEP cluster, the Camel routes atop each node are automatically stopped. A rehash then occurs across the cluster to rebalance session ownership and redistribute sessions where appropriate, according to consistent hashing. This method of hashing also offers performance gains over traditional approaches: with consistent hashing, the average number of keys that must be remapped is the number of keys divided by the number of slots, as opposed to the nearly full remapping of all keys that occurs when keys are assigned to slots by a modulo operation, as is typical of more traditional implementations.
Should a failover occur within the small time window in which a primary group node is mid-process firing rules, the resulting session state would not necessarily be valid. In cases such as these, non-primary nodes utilize an idempotent channel to replay buffered events onto the last-known session snapshot. Where an event has been previously seen, its impact can be safely discarded, as it has already been captured. Eventually, the session will receive the event and resulting actions that were mid-process at the time of failure, as well as any other events that follow in the buffer. After completion, the local session is up-to-date and can be utilized as the new primary group record.
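The replay's idempotence can be sketched as follows. The names and the trivial "session state" (a running sum standing in for a BRMS session) are illustrative, not HACEP's actual classes; what matters is that replaying the whole buffer, duplicates included, cannot double-apply anything:

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch of idempotent replay onto a session snapshot:
// events whose ids are already reflected in the snapshot are discarded,
// so replaying the full buffer is always safe.
public class IdempotentReplay {
    private long state; // stand-in for the BRMS session state
    private final Set<String> applied = new HashSet<>();

    public IdempotentReplay(long snapshotState, Collection<String> appliedIds) {
        this.state = snapshotState;
        this.applied.addAll(appliedIds);
    }

    /** Apply an event only if its id has not been seen; returns true if applied. */
    public boolean replay(String eventId, long value) {
        if (!applied.add(eventId)) {
            return false; // duplicate: its impact is already captured
        }
        state += value; // illustrative state transition
        return true;
    }

    public long state() {
        return state;
    }
}
```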
2.2.6. Session Snapshotting
Event buffers within the HAKieSession have a configurable limit on the number of events that can be held. Thanks to DeltaAware, this number can safely be in the thousands without severely impacting performance or network traffic, but ultimately there is a cap. While primary group owner nodes apply each event to the BRMS session on hand as it is received, secondary group nodes follow a different method to preserve event sourcing. Since each event is replicated to secondary owner nodes via DeltaAware, at any given point a secondary group node can take the BRMS session on hand, apply the events from its buffer in order, and have an up-to-date version of the group’s session ready for use, matching that of the primary owner node (or what the primary node would contain in failover situations). Given that buffer queues have a storage limit, yet identical session capabilities must be maintained, session snapshots are utilized. When a buffer reaches its upper limit, an asynchronous thread is initiated, which takes the current BRMS session found in local memory, applies the full queue of events within the buffer, then places the session back into storage. This process effectively records the state of the session at the end of the current queue playback. Afterwards, the now-emptied buffer can continue to capture new replicated events.
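The snapshot-at-capacity mechanism can be sketched as below. HACEP performs this step on an asynchronous thread; the sketch runs it synchronously for clarity, and the "session" is just a running fold over event values standing in for a BRMS session. All names are illustrative:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of snapshotting when the replicated-event buffer
// reaches its configured limit: fold the buffered events into the session
// snapshot, then empty the buffer for new replicated events.
public class SnapshottingBuffer {
    private final int limit;
    private final List<Long> buffer = new ArrayList<>();
    private long sessionSnapshot = 0; // session state as of the last snapshot

    public SnapshottingBuffer(int limit) {
        this.limit = limit;
    }

    public void onReplicatedEvent(long value) {
        buffer.add(value);
        if (buffer.size() >= limit) {
            snapshot(); // HACEP would hand this off to an async thread
        }
    }

    /** Apply the full queue of buffered events to the session, then empty the buffer. */
    private void snapshot() {
        for (long v : buffer) {
            sessionSnapshot += v; // illustrative stand-in for session playback
        }
        buffer.clear();
    }

    public long sessionSnapshot() {
        return sessionSnapshot;
    }

    public int buffered() {
        return buffer.size();
    }
}
```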
Future versions of HACEP are slated to allow a more scripted approach to configuring snapshot usage. Whereas asynchronous snapshot creation currently occurs when the event buffer reaches capacity, future configuration will allow specifying when the process should take place, be it via cron, when certain rules are triggered, and so on. While the performance impact of the snapshot process is very minimal, such configuration will allow the work to be offloaded onto typically slow periods in the system or onto key points throughout the business process.