11.6. distribution/Replication/Scattered
这是流进入其范围的地方。执行流操作时,它会将各种中间和终端操作发送到具有相关数据的每个节点。这允许处理拥有数据的节点上的中间值,并且仅将最终结果发送回原始节点,从而提高性能。
11.6.1. 重新哈希 Aware
在内部,数据被分段,每个节点只对它拥有的数据执行操作。这允许平均处理数据,假设片段足够精细,以为每个节点上提供相同数量的数据。
使用分布式缓存时,当新节点加入或离开时,可以在节点间重新处理数据。分布式流处理会自动重新处理数据,因此您不必在节点离开或加入集群时考虑监控。Reshuffled 条目可能会第二次处理,我们将在密钥级别或网段级别(取决于终端操作)跟踪已处理的条目,以限制重复处理的数量。
最好禁用重新哈希对流的了解。只有在您的请求只能处理重新哈希时,才应考虑这一点。这可以通过调用 CacheStream.disableRehashAware () 来实现,当重新哈希无法完全忽略时,大多数操作的性能都会获得相应的性能。唯一的例外是 iterator 和 per per (使用较少的内存),因为它们不必跟踪已处理的密钥。
请重新考虑禁用重新哈希感知,除非您真正知道自己正在执行的操作。
11.6.2. serialization
由于操作会互相发送到其他节点,因此它们必须可以被 Data Grid marshalling 处理。这允许将操作发送到其他节点。
最简单的方法是使用 CacheStream 实例,并使用 lambda,就像您正常一样。Data Grid 会覆盖所有各种流中间和终端方法,以获取参数的 Serializable 版本(如 SerializableFunction、SerializablePredicate…)您可以在 CacheStream 中找到这些方法。这依赖于 spec 来选择 此处定义的 最具体方法。
在前面的示例中,我们使用 Collector
将所有结果收集到 Map
中。不幸的是 ,Collector s 类不会生成 Serializable 实例。因此,如果您需要使用它们,可以通过两种方式来实现:
一个选项是使用 CacheCollectors 类,该类允许提供 Supplier<Collector&
gt;。然后,这个实例可以使用 Collector
提供没有序列化的 Collector。https://docs.oracle.com/javase/8/docs/api/java/util/stream/Collectors.html
Map<Object, String> jbossValues = cache.entrySet().stream() .filter(e -> e.getValue().contains("Jboss")) .collect(CacheCollectors.serializableCollector(() -> Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
或者,您可以避免使用 CacheCollectors,而是使用采用 Supplier<Collector
> 的超载 收集
方法。这些过载 收集
方法只能通过 CacheStream
接口获得。
Map<Object, String> jbossValues = cache.entrySet().stream() .filter(e -> e.getValue().contains("Jboss")) .collect(() -> Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
但是,如果您无法使用 Cache
和 CacheStream
接口,则无法使用
参数,而是必须通过将 lambda 发送到多个接口来手动调用 lambdas。这不是一个非常好的,但会获得作业完成。
Serializable
Map<Object, String> jbossValues = map.entrySet().stream() .filter((Serializable & Predicate<Map.Entry<Object, String>>) e -> e.getValue().contains("Jboss")) .collect(CacheCollectors.serializableCollector(() -> Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
推荐的和最高性能的方法是使用 AdvancedExternalizer
,因为这提供了最小的有效负载。不幸的是,您不能将 lamdbas 用作高级外部工具,需要预先定义该类。
您可以使用高级外部工具,如下所示:
Map<Object, String> jbossValues = cache.entrySet().stream() .filter(new ContainsFilter("Jboss")) .collect(() -> Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); class ContainsFilter implements Predicate<Map.Entry<Object, String>> { private final String target; ContainsFilter(String target) { this.target = target; } @Override public boolean test(Map.Entry<Object, String> e) { return e.getValue().contains(target); } } class JbossFilterExternalizer implements AdvancedExternalizer<ContainsFilter> { @Override public Set<Class<? extends ContainsFilter>> getTypeClasses() { return Util.asSet(ContainsFilter.class); } @Override public Integer getId() { return CUSTOM_ID; } @Override public void writeObject(ObjectOutput output, ContainsFilter object) throws IOException { output.writeUTF(object.target); } @Override public ContainsFilter readObject(ObjectInput input) throws IOException, ClassNotFoundException { return new ContainsFilter(input.readUTF()); } }
您还可以为收集器供应商使用高级外部工具来进一步缩小有效负载大小。
Map<Object, String> map = (Map<Object, String>) cache.entrySet().stream() .filter(new ContainsFilter("Jboss")) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); class ToMapCollectorSupplier<K, U> implements Supplier<Collector<Map.Entry<K, U>, ?, Map<K, U>>> { static final ToMapCollectorSupplier INSTANCE = new ToMapCollectorSupplier(); private ToMapCollectorSupplier() { } @Override public Collector<Map.Entry<K, U>, ?, Map<K, U>> get() { return Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue); } } class ToMapCollectorSupplierExternalizer implements AdvancedExternalizer<ToMapCollectorSupplier> { @Override public Set<Class<? extends ToMapCollectorSupplier>> getTypeClasses() { return Util.asSet(ToMapCollectorSupplier.class); } @Override public Integer getId() { return CUSTOM_ID; } @Override public void writeObject(ObjectOutput output, ToMapCollectorSupplier object) throws IOException { } @Override public ToMapCollectorSupplier readObject(ObjectInput input) throws IOException, ClassNotFoundException { return ToMapCollectorSupplier.INSTANCE; } }