7.5. Event Streams

Complex event processing use cases deal with streams of events. The streams can be provided to the application via JMS queues, flat text files, database tables, raw sockets, or even web service calls.
Streams share a common set of characteristics:
  • Events in the stream are ordered by timestamp. The timestamps may have different semantics for different streams, but they are always ordered internally.
  • There is usually a high volume of events in the stream.
  • Atomic events contained in the streams are rarely useful by themselves.
  • Streams are either homogeneous (they contain a single type of event) or heterogeneous (they contain events of different types).
A stream is also known as an entry point.
Facts from one entry point, or stream, may join with facts from any other entry point in addition to facts already in working memory. Facts always remain associated with the entry point through which they entered the engine. Facts of the same type may enter the engine through several entry points, but facts that enter the engine through entry point A will never match a pattern from entry point B.

7.5.1. Declaring and Using Entry Points

Entry points are declared implicitly by making direct use of them in rules. Referencing an entry point in a rule will make the engine, at compile time, identify and create the proper internal structures to support that entry point.
For example, a banking application that has transactions fed into the engine via streams could have one stream for all of the transactions executed at ATMs. A rule for this scenario could state, "A withdrawal is only allowed if the account balance is greater than the withdrawal amount the customer has requested."

Example 7.8. Example ATM Rule

rule "authorize withdraw"
   when
       WithdrawRequest( $ai : accountId, $am : amount ) from entry-point "ATM Stream"
       CheckingAccount( accountId == $ai, balance > $am )
   then
       // authorize withdraw
   end
When the engine compiles this rule, it will identify that the pattern is tied to the entry point "ATM Stream." The engine will create all the necessary structures for the rule-base to support the "ATM Stream", and this rule will only match WithdrawRequest events coming from the "ATM Stream."
Note the ATM example rule joins the event (WithdrawalRequest) from the stream with a fact from the main working memory (CheckingAccount).
The banking application may have a second rule that states, "A fee of $2 must be applied to a withdraw request made via a branch teller."

Example 7.9. Using Multiple Streams

rule "apply fee on withdraws on branches"
   when
       WithdrawRequest( $ai : accountId, processed == true ) from entry-point "Branch Stream"
       CheckingAccount( accountId == $ai )
   then
       // apply a $2 fee on the account
   end
This rule matches events of the same type (WithdrawRequest) as the example ATM rule but from a different stream. Events inserted into the "ATM Stream" will never match the pattern on the second rule, which is tied to the "Branch Stream;" accordingly, events inserted into the "Branch Stream" will never match the pattern on the example ATM rule, which is tied to the "ATM Stream".
Declaring the stream in a rule states that the rule is only interested in events coming from that stream.
Events can be inserted manually into an entry point instead of directly into the working memory.

Example 7.10. Inserting Facts into an Entry Point

import org.kie.api.runtime.KieSession;
		
   // create your rulebase and your session as usual
   KieSession session = ...

   // get a reference to the entry point
   WorkingMemoryEntryPoint atmStream = session.getWorkingMemoryEntryPoint( "ATM Stream" );

   // and start inserting your facts into the entry point
   atmStream.insert( aWithdrawRequest );

7.5.2. Negative Pattern in Stream Mode

A negative pattern is concerned with conditions that are not met. Negative patterns make reasoning in the absence of events possible. For instance, a safety system could have a rule that states, "If a fire is detected and the sprinkler is not activated, sound the alarm."
In Cloud mode, the engine assumes all facts (regular facts and events) are known in advance and evaluates negative patterns immediately.

Example 7.11. A Rule with a Negative Pattern

rule "Sound the alarm"
when
    $f : FireDetected( )
    not( SprinklerActivated( ) )
then
    // sound the alarm
end
An example in stream mode is displayed below. This rule keeps consistency when dealing with negative patterns and temporal constraints at the same time interval.

Example 7.12. A Rule with a Negative Pattern, Temporal Constraints, and an Explicit Duration Parameter.

rule "Sound the alarm"
    duration( 10s )
when
    $f : FireDetected( )
    not( SprinklerActivated( this after[0s,10s] $f ) )
then
    // sound the alarm
end
In stream mode, negative patterns with temporal constraints may force the engine to wait for a set time before activating a rule. A rule may be written for an alarm system that states, "If a fire is detected and the sprinkler is not activated after 10 seconds, sound the alarm." Unlike the previous stream mode example, this one does not require the user to calculate and write the duration parameter.

Example 7.13. A Rule with a Negative Pattern with Temporal Constraints

rule "Sound the alarm"
when
    $f : FireDetected( )
    not( SprinklerActivated( this after[0s,10s] $f ) )
then
    // sound the alarm
end
The rule depicted below expects one "Heartbeat" event to occur every 10 seconds; if not, the rule fires. What is special about this rule is that it uses the same type of object in the first pattern and in the negative pattern. The negative pattern has the temporal constraint to wait between 0 to 10 seconds before firing, and it excludes the Heartbeat bound to $h. Excluding the bound Heartbeat is important since the temporal constraint [0s, ...] does not exclude by itself the bound event $h from being matched again, thus preventing the rule to fire.

Example 7.14. Excluding Bound Events in Negative Patterns

rule "Sound the alarm"
when
    $h: Heartbeat( ) from entry-point "MonitoringStream"
    not( Heartbeat( this != $h, this after[0s,10s] $h ) from entry-point "MonitoringStream" )
then
    // Sound the alarm
end