Chapter 4. Sensor Application

4.1. Overview

A global transportation company can have thousands of IoT sensors installed in various places, such as gates, containers, ships, and planes. These sensors continuously feed a large amount of data to the data center. Processing that data in a reasonable amount of time is the biggest challenge of the project. Apache Spark's lightning-fast processing, combined with the high-performance, in-memory, distributed NoSQL datastore provided by JBoss Data Grid 7, fulfills this requirement. The second challenge stems from the dynamic nature of a global logistics business: certain periods, such as the time before major holidays, need more processing power than others, so processing capacity must be added on the fly. Here again, Apache Spark's large-scale data processing combined with the elastic scalability of JDG 7 meets the requirement.

This reference architecture includes and deploys a sample IoT sensor application, which makes use of the Spark connector, providing tight integration with Apache Spark and allowing applications written in Scala to utilize JBoss Data Grid as a backing data store. The connector includes support for the following (a short sketch of the RDD operations follows the list):

  • Create an RDD from any cache
  • Write a key/value RDD to a cache
  • Create a DStream from cache-level events
  • Write a key/value DStream to a cache
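
Below is a minimal sketch of the first two capabilities, assuming the RDD API of the same connector release used elsewhere in this chapter and the same property keys the Analyzer application uses for its DStream. The server address is illustrative, the RddSketch object is hypothetical, and ShipmentData is the application's data class introduced in the next section:

import java.util.Properties

import org.apache.spark.{SparkConf, SparkContext}
import org.infinispan.spark._
import org.infinispan.spark.rdd.InfinispanRDD

object RddSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("rdd-sketch"))

    // expose the sensor-data cache as an RDD of (key, value) pairs
    val props = new Properties
    props.put("infinispan.rdd.cacheName", "sensor-data")
    props.put("infinispan.client.hotrod.server_list", "127.0.0.1:11222")
    val sensorRdd = new InfinispanRDD[String, ShipmentData](sc, props)
    println(s"entries in cache: ${sensorRdd.count()}")

    // write a (key, value) RDD to a different cache
    val outProps = new Properties
    outProps.put("infinispan.rdd.cacheName", "sensor-avg-data")
    outProps.put("infinispan.client.hotrod.server_list", "127.0.0.1:11222")
    sensorRdd.writeToInfinispan(outProps)
  }
}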

4.2. Packages Overview

This application contains three main packages and a common data class (ShipmentData) shared by all three.

The first package is Sensor, with its main class TestSensor.java, which simulates IoT sensors for a global transportation company. These sensors might be installed anywhere: on docks, ships, gates, containers, trucks, in warehouses, and so on. They send the temperature and humidity data they collect to the JDG cluster’s sensor-data cache.

The second package is Analyzer, with the main class Analyzer.scala, which runs on the Apache Spark cluster, reads the sensor-data cache through the JDG Spark connector, calculates the average temperature and humidity over the past 5 minutes, and puts the results in the JDG sensor-avg-data cache.

The third package is Client, with the main class Client.java, which simulates system clients; a client might be the transportation company or the owner of the actual shipment, anyone interested in the condition of the shipment. Clients monitor the sensor-avg-data cache and send alerts when a shipment may be at risk.

Below is the data flow diagram.

Figure 4.1. Data Flow Diagram


Last is the ShipmentData.java class, which is a simple data object:

import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.Date;

@SuppressWarnings("serial")
public class ShipmentData implements Serializable {
	private double temperature;
	private double humidity;
	private String skuNumbers;
	private long sensorTime;


	public void setSensorTime(long sensorTime) {
		this.sensorTime = sensorTime;
	}

	public double getTemperature() {
		return temperature;
	}
	public void setTemperature(double temperature) {
		this.temperature = temperature;
	}
	public double getHumidity() {
		return humidity;
	}
	public void setHumidity(double humidity) {
		this.humidity = humidity;
	}
	public String getSkuNumbers() {
		return skuNumbers;
	}
	public void setSkuNumbers(String skuNumbers) {
		this.skuNumbers = skuNumbers;
	}
	public long getSensorTime() {
		return sensorTime;
	}

	public String getValues() {
		SimpleDateFormat sdf = new SimpleDateFormat("MMM dd,yyyy HH:mm");
		Date sensorTimeInDisplay = new Date(sensorTime);
		return " timeGenerated: " + sdf.format(sensorTimeInDisplay)
				+ " temperature: " + (int) this.temperature
				+ " humidity: " + (int) this.humidity
				+ " skuNumbers " + this.skuNumbers;
	}
}

4.3. Sensor package

The TestSensor class sends ShipmentData to the JDG sensor-data cache, simulating the real-world work of an IoT sensor. The key for the cache is the shipment ID (shipment0, shipment1, … shipment999), while the value is the ShipmentData object.

import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;

import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.infinispan.client.hotrod.impl.ConfigurationProperties;

public class TestSensor {

	public static final String CACHE_NAME = "sensor-data";
	private static final Random RANDOM = new Random();

	// how long to keep generating data, in ms (24 hours)
	private static final int GENERATE_INTERVAL = 1000 * 60 * 60 * 24;
	// how often data should be generated, in ms
	private static final int GENERATE_PERIOD = 1000;

	public static void main(String[] args) throws InterruptedException {

		// Configure remote cache
		ConfigurationBuilder builder = new ConfigurationBuilder();
		String DATAGRID_IP = args[0];
		builder.addServer().host(DATAGRID_IP).port(ConfigurationProperties.DEFAULT_HOTROD_PORT);
		RemoteCacheManager cacheManager = new RemoteCacheManager(builder.build());
		RemoteCache<String, ShipmentData> cache = cacheManager.getCache(CACHE_NAME);

		// Insert some sensor data into the cache
		TimerTask randomSensorData = new SensorDataGenerator(cache);
		Timer timer = new Timer(true);
		timer.schedule(randomSensorData, 0, GENERATE_PERIOD);

		// Generate sensor data for specified interval
		Thread.sleep(GENERATE_INTERVAL);
		randomSensorData.cancel();
		cacheManager.stop();
		System.exit(0);
	}

	private static class SensorDataGenerator extends TimerTask {
		// maximum temperature for the simulation data range
		private static final int TEMP_MAX = 60;
		// maximum humidity for the simulation data range
		private static final int HUMIDITY_MAX = 100;

		private final RemoteCache<String, ShipmentData> cache;

		public SensorDataGenerator(RemoteCache<String, ShipmentData> cache) {
			this.cache = cache;
		}

		@Override
		public void run() {
			//Simulate 1000 different shipments
			int i = RANDOM.nextInt(1000);

			String shipment = "shipment" + i;
			String sku = "sku_";
			double temperature = RANDOM.nextDouble() * TEMP_MAX;
			double humidity = RANDOM.nextDouble() * HUMIDITY_MAX;
			// set up the fake data: the first batch of SKUs is meat, the next metal, the next animal, and the last a combined shipment
			if (i < 250) {
				sku = sku + "meat" + i;
				temperature = RANDOM.nextDouble() * 30;
			}
			if (i >= 250 && i < 500) {
				sku = sku + "metal" + i;
				humidity = RANDOM.nextDouble() * 40;
			}
			if (i >= 500 && i < 750) {
				sku = sku + "animal" + i;
				temperature = RANDOM.nextDouble() * 45;
				humidity = RANDOM.nextDouble() * 75;
			}
			if (i >= 750 && i < 1000) {
				sku = sku + "animal" + i + "," + sku + "metal" + i;
			}

			ShipmentData newShipment = new ShipmentData();
			newShipment.setHumidity(humidity);
			newShipment.setTemperature(temperature);
			newShipment.setSkuNumbers(sku);
			newShipment.setSensorTime(System.currentTimeMillis());

			cache.put(shipment, newShipment);
			System.out.println("Inserted " + shipment + newShipment.getValues());
		}
	}

}

4.3.1. TestSensor class detailed explanation

SensorDataGenerator is an inner class that extends TimerTask; its main purpose is to generate data that simulates real-world IoT readings:

	private static class SensorDataGenerator extends TimerTask {

To simulate the data of IoT sensors from all over the world, this class will generate temperature and humidity data, and set these values for random shipments (shipment id from 0 to 999).

The cache key is the shipment field of ShipmentData; its value does not change during the whole trip, just as in the real world. Each shipment can contain different items, identified by SKU numbers; there may be one or many items, and the SKU numbers likewise do not change during the trip.

For demonstration purposes, if the random shipment id is below 250, the SKU number is assigned as a meat shipment. If it is between 250 and 499, the SKU number is assigned as a metal shipment. If it is between 500 and 749, the SKU number is assigned as an animal shipment. For shipment ids between 750 and 999, the SKU is a concatenated string of animal and metal SKUs, demonstrating a shipment that contains both animal and metal products.

			String shipment = "shipment" + i;
			String sku = "sku_";
			double temperature = RANDOM.nextDouble() * TEMP_MAX;
			double humidity = RANDOM.nextDouble() * HUMIDITY_MAX;
			// set up the fake data: the first batch of SKUs is meat, the next metal, the next animal, and the last a combined shipment
			if (i < 250) {
				sku = sku + "meat" + i;
				temperature = RANDOM.nextDouble() * 30;
			}
			if (i >= 250 && i < 500) {
				sku = sku + "metal" + i;
				humidity = RANDOM.nextDouble() * 40;
			}
			if (i >= 500 && i < 750) {
				sku = sku + "animal" + i;
				temperature = RANDOM.nextDouble() * 45;
				humidity = RANDOM.nextDouble() * 75;
			}
			if (i >= 750 && i < 1000) {
				sku = sku + "animal" + i + "," + sku + "metal" + i;
			}

The main method of the TestSensor class sets up the Hot Rod Java client connection to the JDG cluster and connects to the sensor-data cache:

		// Configure remote cache
		ConfigurationBuilder builder = new ConfigurationBuilder();
		String DATAGRID_IP = args[0];
		builder.addServer().host(DATAGRID_IP).port(ConfigurationProperties.DEFAULT_HOTROD_PORT);
		RemoteCacheManager cacheManager = new RemoteCacheManager(builder.build());
		RemoteCache<String, ShipmentData> cache = cacheManager.getCache(CACHE_NAME);

Then it starts the timer and schedules the task that generates random sensor data:

		// Insert some sensor data into the cache
		TimerTask randomSensorData = new SensorDataGenerator(cache);
		Timer timer = new Timer(true);
		timer.schedule(randomSensorData, 0, GENERATE_PERIOD);

Below is a sample of the random data output:

Inserted shipment546 timeGenerated: Aug 22,2016 14:42 temperature: 30 humidity: 35 skuNumbers sku_animal546
Inserted shipment333 timeGenerated: Aug 22,2016 14:42 temperature: 11 humidity: 6 skuNumbers sku_metal333
Inserted shipment571 timeGenerated: Aug 22,2016 14:42 temperature: 33 humidity: 13 skuNumbers sku_animal571
Inserted shipment942 timeGenerated: Aug 22,2016 14:42 temperature: 47 humidity: 3 skuNumbers sku_animal942,sku_metal942
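
If the inserted entries need to be verified independently of the Spark job, a one-off Hot Rod read can spot-check the cache. The following is a minimal Scala sketch using the same client configuration as TestSensor; the CacheSpotCheck object is hypothetical, and the shipment546 key is only an example that may not exist in a given run:

import org.infinispan.client.hotrod.RemoteCacheManager
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder
import org.infinispan.client.hotrod.impl.ConfigurationProperties

object CacheSpotCheck {
  def main(args: Array[String]): Unit = {
    // connect to the JDG server given as the first argument
    val builder = new ConfigurationBuilder
    builder.addServer().host(args(0)).port(ConfigurationProperties.DEFAULT_HOTROD_PORT)
    val cacheManager = new RemoteCacheManager(builder.build())
    val cache = cacheManager.getCache[String, ShipmentData]("sensor-data")

    // a null result simply means that shipment id has not been generated yet
    val data = cache.get("shipment546")
    if (data != null) println("shipment546" + data.getValues)
    cacheManager.stop()
  }
}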

4.4. Analyzer package

The Analyzer class reads shipment data from the JDG 7 sensor-data cache, keyed by shipment ID, calculates the average temperature and humidity over the past 5 minutes for each shipment, and stores the average ShipmentData in a second JDG 7 cache (sensor-avg-data). The readings from the past 5 minutes are kept in Spark streaming state for the next computation.

/**
 * <p>Computes average sensor data for each shipment from the incoming stream of measurements.</p>
 * <p>The measurement stream is read from JDG's sensor-data cache and produces a stream of average data.
 * Updates are written back to the Data Grid into a different cache called sensor-avg-data.</p>
 */
import java.util.{Date, Properties}

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, MapWithStateDStream}
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
import org.infinispan.spark.stream._

object Analyzer {

  val inputDataGridCache = "sensor-data"
  val outputDataGridCache = "sensor-avg-data"

  val mapFunc = (place: String, currDataLists: Option[Iterable[ShipmentData]], state: State[Map[String, Iterable[ShipmentData]]]) => {

    // obtain the state map containing the past 5 minutes of shipment data
    val stateMap: Map[String, Iterable[ShipmentData]] = state.getOption().getOrElse(Map[String, Iterable[ShipmentData]]())
    val pastData: Iterable[ShipmentData] = stateMap.getOrElse(place, Iterable.empty[ShipmentData])

    var sumTemp: Double = 0d;
    var sumHumd: Double = 0d;
    val currTime: Long = java.lang.System.currentTimeMillis();
    var skuNos: String = "";

    //create the new list for storing the data, from both pastData and currData
    var newShipmentData = collection.mutable.ListBuffer[ShipmentData]();

    // only keep readings from the past 5 minutes
    pastData.foreach { data: ShipmentData =>
      val sensorTime: Long = data.getSensorTime();

      //for testing usage
      val sensorTimeInDisplay = new Date(sensorTime);

      //only include the data from past 5 minutes
      if (currTime - sensorTime < 5 * 60 * 1000) {
        sumTemp = sumTemp + data.getTemperature();
        sumHumd = sumHumd + data.getHumidity();
        newShipmentData += data;
      }
    }

    val currData: Iterable[ShipmentData] = currDataLists.getOrElse(List());

    currData.foreach { data: ShipmentData =>
      // only keep readings from the past 5 minutes
      val sensorTime: Long = data.getSensorTime();

      //for testing usage
      val sensorTimeInDisplay = new Date(sensorTime);
      //only include the data from past 5 minutes
      if (currTime - sensorTime < 5 * 60 * 1000) {
        sumTemp = sumTemp + data.getTemperature();
        sumHumd = sumHumd + data.getHumidity();
        newShipmentData += data;
        skuNos = data.getSkuNumbers();
      }
    }

    val count = newShipmentData.size;
    val avgTemp: Double = sumTemp / count;
    val avgHumd: Double = sumHumd / count;

    val avgShipmentData: ShipmentData = new ShipmentData();
    avgShipmentData.setTemperature(avgTemp);
    avgShipmentData.setHumidity(avgHumd);
    avgShipmentData.setSkuNumbers(skuNos);
    avgShipmentData.setSensorTime(currTime);

    // update stored state
    state.update(stateMap.updated(place, newShipmentData.toList))

    (place, avgShipmentData)
  }


  def main(args: Array[String]) {
    val usage = """
    Please input the Spark server address in ip:port format
  """

    if (args.length == 0) println(usage)
    val dataGridServer = args(0)
    // Initialize the Spark streaming context, with a batch duration of 1 second.
    val sparkConf = new SparkConf().setAppName("SparkSensor")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    // set up checkpoint to store state information
    ssc.checkpoint("/mnt/shared/sharefs/spark-sensor")

    // configure the connection to the DataGrid and create the incoming DStream
    val configIn = new Properties
    configIn.put("infinispan.rdd.cacheName", inputDataGridCache)
    configIn.put("infinispan.client.hotrod.server_list", dataGridServer)
    val ispnStream = new InfinispanInputDStream[String, ShipmentData](ssc, StorageLevel.MEMORY_ONLY, configIn)

    // extract the (place, shipmentData) pair from the incoming stream that is composed of (key, value, eventType)
    // we are assuming only events of type CLIENT_CACHE_ENTRY_CREATED will be present.
    val measurementStream: DStream[(String, ShipmentData)] = ispnStream map { case (key, value, eventType) => (key, value) }

    // as measurements are batched, it can contain several measurements from the same place - let's group them together
    val measurementGrouped: DStream[(String, Iterable[ShipmentData])] = measurementStream.groupByKey()

    // Produce a new DStream combining the previous DStream with the stored state
    val avgSensorDataStream: MapWithStateDStream[String, Iterable[ShipmentData], Map[String, Iterable[ShipmentData]], (String, ShipmentData)] =
      measurementGrouped.mapWithState(StateSpec.function(mapFunc))

    // just print number of items in each RDD
    avgSensorDataStream.foreachRDD(rdd => {
      printf("# items in DStream: %d\n", rdd.count())
      rdd.foreach { case (place, average) => println("Averages:" + place + " -> " + average) }
    })

    // write stream to the sensor-avg-data cache
    val configOut = new Properties
    configOut.put("infinispan.rdd.cacheName", outputDataGridCache)
    configOut.put("infinispan.client.hotrod.server_list", dataGridServer)
    avgSensorDataStream.writeToInfinispan(configOut)

    ssc.start()
    ssc.awaitTermination()
  }
}

4.4.1. Analyzer class detailed explanation

First declare both JDG caches that will be used in this class:

  val inputDataGridCache = "sensor-data"
  val outputDataGridCache = "sensor-avg-data"

Get the JDG 7 server address from the first argument provided when starting the application; it can be the full cluster address, for example: 10.19.137.34:11222;10.19.137.35:11222;10.19.137.36:11222

    val dataGridServer = args(0)

Next, initialize the Spark streaming context and the checkpoint path.

    // Initialize the Spark streaming context, with a batch duration of 1 second.
    val sparkConf = new SparkConf().setAppName("SparkSensor")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    // set up checkpoint to store state information
    ssc.checkpoint("/mnt/shared/sharefs/spark-sensor")

Then create the Spark stream.

    // configure the connection to the DataGrid and create the incoming DStream
    val configIn = new Properties
    configIn.put("infinispan.rdd.cacheName", inputDataGridCache)
    configIn.put("infinispan.client.hotrod.server_list", dataGridServer)
    val ispnStream = new InfinispanInputDStream[String, ShipmentData](ssc, StorageLevel.MEMORY_ONLY, configIn)

The next section is the core of this class: it extracts data from the incoming stream and calls the mapFunc function to calculate the average sensor data.

    // extract the (place, shipmentData) pair from the incoming stream that is composed of (key, value, eventType)
    // we are assuming only events of type CLIENT_CACHE_ENTRY_CREATED will be present.
    val measurementStream: DStream[(String, ShipmentData)] = ispnStream map { case (key, value, eventType) => (key, value) }

    // as measurements are batched, it can contain several measurements from the same place - let's group them together
    val measurementGrouped: DStream[(String, Iterable[ShipmentData])] = measurementStream.groupByKey()

    // Produce a new DStream combining the previous DStream with the stored state
    val avgSensorDataStream: MapWithStateDStream[String, Iterable[ShipmentData], Map[String, Iterable[ShipmentData]], (String, ShipmentData)] =
      measurementGrouped.mapWithState(StateSpec.function(mapFunc))

The last part of the main method writes the stream to the JDG 7 sensor-avg-data cache.

    // write stream to the sensor-avg-data cache
    val configOut = new Properties
    configOut.put("infinispan.rdd.cacheName", outputDataGridCache)
    configOut.put("infinispan.client.hotrod.server_list", dataGridServer)
    avgSensorDataStream.writeToInfinispan(configOut)

The other function in this class, mapFunc, focuses on retrieving the past 5 minutes of sensor data and calculating averages from it.

First, get the existing state map and initialize the other variables, including the list that will hold the combined shipment data.

    // obtain the state map containing the past 5 minutes of shipment data
    val stateMap: Map[String, Iterable[ShipmentData]] = state.getOption().getOrElse(Map[String, Iterable[ShipmentData]]())
    val pastData: Iterable[ShipmentData] = stateMap.getOrElse(place, Iterable.empty[ShipmentData])

    var sumTemp: Double = 0d;
    var sumHumd: Double = 0d;
    val currTime: Long = java.lang.System.currentTimeMillis();
    var skuNos: String = "";

    //create the new list for storing the data, from both pastData and currData
    var newShipmentData = collection.mutable.ListBuffer[ShipmentData]();

Then accumulate the totals from both the historical data and the current data.

    // only keep readings from the past 5 minutes
    pastData.foreach { data: ShipmentData =>
      val sensorTime: Long = data.getSensorTime();

      //for testing usage
      val sensorTimeInDisplay = new Date(sensorTime);

      //only include the data from past 5 minutes
      if (currTime - sensorTime < 5 * 60 * 1000) {
        sumTemp = sumTemp + data.getTemperature();
        sumHumd = sumHumd + data.getHumidity();
        newShipmentData += data;
      }
    }

    val currData: Iterable[ShipmentData] = currDataLists.getOrElse(List());

    currData.foreach { data: ShipmentData =>
      // only keep readings from the past 5 minutes
      val sensorTime: Long = data.getSensorTime();

      //for testing usage
      val sensorTimeInDisplay = new Date(sensorTime);
      //only include the data from past 5 minutes
      if (currTime - sensorTime < 5 * 60 * 1000) {
        sumTemp = sumTemp + data.getTemperature();
        sumHumd = sumHumd + data.getHumidity();
        newShipmentData += data;
        skuNos = data.getSkuNumbers();
      }
    }

The last part calculates the averages and updates the state map.

    val count = newShipmentData.size;
    val avgTemp: Double = sumTemp / count;
    val avgHumd: Double = sumHumd / count;

    val avgShipmentData: ShipmentData = new ShipmentData();
    avgShipmentData.setTemperature(avgTemp);
    avgShipmentData.setHumidity(avgHumd);
    avgShipmentData.setSkuNumbers(skuNos);
    avgShipmentData.setSensorTime(currTime);

    // update stored state
    state.update(stateMap.updated(place, newShipmentData.toList))

    (place, avgShipmentData)
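
One edge case is worth noting in the excerpt above: when no readings fall inside the five-minute window, count is zero and both divisions yield Double.NaN, which would then be written to the sensor-avg-data cache. A small guard, not part of the original code, would avoid that:

    // hypothetical guard: skip the division when the window is empty,
    // so a NaN average is never written downstream
    val count = newShipmentData.size;
    val avgTemp: Double = if (count > 0) sumTemp / count else 0d;
    val avgHumd: Double = if (count > 0) sumHumd / count else 0d;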

4.5. Client package

The Client class simulates a real-world end user, such as a transportation company watching its shipments closely. It contains the business rules, based on SKU number, that determine when shipment data reaches a warning level.

It registers a listener on the JDG sensor-avg-data cache; when an entry’s shipment ID matches one being watched, the client reads the average temperature and humidity for the past 5 minutes and compares them against the thresholds. The ShipmentData class contains a field called skuNumbers, holding one or more SKU numbers. Client.java uses skuNumbers to look up the related threshold rules and compares the current average temperature and humidity against them to decide whether to send out a warning.

Example: 1) The sensor sends shipment1 data to the first JDG cache (sensor-data):

Inserted shipment1 timeGenerated: Aug 22,2016 15:07 temperature: 19 humidity: 67 skuNumbers sku_meat1
Inserted shipment1 timeGenerated: Aug 22,2016 15:07 temperature: 17 humidity: 61 skuNumbers sku_meat1

2) The Spark Analyzer reads from the first JDG cache, calculates the shipment1 average temperature value as 18, and stores it in the second JDG cache.

3) The client is listening on the second JDG cache for shipment1, so it reads the average temperature for the past 5 minutes (18) and the skuNumbers value (sku_meat1). Since meat shipments have a threshold temperature of 15 degrees, it sends out the warning below:

Average data is from: shipment1 timeGenerated: Aug 22,2016 15:07 temperature: 18 humidity: 64 skuNumbers sku_meat1
Warning, MEAT shipment is in danger, current temperature: 18 threshold is 15

This package contains two classes. The first is Client.java, whose main purpose is to create the Hot Rod Java client connection and add the listener to the second JDG cache (sensor-avg-data).

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.infinispan.client.hotrod.impl.ConfigurationProperties;

public class Client {

   public static final String CACHE_NAME = "sensor-avg-data";

   public static void main(String[] args) throws Exception {
      // check provided arguments - the JDG address plus at least one shipment
      if (args.length < 2) {
         System.err.println("You have to provide the JDG address and a list of shipments to watch, at least one!");
         System.exit(1);
      }
      String DATAGRID_IP = args[0];
      // blank out the address entry so only shipment ids remain in the watch
      // set; the resulting null entry is ignored by the listener
      args[0] = null;
      Set<String> shipmentToWatch = new HashSet<>(args.length);
      Collections.addAll(shipmentToWatch, args);

      // Configure remote cache
      ConfigurationBuilder builder = new ConfigurationBuilder();
      builder.addServer().host(DATAGRID_IP).port(ConfigurationProperties.DEFAULT_HOTROD_PORT);
      RemoteCacheManager cacheManager = new RemoteCacheManager(builder.build());
      RemoteCache<String, ShipmentData> cache = cacheManager.getCache(CACHE_NAME);

      // Add cache listener and wait for specified amount of time
      AvgSensorDataListener avgListener = new AvgSensorDataListener(cache, shipmentToWatch);
      cache.addClientListener(avgListener);
      int LISTEN_TIME = 24; // how long the client should listen to changes, in hours
      System.out.printf("Client will be listening to avg. sensor updates for %d hours%n", LISTEN_TIME);
      Thread.sleep(LISTEN_TIME * 60 * 60 * 1000);

      System.out.println("Stopping client");
      cache.removeClientListener(avgListener);
      cacheManager.stop();
      System.exit(0);
   }

}

4.5.1. Client class detailed explanation

The first part of Client’s main method handles the Java arguments: the JDG IP address and the shipment ids that the client needs to listen for.

      // check provided arguments - the JDG address plus at least one shipment
      if (args.length < 2) {
         System.err.println("You have to provide the JDG address and a list of shipments to watch, at least one!");
         System.exit(1);
      }
      String DATAGRID_IP = args[0];

This corresponds to the command used to invoke Client.java:

# java -jar target/temperature-client-jar-with-dependencies.jar 10.19.137.34 shipment1 shipment5 shipment9

The shipment id strings that the client is interested in are put into a collection, which is then passed to the second class, AvgSensorDataListener.

      Set<String> shipmentToWatch = new HashSet<>(args.length);
      Collections.addAll(shipmentToWatch, args);

The next part creates the Hot Rod Java client connection to the JDG cluster and connects to the sensor-avg-data JDG cache.

      // Configure remote cache
      ConfigurationBuilder builder = new ConfigurationBuilder();
      builder.addServer().host(DATAGRID_IP).port(ConfigurationProperties.DEFAULT_HOTROD_PORT);
      RemoteCacheManager cacheManager = new RemoteCacheManager(builder.build());
      RemoteCache<String, ShipmentData> cache = cacheManager.getCache(CACHE_NAME);

Then it just registers the listener:

      // Add cache listener and wait for specified amount of time
      AvgSensorDataListener avgListener = new AvgSensorDataListener(cache, shipmentToWatch);
      cache.addClientListener(avgListener);
      int LISTEN_TIME = 24; // how long the client should listen to changes, in hours
      System.out.printf("Client will be listening to avg. sensor updates for %d hours%n", LISTEN_TIME);
      Thread.sleep(LISTEN_TIME * 60 * 60 * 1000);

The second class in the Client package is AvgSensorDataListener.java:

import java.util.Set;

import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryCreated;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified;
import org.infinispan.client.hotrod.annotation.ClientListener;
import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent;
import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent;

@ClientListener
public class AvgSensorDataListener {
	private final RemoteCache<String, ShipmentData> cache;
	private final Set<String> watchedShipment;

	public AvgSensorDataListener(RemoteCache<String, ShipmentData> cache, Set<String> watchedShipment) {
		this.cache = cache;
		this.watchedShipment = watchedShipment;
	}

	@ClientCacheEntryCreated
	public void entryCreated(ClientCacheEntryCreatedEvent<String> event) {
		for (String oneWatchedShipment : watchedShipment){
			if (event.getKey()!=null && oneWatchedShipment != null && event.getKey().contains(oneWatchedShipment))
				judgeAction(event.getKey());
		}

	}

	@ClientCacheEntryModified
	public void entryModified(ClientCacheEntryModifiedEvent<String> event) {
		for (String oneWatchedShipment : watchedShipment){
			if (event.getKey()!=null && oneWatchedShipment != null && event.getKey().contains(oneWatchedShipment))
				judgeAction(event.getKey());
		}
	}

	private void judgeAction(String key) {
		ShipmentData data = cache.get(key);
		// setup a few test business rules, rules are based on SKU #.
		String skuNo = data.getSkuNumbers();
		if (skuNo.contains("meat") && data.getTemperature() > 15) {
			// business logic: meat's average temperature can't be higher than 15
			System.out.println("Average data is from: " + key + data.getValues());
			System.out.println(" Warning, MEAT shipment is in danger,  current temperature: " + (int) data.getTemperature() + " threshold is 15");
			System.out.println("");
		}
		if (skuNo.contains("metal") && data.getHumidity() > 30) {
			// business logic:metal's average humidity can't be higher than 30
			System.out.println("Average data is from: " + key + data.getValues());
			System.out.println("  Warning, METAL shipment is in danger,  current humidity: " + (int) data.getHumidity() + " threshold is 30");
			System.out.println("");
		}
		if (skuNo.contains("animal") && (data.getTemperature() < 15 || data.getTemperature() > 30 || data.getHumidity() < 20 || data.getHumidity()> 90)) {
			// business logic: animal average temperature can't be lower than 15 or higher than 30 and average humidity need to between 20 and 90
			System.out.println("Average data is from: " + key + data.getValues());
			System.out.println(" Warning, ANIMAL shipment is in danger, current temperature:  " + (int) data.getTemperature() + " threshold is between 15 to 30" + " current humidity: " + (int) data.getHumidity() + " threshold is between 20 to 90");
			System.out.println("");
		}

	}
}

4.5.2. AvgSensorDataListener class detailed explanation

Both the entryCreated and entryModified methods compare the watched shipment strings against the key of the created or modified entry. On a match, they invoke the judgeAction method to determine whether the data should trigger an alert.

	@ClientCacheEntryCreated
	public void entryCreated(ClientCacheEntryCreatedEvent<String> event) {
		for (String oneWatchedShipment : watchedShipment){
			if (event.getKey()!=null && oneWatchedShipment != null && event.getKey().contains(oneWatchedShipment))
				judgeAction(event.getKey());
		}

	}

	@ClientCacheEntryModified
	public void entryModified(ClientCacheEntryModifiedEvent<String> event) {
		for (String oneWatchedShipment : watchedShipment){
			if (event.getKey()!=null && oneWatchedShipment != null && event.getKey().contains(oneWatchedShipment))
				judgeAction(event.getKey());
		}
	}

When matching sensor data is encountered, the judgeAction method is invoked. It uses the SKU number to decide whether a threshold has been reached and, if so, sends out an alert. Below is the code for a meat shipment; the other cases are similar.

		ShipmentData data = cache.get(key);
		// setup a few test business rules, rules are based on SKU #.
		String skuNo = data.getSkuNumbers();
		if (skuNo.contains("meat") && data.getTemperature() > 15) {
			// business logic: meat's average temperature can't be higher than 15
			System.out.println("Average data is from: " + key + data.getValues());
			System.out.println(" Warning, MEAT shipment is in danger,  current temperature: " + (int) data.getTemperature() + " threshold is 15");
			System.out.println("");
		}