9.6. Distribution/Replication/Scattered
这是流逐步进入的位置。当执行流操作时,它会将各种中间和终端操作发送到每个已大数据的节点。这允许在拥有数据的节点上处理中间值,并且仅将最终结果发回到原始节点,从而提高性能。
9.6.1. Rehash Aware
数据内部被分割,每个节点仅在作为主所有者拥有的数据上执行操作。这允许均匀处理数据,假设片段足够细化,以为每个节点上提供相等的数据量。
当您使用分布式缓存时,当新节点加入或离开时,可以在节点间重新建立数据。分布式流处理自动重新处理数据,因此您不必担心在节点离开或加入集群时不必担心监控。Reshuffled 条目可能会被处理一次,我们跟踪关键级别或网段级别(取决于终端操作)的已处理条目,以限制重复处理的数量。
可能这样做,但强烈建议您禁用对流的重新哈希意识。只有在请求只能处理重新哈希时查看数据子集时才应被视为。这可通过调用 CacheStream.disableRehashAware () 在重新哈希不完全不完全发生时,可以进行大多数操作的性能。唯一的例外是针对迭代器和每个对象,它们将使用较少的内存,因为它们不必跟踪已处理的密钥。
请重新考虑禁用重新哈希意识,除非您真正了解自己所做的工作。
9.6.2. 序列化
因为操作会发送到其他节点,因此 Data Grid marshalling 必须被序列化。这允许将操作发送到其他节点。
最简单的方法是使用 CacheStream 实例,像您通常一样使用 lambda。数据网格覆盖了所有各种流中间和终端方法,以获取参数的 Serializable 版本(ie. SerializableFunction、SerializableFunction、SerializablePredicate…),您可以在 CacheStream 找到以下方法。这依赖于 spec 来选择 此处定义 的最具体方法。
在之前的示例中,我们使用 Collector
将所有结果收集到 映射
中。遗憾的是,Collectors 类不会生成 Serializable 实例。因此,如果您需要使用这些方法,可以通过两种方式来实现:
一个选项是使用 CacheCollectors 类,它允许提供 供应商<Collector&
gt;。然后,这个实例可以使用 Collectors 提供不序列化的 收集器
。
Map<Object, String> jbossValues = cache.entrySet().stream() .filter(e -> e.getValue().contains("Jboss")) .collect(CacheCollectors.serializableCollector(() -> Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
或者,您可以避免使用 CacheCollectors,并使用采用 vendor< Collector
> 的超载 收集
方法。这些超载的 收集
方法只能通过 CacheStream
接口获得。
Map<Object, String> jbossValues = cache.entrySet().stream() .filter(e -> e.getValue().contains("Jboss")) .collect(() -> Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
但是,您无法使用 Cache
和 CacheStream
接口,则无法使用
参数,而是必须通过将 lambdas 手动对多个接口进行序列化处理。这并不是一个非常高的,但它会让作业完成。
Serializable
Map<Object, String> jbossValues = map.entrySet().stream() .filter((Serializable & Predicate<Map.Entry<Object, String>>) e -> e.getValue().contains("Jboss")) .collect(CacheCollectors.serializableCollector(() -> Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
推荐的也是最高性能的方法是使用 高级外部化器
,因为它提供了最小的有效负载。不幸的是,您不能将 lamdbas 用作高级外部化器要求在手动之前定义课程。
您可以使用高级外部化器,如下所示:
Map<Object, String> jbossValues = cache.entrySet().stream() .filter(new ContainsFilter("Jboss")) .collect(() -> Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); class ContainsFilter implements Predicate<Map.Entry<Object, String>> { private final String target; ContainsFilter(String target) { this.target = target; } @Override public boolean test(Map.Entry<Object, String> e) { return e.getValue().contains(target); } } class JbossFilterExternalizer implements AdvancedExternalizer<ContainsFilter> { @Override public Set<Class<? extends ContainsFilter>> getTypeClasses() { return Util.asSet(ContainsFilter.class); } @Override public Integer getId() { return CUSTOM_ID; } @Override public void writeObject(ObjectOutput output, ContainsFilter object) throws IOException { output.writeUTF(object.target); } @Override public ContainsFilter readObject(ObjectInput input) throws IOException, ClassNotFoundException { return new ContainsFilter(input.readUTF()); } }
您还可以为收集器供应商使用高级外部化器来进一步降低有效负载大小。
Map<Object, String> map = (Map<Object, String>) cache.entrySet().stream() .filter(new ContainsFilter("Jboss")) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); class ToMapCollectorSupplier<K, U> implements Supplier<Collector<Map.Entry<K, U>, ?, Map<K, U>>> { static final ToMapCollectorSupplier INSTANCE = new ToMapCollectorSupplier(); private ToMapCollectorSupplier() { } @Override public Collector<Map.Entry<K, U>, ?, Map<K, U>> get() { return Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue); } } class ToMapCollectorSupplierExternalizer implements AdvancedExternalizer<ToMapCollectorSupplier> { @Override public Set<Class<? extends ToMapCollectorSupplier>> getTypeClasses() { return Util.asSet(ToMapCollectorSupplier.class); } @Override public Integer getId() { return CUSTOM_ID; } @Override public void writeObject(ObjectOutput output, ToMapCollectorSupplier object) throws IOException { } @Override public ToMapCollectorSupplier readObject(ObjectInput input) throws IOException, ClassNotFoundException { return ToMapCollectorSupplier.INSTANCE; } }