11.6. distribution/Replication/Scattered

这是流进入其范围的地方。执行流操作时,它会将各种中间和终端操作发送到具有相关数据的每个节点。这允许处理拥有数据的节点上的中间值,并且仅将最终结果发送回原始节点,从而提高性能。

11.6.1. 重新哈希 Aware

在内部,数据被分段,每个节点只对它拥有的数据执行操作。这允许平均处理数据,假设片段足够精细,以为每个节点上提供相同数量的数据。

使用分布式缓存时,当新节点加入或离开时,可以在节点间重新处理数据。分布式流处理会自动重新处理数据,因此您不必在节点离开或加入集群时考虑监控。Reshuffled 条目可能会第二次处理,我们将在密钥级别或网段级别(取决于终端操作)跟踪已处理的条目,以限制重复处理的数量。

最好禁用重新哈希对流的了解。只有在您的请求只能处理重新哈希时,才应考虑这一点。这可以通过调用 CacheStream.disableRehashAware () 来实现,当重新哈希无法完全忽略时,大多数操作的性能都会获得相应的性能。唯一的例外是 iterator 和 per per (使用较少的内存),因为它们不必跟踪已处理的密钥。

警告

请重新考虑禁用重新哈希感知,除非您真正知道自己正在执行的操作。

11.6.2. serialization

由于操作会互相发送到其他节点,因此它们必须可以被 Data Grid marshalling 处理。这允许将操作发送到其他节点。

最简单的方法是使用 CacheStream 实例,并使用 lambda,就像您正常一样。Data Grid 会覆盖所有各种流中间和终端方法,以获取参数的 Serializable 版本(如 SerializableFunction、SerializablePredicate…​)您可以在 CacheStream 中找到这些方法。这依赖于 spec 来选择 此处定义的 最具体方法。

在前面的示例中,我们使用 Collector 将所有结果收集到 Map 中。不幸的是 ,Collector s 类不会生成 Serializable 实例。因此,如果您需要使用它们,可以通过两种方式来实现:

一个选项是使用 CacheCollectors 类,该类允许提供 Supplier<Collector& gt;。然后,这个实例可以使用 Collector 提供没有序列化的 Collector。https://docs.oracle.com/javase/8/docs/api/java/util/stream/Collectors.html

Map<Object, String> jbossValues = cache.entrySet().stream()
              .filter(e -> e.getValue().contains("Jboss"))
              .collect(CacheCollectors.serializableCollector(() -> Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));

或者,您可以避免使用 CacheCollectors,而是使用采用 Supplier<Collector > 的超载 收集 方法。这些过载 收集 方法只能通过 CacheStream 接口获得。

Map<Object, String> jbossValues = cache.entrySet().stream()
              .filter(e -> e.getValue().contains("Jboss"))
              .collect(() -> Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

但是,如果您无法使用 CacheCacheStream 接口,则无法使用 Serializable 参数,而是必须通过将 lambda 发送到多个接口来手动调用 lambdas。这不是一个非常好的,但会获得作业完成。

Map<Object, String> jbossValues = map.entrySet().stream()
              .filter((Serializable & Predicate<Map.Entry<Object, String>>) e -> e.getValue().contains("Jboss"))
              .collect(CacheCollectors.serializableCollector(() -> Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));

推荐的和最高性能的方法是使用 AdvancedExternalizer,因为这提供了最小的有效负载。不幸的是,您不能将 lamdbas 用作高级外部工具,需要预先定义该类。

您可以使用高级外部工具,如下所示:

   Map<Object, String> jbossValues = cache.entrySet().stream()
              .filter(new ContainsFilter("Jboss"))
              .collect(() -> Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

   class ContainsFilter implements Predicate<Map.Entry<Object, String>> {
      private final String target;

      ContainsFilter(String target) {
         this.target = target;
      }

      @Override
      public boolean test(Map.Entry<Object, String> e) {
         return e.getValue().contains(target);
      }
   }

   class JbossFilterExternalizer implements AdvancedExternalizer<ContainsFilter> {

      @Override
      public Set<Class<? extends ContainsFilter>> getTypeClasses() {
         return Util.asSet(ContainsFilter.class);
      }

      @Override
      public Integer getId() {
         return CUSTOM_ID;
      }

      @Override
      public void writeObject(ObjectOutput output, ContainsFilter object) throws IOException {
         output.writeUTF(object.target);
      }

      @Override
      public ContainsFilter readObject(ObjectInput input) throws IOException, ClassNotFoundException {
         return new ContainsFilter(input.readUTF());
      }
   }

您还可以为收集器供应商使用高级外部工具来进一步缩小有效负载大小。

Map<Object, String> map = (Map<Object, String>) cache.entrySet().stream()
              .filter(new ContainsFilter("Jboss"))
              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

 class ToMapCollectorSupplier<K, U> implements Supplier<Collector<Map.Entry<K, U>, ?, Map<K, U>>> {
      static final ToMapCollectorSupplier INSTANCE = new ToMapCollectorSupplier();

      private ToMapCollectorSupplier() { }

      @Override
      public Collector<Map.Entry<K, U>, ?, Map<K, U>> get() {
         return Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue);
      }
   }

   class ToMapCollectorSupplierExternalizer implements AdvancedExternalizer<ToMapCollectorSupplier> {

      @Override
      public Set<Class<? extends ToMapCollectorSupplier>> getTypeClasses() {
         return Util.asSet(ToMapCollectorSupplier.class);
      }

      @Override
      public Integer getId() {
         return CUSTOM_ID;
      }

      @Override
      public void writeObject(ObjectOutput output, ToMapCollectorSupplier object) throws IOException {
      }

      @Override
      public ToMapCollectorSupplier readObject(ObjectInput input) throws IOException, ClassNotFoundException {
         return ToMapCollectorSupplier.INSTANCE;
      }
   }