热 Rod Java 客户端指南


Red Hat Data Grid 8.5

配置和使用 Hot Rod Java 客户端

Red Hat Customer Content Services

摘要

热 Rod Java 客户端可让您对 Data Grid 集群进行高性能远程访问。

Red Hat Data Grid

Data Grid 是一个高性能分布式内存数据存储。

无架构数据结构
将不同对象存储为键值对的灵活性。
基于网格的数据存储
旨在在集群中分发和复制数据。
弹性扩展
动态调整节点数量,以便在不中断服务的情况下满足需求。
数据互操作性
从不同端点在网格中存储、检索和查询数据。

Data Grid 文档

红帽客户门户网站中提供了 Data Grid 的文档。

Data Grid 下载

访问红帽客户门户上的 Data Grid 软件下载

注意

您必须有一个红帽帐户才能访问和下载数据中心软件。

使开源包含更多

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。我们从这四个术语开始:master、slave、黑名单和白名单。由于此项工作十分艰巨,这些更改将在即将推出的几个发行版本中逐步实施。详情请查看 CTO Chris Wright 的信息

第 1 章 热 Rod Java 客户端

通过 Hot Rod Java 客户端 API 远程访问数据网格。

1.1. 热 Rod 协议

hot Rod 是一个二进制 TCP 协议,Data Grid 提供高性能客户端-服务器与以下功能的交互:

  • 负载平衡。热 Rod 客户端可以使用不同的策略在 Data Grid 集群中发送请求。
  • 故障切换.热 Rod 客户端可以监控数据网格集群拓扑更改,并自动切换到可用节点。
  • 有效的数据位置.热 Rod 客户端可以找到密钥所有者,并直接向这些节点发出请求,从而缩短延迟。

1.2. 客户端 Intelligence

热 Rod 客户端使用智能机制将请求高效地发送到 Data Grid Server 集群。默认情况下,Hot Rod 协议启用了 HASH_DISTRIBUTION_AWARE 智能机制。

BASIC 智能

客户端不会接收 Data Grid 集群的拓扑更改事件,如加入或离开节点,并且仅使用您添加到客户端配置中的数据网格服务器网络位置列表。

注意

当数据网格服务器没有向 Hot Rod 客户端发送内部和隐藏集群拓扑时,启用 BASIC 智能来使用 Hot Rod 客户端。

TOPOLOGY_AWARE 智能

客户端接收和存储 Data Grid 集群的拓扑更改事件,以动态跟踪网络上的 Data Grid 服务器。

要接收集群拓扑,客户端在启动时需要至少一个 Hot Rod 服务器的网络位置(IP 地址或主机名)。客户端连接后,Data Grid 服务器会将拓扑传输到客户端。当 Data Grid Server 节点加入或离开集群时,Data Grid 会将更新的拓扑传输到客户端。

HASH_DISTRIBUTION_AWARE 智能

除了哈希信息外,客户端还会接收和存储 Data Grid 集群的拓扑更改事件,以便客户端识别存储特定密钥的节点。

例如,考虑一个 put (k,v) 操作。客户端计算键的哈希值,以便它可以找到数据所在的确切的 Data Grid Server 节点。然后,客户端可以直接连接到该节点来执行读写操作。

HASH_DISTRIBUTION_AWARE 智能的好处是,数据网格服务器不需要根据密钥哈希查找值,这使用较少的服务器端资源。另一个好处是数据网格服务器可以更快地响应客户端请求,因为它们不需要创建额外的网络往返。

配置

默认情况下,Hot Rod 客户端使用您全局为所有 Data Grid 集群配置智能功能。

ConfigurationBuilder

ConfigurationBuilder builder = new ConfigurationBuilder();
builder.clientIntelligence(ClientIntelligence.BASIC);

hotrod-client.properties

infinispan.client.hotrod.client_intelligence=BASIC

当您将 Hot Rod 客户端配置为使用多个 Data Grid 集群时,您可以为每个集群使用不同的情报。

ConfigurationBuilder

ConfigurationBuilder builder = new ConfigurationBuilder();
builder.addCluster("NYC").clusterClientIntelligence(ClientIntelligence.BASIC);

hotrod-client.properties

infinispan.client.hotrod.cluster.intelligence.NYC=BASIC

服务器超时失败

如果服务器没有将拓扑报告为 BASIC,或者客户端因为网络问题而无法连接到服务器,客户端会将服务器标记为失败。客户端不会尝试连接到标记为失败的服务器,直到客户端收到更新的拓扑为止。因为 BASIC 拓扑永远不会发送更新,所以客户端不会重新尝试连接。

为避免这种情况,您可以使用 serverFailureTimeout 设置,该设置在指定的时间段内清除失败的服务器状态。在定义的超时后,Data Grid 将尝试重新连接服务器。如果服务器仍然无法访问,则再次将其标记为失败,并在定义的超时后重新尝试连接。您可以通过将 serverFailureTimeout 值设置为 -1 来禁用重新连接尝试。

ConfigurationBuilder

ConfigurationBuilder builder = new ConfigurationBuilder();
builder.serverFailureTimeout(5000).clusterClientIntelligence(ClientIntelligence.BASIC);

hotrod-client.properties

infinispan.client.hotrod.server_failure_timeout=5000
infinispan.client.hotrod.client_intelligence=BASIC

1.3. 请求负载平衡

热 Rod Java 客户端平衡对数据平面集群的请求,以便读取和写入操作分散到节点上。

使用 BASICTOPOLOGY_AWARE 智能的客户端为所有请求使用请求平衡。使用 HASH_DISTRIBUTION_AWARE 智能的客户端将请求直接发送到存储所需密钥的节点。如果节点没有响应,客户端会返回请求均衡。

默认平衡策略是循环的,因此 Hot Rod 客户端执行请求平衡,如下例所示,s1s 2、s3 是 Data Grid 集群中的节点:

// Connect to the Data Grid cluster
RemoteCacheManager cacheManager = new RemoteCacheManager(builder.build());
// Obtain the remote cache
RemoteCache<String, String> cache = cacheManager.getCache("test");

//Hot Rod client sends a request to the "s1" node
cache.put("key1", "aValue");
//Hot Rod client sends a request to the "s2" node
cache.put("key2", "aValue");
//Hot Rod client sends a request to the "s3" node
String value = cache.get("key1");
//Hot Rod client sends the next request to the "s1" node again
cache.remove("key2");

自定义平衡策略

如果在 Hot Rod 客户端配置中添加类,您可以使用自定义 FailoverRequestBalancingStrategy 实现。

ConfigurationBuilder

ConfigurationBuilder builder = new ConfigurationBuilder();
builder.addServer()
         .host("127.0.0.1")
         .port(11222)
         .balancingStrategy(new MyCustomBalancingStrategy());

hotrod-client.properties

infinispan.client.hotrod.request_balancing_strategy=my.package.MyCustomBalancingStrategy

1.4. 客户端故障切换

当 Data Grid 集群拓扑更改时,热 Rod 客户端可以自动进行故障转移。例如,具有拓扑感知的 Hot Rod 客户端可以检测一个或多个数据网格服务器何时失败。

除了集群数据网格服务器之间故障转移外,Hot Rod 客户端还可在 Data Grid 集群之间进行故障转移。

例如,您有一个 Data Grid 集群在 New York (NYC)中运行,另一个集群在伦敦(LON)中运行。向 NYC 发送请求的客户端检测到没有可用的节点,以便它们在 LON 中切换到集群。然后,客户端会维护到 LON 的连接,直到您手动切换集群或故障转移再次发生。

带有故障切换的事务缓存

条件操作(如 putIfAbsent (), replace (), remove () )具有严格的方法返回保证。同样,一些操作可能需要返回前面的值。

虽然 Hot Rod 客户端可以故障转移,但您应该使用事务缓存来确保操作不会部分完成,并在不同的节点上保留冲突的条目。

1.5. 热 Rod 客户端与 Data Grid 服务器的兼容性

借助数据网格服务器,您可以将 Hot Rod 客户端与不同的版本连接。例如,在迁移或升级到 Data Grid 集群期间,Hot Rod 客户端版本可能是比 Data Grid Server 低的 Data Grid 版本。

提示

Data Grid 建议使用最新的 Hot Rod 客户端版本从最新的功能和安全增强中受益。

Data Grid 8 及更新的版本

热 Rod 协议版本 3.x 会自动协商使用 Data Grid Server 的客户端可能的最高版本。

Data Grid 7.3 及更早版本

使用比 Data Grid 服务器版本高的 Hot Rod 协议版本的客户端必须设置 infinispan.client.hotrod.protocol_version 属性。

第 2 章 将 Data Grid 添加到 Maven 存储库

Data Grid Java 发行版可从 Maven 获取。

您可以从客户门户网站下载 Data Grid Maven 存储库,或者从公共 Red Hat Enterprise Maven 存储库拉取 Data Grid 依赖项。

2.1. 下载 Maven 存储库

如果您不想使用公共 Red Hat Enterprise Maven 存储库,将 Data Grid Maven 存储库下载并安装到本地文件系统、Apache HTTP 服务器或 Maven 存储库管理器。

流程

  1. 登录到红帽客户门户。
  2. 导航到 Data Grid 的软件下载
  3. 下载 Red Hat Data Grid 8.5 Maven 存储库。
  4. 将存档的 Maven 存储库提取到本地文件系统。
  5. 打开 README.md 文件,并按照适当的安装说明进行操作。

2.2. 添加 Red Hat Maven 软件仓库

在您的 Maven 构建环境中包括红帽 GA 存储库,以获取 Data Grid 工件和依赖项。

流程

  • 将 Red Hat GA 存储库添加到 Maven 设置文件中,通常为 ~/.m2/settings.xml,或者直接在项目的 pom.xml 文件中。

    <repositories>
      <repository>
        <id>redhat-ga-repository</id>
        <name>Red Hat GA Repository</name>
        <url>https://maven.repository.redhat.com/ga/</url>
      </repository>
    </repositories>
    <pluginRepositories>
      <pluginRepository>
        <id>redhat-ga-repository</id>
        <name>Red Hat GA Repository</name>
        <url>https://maven.repository.redhat.com/ga/</url>
      </pluginRepository>
    </pluginRepositories>

2.3. 配置项目 POM

配置项目中的项目对象模型(POM)文件,以将 Data Grid 依赖项用于嵌入式缓存、Hot Rod 客户端和其他功能。

流程

  1. 打开您的项目 pom.xml 进行编辑。
  2. 使用正确的 Data Grid 版本定义 version.infinispan 属性。
  3. dependencyManagement 部分中包含 infinispan-bom

    Bill Of Materials (BOM)控制依赖项版本,从而避免了版本冲突,这意味着您不需要为添加到项目的每个 Data Grid 工件设置版本。

  4. 保存并关闭 pom.xml

以下示例显示了 Data Grid 版本和 BOM:

<properties>
  <version.infinispan>14.0.21.Final-redhat-00001</version.infinispan>
</properties>

<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.infinispan</groupId>
      <artifactId>infinispan-bom</artifactId>
      <version>${version.infinispan}</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

后续步骤

根据需要,将 Data Grid 工件作为依赖项添加到 pom.xml 中。

第 3 章 热 Rod Java 客户端配置

Data Grid 提供了一个 Hot Rod Java 客户端配置 API,用于公开配置属性。

3.1. 添加 Hot Rod Java 客户端依赖项

添加 Hot Rod Java 客户端依赖项,将其包含在您的项目中。

先决条件

  • Java {jdkminversion} 或更高版本。

流程

  • infinispan-client-hotrod 工件作为依赖项添加到 pom.xml 中,如下所示:
<dependency>
  <groupId>org.infinispan</groupId>
  <artifactId>infinispan-client-hotrod</artifactId>
</dependency>

3.2. 配置 Hot Rod 客户端连接

配置 Hot Rod Java 客户端连接数据网格服务器。

流程

  • 使用 ConfigurationBuilder 类来生成不可变配置对象,您可以传递给 RemoteCacheManager,或使用应用 classpath 上的 hotrod-client.properties 文件。

ConfigurationBuilder

ConfigurationBuilder builder = new ConfigurationBuilder();
builder.addServer()
         .host("127.0.0.1")
         .port(ConfigurationProperties.DEFAULT_HOTROD_PORT)
       .addServer()
         .host("192.0.2.0")
         .port(ConfigurationProperties.DEFAULT_HOTROD_PORT)
       .security().authentication()
         .username("username")
         .password("changeme")
         .realm("default")
         .saslMechanism("SCRAM-SHA-512");
RemoteCacheManager cacheManager = new RemoteCacheManager(builder.build());

hotrod-client.properties

infinispan.client.hotrod.server_list = 127.0.0.1:11222,192.0.2.0:11222
infinispan.client.hotrod.auth_username = username
infinispan.client.hotrod.auth_password = changeme
infinispan.client.hotrod.auth_realm = default
infinispan.client.hotrod.sasl_mechanism = SCRAM-SHA-512

配置 Hot Rod URI

您还可以使用 URI 配置 Hot Rod 客户端连接,如下所示:

ConfigurationBuilder

ConfigurationBuilder builder = new ConfigurationBuilder();
builder.uri("hotrod://username:changeme@127.0.0.1:11222,192.0.2.0:11222?auth_realm=default&sasl_mechanism=SCRAM-SHA-512");
RemoteCacheManager cacheManager = new RemoteCacheManager(builder.build());

hotrod-client.properties

infinispan.client.hotrod.uri = hotrod://username:changeme@127.0.0.1:11222,192.0.2.0:11222?auth_realm=default&sasl_mechanism=SCRAM-SHA-512

在 classpath 之外添加属性

如果 hotrod-client.properties 文件不在应用程序类路径上,则需要指定位置,如下例所示:

ConfigurationBuilder builder = new ConfigurationBuilder();
Properties p = new Properties();
try(Reader r = new FileReader("/path/to/hotrod-client.properties")) {
   p.load(r);
   builder.withProperties(p);
}
RemoteCacheManager cacheManager = new RemoteCacheManager(builder.build());

3.2.1. 在客户端配置中定义 Data Grid 集群

在 Hot Rod 客户端配置中提供 Data Grid 集群的位置。

流程

  • 至少提供一个 Data Grid 集群名称以及至少一个带有 ClusterConfigurationBuilder 类的节点的主机名和端口。

    如果要将集群定义为默认集群,以便客户端始终首先尝试连接到它,然后使用 addServers ("<host_name>:<port>; <host_name>:<port>") 方法定义服务器列表。

多个集群连接

ConfigurationBuilder clientBuilder = new ConfigurationBuilder();
clientBuilder.addCluster("siteA")
               .addClusterNode("hostA1", 11222)
               .addClusterNode("hostA2", 11222)
             .addCluster("siteB")
               .addClusterNodes("hostB1:11222; hostB2:11222");
RemoteCacheManager remoteCacheManager = new RemoteCacheManager(clientBuilder.build());

带有故障转移集群的默认服务器列表

ConfigurationBuilder clientBuilder = new ConfigurationBuilder();
clientBuilder.addServers("hostA1:11222; hostA2:11222")
             .addCluster("siteB")
               .addClusterNodes("hostB1:11222; hostB2:11223");
RemoteCacheManager remoteCacheManager = new RemoteCacheManager(clientBuilder.build());

3.2.2. 手动切换 Data Grid 集群

在 Data Grid 集群间手动切换 Hot Rod Java 客户端连接。

流程

  • RemoteCacheManager 类中调用以下方法之一:

    switchToCluster (clusterName) 切换到客户端配置中定义的特定集群。

    switchToDefaultCluster () 切换到客户端配置中的默认集群,该集群被定义为 Data Grid 服务器列表。

其他资源

3.2.3. 配置连接池

热 Rod Java 客户端保持与数据网格服务器的持久连接池,以重复利用 TCP 连接,而不是在每个请求上创建它们。

流程

  • 配置 Hot Rod 客户端连接池设置,如下例所示:

ConfigurationBuilder

ConfigurationBuilder clientBuilder = new ConfigurationBuilder();
clientBuilder.addServer()
               .host("127.0.0.1")
               .port(11222)
             .connectionPool()
               .maxActive(10)
               .exhaustedAction(ExhaustedAction.valueOf("WAIT"))
               .maxWait(1)
               .minIdle(20)
               .minEvictableIdleTime(300000)
               .maxPendingRequests(20);
RemoteCacheManager remoteCacheManager = new RemoteCacheManager(clientBuilder.build());

hotrod-client.properties

infinispan.client.hotrod.server_list = 127.0.0.1:11222
infinispan.client.hotrod.connection_pool.max_active = 10
infinispan.client.hotrod.connection_pool.exhausted_action = WAIT
infinispan.client.hotrod.connection_pool.max_wait = 1
infinispan.client.hotrod.connection_pool.min_idle = 20
infinispan.client.hotrod.connection_pool.min_evictable_idle_time = 300000
infinispan.client.hotrod.connection_pool.max_pending_requests = 20

这些配置选项提供对池的精细控制。我们建议根据应用程序需求调整池。否则,您可能会增加新连接的延迟,而打开或减少可用连接的吞吐量。

因为负载增加,大型池可能会导致新连接的"完全生效"。但是,它应该有助于吞吐量增加与参与请求的连接。较小的池可以避免这种效果,并消耗较少的资源。但是,它只能在开始降级前处理如此多的请求。

3.3. 为 Hot Rod 客户端配置身份验证机制

数据网格服务器使用不同的机制来验证 Hot Rod 客户端连接。

流程

  • 使用 AuthenticationConfigurationBuilder 类的 saslMechanism () 方法或 infinispan.client.hotrod.sasl_mechanism 属性指定身份验证机制。
警告

授权是每次新连接上执行昂贵的操作。我们强烈建议您对连接池进行调优。

SCRAM

ConfigurationBuilder clientBuilder = new ConfigurationBuilder();
clientBuilder.addServer()
               .host("127.0.0.1")
               .port(11222)
             .security()
               .authentication()
                 .saslMechanism("SCRAM-SHA-512")
                 .username("myuser")
                 .password("qwer1234!");

摘要

ConfigurationBuilder clientBuilder = new ConfigurationBuilder();
clientBuilder.addServer()
               .host("127.0.0.1")
               .port(11222)
             .security()
               .authentication()
                 .saslMechanism("DIGEST-MD5")
                 .username("myuser")
                 .password("qwer1234!");

PLAIN

ConfigurationBuilder clientBuilder = new ConfigurationBuilder();
clientBuilder.addServer()
               .host("127.0.0.1")
               .port(11222)
             .security()
               .authentication()
                 .saslMechanism("PLAIN")
                 .username("myuser")
                 .password("qwer1234!");

OAUTHBEARER

String token = "..."; // Obtain the token from your OAuth2 provider
ConfigurationBuilder clientBuilder = new ConfigurationBuilder();
clientBuilder.addServer()
               .host("127.0.0.1")
               .port(11222)
             .security()
               .authentication()
                 .saslMechanism("OAUTHBEARER")
                 .token(token);

EXTERNAL

ConfigurationBuilder clientBuilder = new ConfigurationBuilder();
clientBuilder
   .addServer()
      .host("127.0.0.1")
      .port(11222)
   .security()
      .ssl()
         // TrustStore stores trusted CA certificates for the server.
         .trustStoreFileName("/path/to/truststore")
         .trustStorePassword("truststorepassword".toCharArray())
         .trustStoreType("PCKS12")
         // KeyStore stores valid client certificates.
         .keyStoreFileName("/path/to/keystore")
         .keyStorePassword("keystorepassword".toCharArray())
         .keyStoreType("PCKS12")
      .authentication()
         .saslMechanism("EXTERNAL");
remoteCacheManager = new RemoteCacheManager(clientBuilder.build());
RemoteCache<String, String> cache = remoteCacheManager.getCache("secured");

GSSAPI

LoginContext lc = new LoginContext("GssExample", new BasicCallbackHandler("krb_user", "krb_password".toCharArray()));
lc.login();
Subject clientSubject = lc.getSubject();

ConfigurationBuilder clientBuilder = new ConfigurationBuilder();
clientBuilder.addServer()
               .host("127.0.0.1")
               .port(11222)
             .security()
               .authentication()
                 .saslMechanism("GSSAPI")
                 .clientSubject(clientSubject)
                 .callbackHandler(new BasicCallbackHandler());

基本回调处理程序

BasicCallbackHandler 如 GSSAPI 示例所示,调用以下回调:

  • NameCallbackPasswordCallback 构造客户端主题。
  • 在 SASL 身份验证过程中调用 AuthorizeCallback
带有令牌回调处理程序的 OAUTHBEARER

使用 TokenCallbackHandler 在 OAuth2 令牌过期前刷新 OAuth2 令牌,如下例所示:

String token = "..."; // Obtain the token from your OAuth2 provider
TokenCallbackHandler tokenHandler = new TokenCallbackHandler(token);
ConfigurationBuilder clientBuilder = new ConfigurationBuilder();
clientBuilder.addServer()
               .host("127.0.0.1")
               .port(11222)
             .security()
               .authentication()
               .saslMechanism("OAUTHBEARER")
               .callbackHandler(tokenHandler);
remoteCacheManager = new RemoteCacheManager(clientBuilder.build());
RemoteCache<String, String> cache = remoteCacheManager.getCache("secured");
// Refresh the token
tokenHandler.setToken("newToken");
自定义 CallbackHandler

热 Rod 客户端设置默认 CallbackHandler,将凭证传递给 SASL 机制。在某些情况下,您可能需要提供自定义 CallbackHandler,如下例所示:

public class MyCallbackHandler implements CallbackHandler {
   final private String username;
   final private char[] password;
   final private String realm;

   public MyCallbackHandler(String username, String realm, char[] password) {
      this.username = username;
      this.password = password;
      this.realm = realm;
   }

   @Override
   public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
      for (Callback callback : callbacks) {
         if (callback instanceof NameCallback) {
            NameCallback nameCallback = (NameCallback) callback;
            nameCallback.setName(username);
         } else if (callback instanceof PasswordCallback) {
            PasswordCallback passwordCallback = (PasswordCallback) callback;
            passwordCallback.setPassword(password);
         } else if (callback instanceof AuthorizeCallback) {
            AuthorizeCallback authorizeCallback = (AuthorizeCallback) callback;
            authorizeCallback.setAuthorized(authorizeCallback.getAuthenticationID().equals(
                  authorizeCallback.getAuthorizationID()));
         } else if (callback instanceof RealmCallback) {
            RealmCallback realmCallback = (RealmCallback) callback;
            realmCallback.setText(realm);
         } else {
            throw new UnsupportedCallbackException(callback);
         }
      }
   }
}

ConfigurationBuilder clientBuilder = new ConfigurationBuilder();
clientBuilder.addServer()
               .host("127.0.0.1")
               .port(11222)
             .security().authentication()
               .serverName("myhotrodserver")
               .saslMechanism("DIGEST-MD5")
               .callbackHandler(new MyCallbackHandler("myuser","default","qwer1234!".toCharArray()));
注意

自定义 CallbackHandler 需要处理特定于您使用的身份验证机制的回调。但是,超出了本文档的范围,为每个可能的回调类型提供示例。

3.3.1. 创建 GSSAPI 登录上下文

要使用 GSSAPI 机制,您必须创建一个 LoginContext,以便您的 Hot Rod 客户端可以获取 Ticket Granting Ticket Granting Ticket Granting Ticket Granting Ticket Granting Ticket (TGT)。

流程

  1. 在登录配置文件中定义登录模块。

    gss.conf

    GssExample {
        com.sun.security.auth.module.Krb5LoginModule required client=TRUE;
    };

    对于 IBM JDK:

    gss-ibm.conf

    GssExample {
        com.ibm.security.auth.module.Krb5LoginModule required client=TRUE;
    };

  2. 设置以下系统属性:

    java.security.auth.login.config=gss.conf
    
    java.security.krb5.conf=/etc/krb5.conf
    注意

    krb5.conf 提供 KDC 的位置。使用 kinit 命令与 Kerberos 进行身份验证并验证 krb5.conf

3.3.2. SASL 身份验证机制

Data Grid 服务器支持使用 Hot Rod 和 Memcached 二进制协议端点的以下 SASL 验证机制:

身份验证机制描述安全域类型相关详情

PLAIN

以纯文本格式使用凭据。您应该只在加密连接中使用 PLAIN 身份验证。

属性域和 LDAP 域

BASIC HTTP 机制类似。

DIGESTJPEG

使用哈希算法和非ce 值。热 Rod 连接器支持 DIGEST-MD 5、DIGEST-SHA -256、DIGEST-SHA-256DIGEST-SHA-384DIGEST-SHA-512 哈希算法,以强度顺序。

属性域和 LDAP 域

Digest HTTP 机制类似。

SCRAM-*

除了哈希算法和非ce 值外,还使用 salt 值。热 Rod 连接器支持 SCRAM-SHASCRAM-SHA-256SCRAM-SHA-384SCRAM-SHA-512 哈希算法(按强度排序)。

属性域和 LDAP 域

Digest HTTP 机制类似。

GSSAPI

使用 Kerberos 票据并需要一个 Kerberos 域控制器。您必须在 realm 配置中添加对应的 kerberos 服务器身份。在大多数情况下,您还指定一个 ldap-realm 来提供用户成员资格信息。

Kerberos realm

SPNEGO HTTP 机制类似。

GS2-KRB5

使用 Kerberos 票据并需要一个 Kerberos 域控制器。您必须在 realm 配置中添加对应的 kerberos 服务器身份。在大多数情况下,您还指定一个 ldap-realm 来提供用户成员资格信息。

Kerberos realm

SPNEGO HTTP 机制类似。

EXTERNAL

使用客户端证书。

信任存储域

CLIENT_CERT HTTP 机制类似。

OAUTHBEARER

使用 OAuth 令牌并需要一个 token-realm 配置。

令牌域

与EARER _TOKEN HTTP 机制类似。

3.4. 配置 Hot Rod 客户端加密

Data Grid Server 可以强制 SSL/TLS 加密,并呈现带有证书的 Hot Rod 客户端,以建立信任并协商安全连接。

要验证向 Data Grid Server 发布的证书,Hot Rod 客户端需要完整的证书链或以 Root CA 开头的部分链。您可以将服务器证书提供给 Hot Rod 客户端作为信任存储。

默认情况下,Hot Rod 客户端还将通过匹配 dNSNameiPAddress 或服务器证书的 subjectAltName 扩展中包含的两个类型来执行主机名验证。

提示

另外,您还可以提供信任存储,您可以使用共享系统证书。

先决条件

  • 创建 Hot Rod 客户端可用于验证数据网格服务器身份的信任存储。
  • 如果您将 Data Grid Server 配置为验证或验证客户端证书,请根据情况创建密钥存储。

流程

  1. 使用 trustStoreFileName ()trustStorePassword () 方法或对应属性将信任存储添加到客户端配置中。
  2. 如果配置客户端证书身份验证,请执行以下操作:

    1. 使用 keyStoreFileName ()keyStorePassword () 方法或对应属性,将密钥存储添加到客户端配置中。
    2. 将客户端配置为使用 EXTERNAL 身份验证机制。

ConfigurationBuilder

ConfigurationBuilder clientBuilder = new ConfigurationBuilder();
clientBuilder
   .addServer()
      .host("127.0.0.1")
      .port(11222)
   .security()
      .ssl()
         // Server SNI hostname.
         .sniHostName("myservername")
         // Keystore that contains the public keys for Data Grid Server.
         // Clients use the trust store to verify Data Grid Server identities.
         .trustStoreFileName("/path/to/server/truststore")
         .trustStorePassword("truststorepassword".toCharArray())
         .trustStoreType("PCKS12")
         // Keystore that contains client certificates.
         // Clients present these certificates to Data Grid Server.
         .keyStoreFileName("/path/to/client/keystore")
         .keyStorePassword("keystorepassword".toCharArray())
         .keyStoreType("PCKS12")
      .authentication()
         // Clients must use the EXTERNAL mechanism for certificate authentication.
         .saslMechanism("EXTERNAL");

hotrod-client.properties

infinispan.client.hotrod.server_list = 127.0.0.1:11222
infinispan.client.hotrod.use_ssl = true
# Use SNI for hostname validation
infinispan.client.hotrod.sni_host_name = myservername
# Keystore that contains the public keys for Data Grid Server.
# Clients use the trust store to verify Data Grid Server identities.
infinispan.client.hotrod.trust_store_file_name = server_truststore.pkcs12
infinispan.client.hotrod.trust_store_password = changeme
infinispan.client.hotrod.trust_store_type = PCKS12
# Keystore that contains client certificates.
# Clients present these certificates to Data Grid Server.
infinispan.client.hotrod.key_store_file_name = client_keystore.pkcs12
infinispan.client.hotrod.key_store_password = changeme
infinispan.client.hotrod.key_store_type = PCKS12
# Clients must use the EXTERNAL mechanism for certificate authentication.
infinispan.client.hotrod.sasl_mechanism = EXTERNAL

后续步骤

将客户端信任存储添加到 $RHDG_HOME/server/conf 目录,并将 Data Grid Server 配置为使用它(如果需要)。

3.5. 启用 Hot Rod 客户端统计信息

热 Rod Java 客户端可以提供包括远程缓存和近缓存命中和未命中的统计数据,以及连接池使用情况。

流程

  1. 打开 Hot Rod Java 客户端配置以进行编辑。
  2. true 设置为 统计 属性的值或调用 statistics ().enable () 方法。
  3. 使用 jmxjmx_domain 属性为您的 Hot Rod 客户端导出 JMX MBeans,或者调用 jmxEnable ()jmxDomain () 方法。
  4. 保存并关闭您的客户端配置。
热 Rod Java 客户端统计信息

ConfigurationBuilder

ConfigurationBuilder builder = new ConfigurationBuilder();
builder.statistics().enable()
         .jmxEnable()
         .jmxDomain("my.domain.org")
       .addServer()
         .host("127.0.0.1")
         .port(11222);
RemoteCacheManager remoteCacheManager = new RemoteCacheManager(builder.build());

hotrod-client.properties

infinispan.client.hotrod.statistics = true
infinispan.client.hotrod.jmx = true
infinispan.client.hotrod.jmx_domain = my.domain.org

3.6. 热 Rod 客户端追踪传播

当您在客户端虚拟机和数据网格服务器上配置 OpenTelemetry 跟踪时,Hot Rod 客户端启用了客户端应用程序和 Data Grid 服务器之间的追踪范围自动关联。

禁用从客户端到 Data Grid 服务器的追踪传播

先决条件

  • 在 Data Grid Server 和 客户端中启用了 OpenTelemetry 追踪。

流程

  • 使用 disableTracingPropagation () 方法禁用 OpenTelemetry tracing propagation。

    import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
    
    ConfigurationBuilder builder = new ConfigurationBuilder();
    builder.addServer()
       .host("127.0.0.1")
       .port(ConfigurationProperties.DEFAULT_HOTROD_PORT)
       .disableTracingPropagation();

    Hot Rod 客户端停止向 Data Grid 服务器发送追踪。

3.7. 接近缓存

接近缓存是 Hot Rod 客户端的本地,并且最近使用的数据,因此每个读取操作都不需要遍历网络,这会显著提高性能。

接近缓存:

  • 使用读取操作填充,调用 get ()getVersioned () 方法。
    在以下示例中,put () 调用不会填充最接近的缓存,且仅在条目已存在时无效:

    cache.put("k1", "v1");
    cache.get("k1");
  • 在 Data Grid Server 上的远程缓存中更新或删除客户端监听程序时,注册客户端侦听器使其无效条目。
    如果在无效后请求条目,客户端必须再次从远程缓存检索它们。
  • 当客户端故障转移到不同的服务器时,将被清除。

绑定的接近缓存

您应该始终通过指定它们可以包含的最大条目数来使用绑定接近缓存。当接近缓存达到最大条目数时,驱除会自动进行,以删除旧的条目。这意味着您不需要手动在客户端 JVM 范围内保留缓存大小。

重要

不要将最大闲置过期时间与接近缓存一起使用,因为 near-cache 读取不会传播条目的最后一次访问时间。

Bloom 过滤器

Bloom 过滤器通过减少无效消息总数来优化写操作的性能。

Bloom 过滤器:

  • 驻留在 Data Grid 服务器上,并跟踪客户端请求的条目。
  • 需要一个连接池配置,每个服务器最多有一个活动连接,并使用 WAIT 耗尽的操作。
  • 无法与未绑定的缓存一起使用。

3.7.1. 配置 Near 缓存

使用接近缓存配置 Hot Rod Java 客户端,以在客户端 JVM 中存储最近使用的数据。

流程

  1. 打开 Hot Rod Java 客户端配置。
  2. 配置每个缓存以使用 nearCacheMode (NearCacheMode.INVALIDATED) 方法执行近缓存。

    注意

    Data Grid 提供全局近近的缓存配置属性。但是,这些属性已弃用,您不应该使用它们,而是针对每个缓存配置接近缓存。

  3. 使用 nearCacheMaxEntries () 方法指定在发生驱除前可以保存的最大条目数。
  4. 使用 nearCacheUseBloomFilter () 方法为近缓存启用 bloom 过滤器。
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.infinispan.client.hotrod.configuration.NearCacheMode;
import org.infinispan.client.hotrod.configuration.ExhaustedAction;

ConfigurationBuilder builder = new ConfigurationBuilder();
builder.addServer()
    .host("127.0.0.1")
    .port(ConfigurationProperties.DEFAULT_HOTROD_PORT)
  .security().authentication()
    .username("username")
    .password("password")
    .realm("default")
    .saslMechanism("SCRAM-SHA-512")
  // Configure the connection pool for bloom filters.
  .connectionPool()
    .maxActive(1)
    .exhaustedAction(ExhaustedAction.WAIT);
// Configure near caching for specific caches
builder.remoteCache("books")
    .nearCacheMode(NearCacheMode.INVALIDATED)
    .nearCacheMaxEntries(100)
    .nearCacheUseBloomFilter(false);
builder.remoteCache("authors")
    .nearCacheMode(NearCacheMode.INVALIDATED)
    .nearCacheMaxEntries(200)
    .nearCacheUseBloomFilter(true);

3.8. 强制返回值

为了避免不必要地发送数据,对远程缓存写入操作会返回 null,而不是之前的值。

例如,以下方法调用不会返回键以前的值:

V remove(Object key);
V put(K key, V value);

但是,您可以更改默认行为,以便您的调用返回之前的密钥的值。

流程

  • 配置 Hot Rod 客户端,以便方法调用以以下一种方式之一返回键以前的值:

FORCE_RETURN_VALUE 标记

cache.withFlags(Flag.FORCE_RETURN_VALUE).put("aKey", "newValue")

per-cache

ConfigurationBuilder builder = new ConfigurationBuilder();
// Return previous values for keys for invocations for a specific cache.
builder.remoteCache("mycache")
       .forceReturnValues(true);

hotrod-client.properties

# Use the "*" wildcard in the cache name to return previous values
# for all caches that start with the "somecaches" string.

infinispan.client.hotrod.cache.somecaches*.force_return_values = true

3.9. 从 Hot Rod 客户端创建远程缓存

使用 Data Grid Hot Rod API 从 Java、C++、.NET/C#、JS 客户端等在 Data Grid Server 上创建远程缓存。

此流程演示了如何使用 Hot Rod Java 客户端在首次访问时创建远程缓存。您可以在 Data Grid Tutorials 中找到其他 Hot Rod 客户端的代码示例。

先决条件

  • 创建具有 admin 权限的 Data Grid 用户。
  • 至少启动一个 Data Grid 服务器实例。
  • 具有 Data Grid 缓存配置。

流程

  • 调用 remoteCache () 方法,作为 ConfigurationBuilder 的一部分。
  • 在 classpath 上的 hotrod-client.properties 文件中设置 configurationconfiguration_uri 属性。

ConfigurationBuilder

File file = new File("path/to/infinispan.xml")
ConfigurationBuilder builder = new ConfigurationBuilder();
builder.remoteCache("another-cache")
       .configuration("<distributed-cache name=\"another-cache\"/>");
builder.remoteCache("my.other.cache")
       .configurationURI(file.toURI());

hotrod-client.properties

infinispan.client.hotrod.cache.another-cache.configuration=<distributed-cache name=\"another-cache\"/>
infinispan.client.hotrod.cache.[my.other.cache].configuration_uri=file:///path/to/infinispan.xml

重要

如果远程缓存的名称包含 . 字符,则必须在使用 hotrod-client.properties 文件时将其放在方括号中。

第 4 章 hot Rod Client API

Data Grid Hot Rod 客户端 API 提供了用于远程创建缓存、处理数据、监控集群缓存拓扑等的接口。

4.1. RemoteCache API

集合方法 keySetentrySet 由远程缓存支持。也就是说,每种方法都调用回 RemoteCache。这很有用,因为它允许准确检索各种键、条目或值,如果用户不希望,则不必将它们全部存储在客户端内存中。

这些集合遵循正在添加的 Map 规格,而不支持 add All,但所有其他方法都被支持。

要注意的一件事是 Iterator.removeSet.removeCollection.remove 方法需要超过 1 个往返操作。您可以检查 RemoteCache Javadoc,以查看这些和其他方法的更多详情。

迭代器使用情况

这些集合的迭代方法在内部使用 retrieveEntries,如下所述。如果您注意到 retrieveEntries 使用批处理大小的参数。无法向迭代器提供此操作。因此,批处理大小可以通过系统属性 infinispan.client.hotrod.batch_size 配置,或者在配置 RemoteCacheManager 时通过 ConfigurationBuilder 配置。

另外,返回的 retrieveEntries iterator 可以被关闭,如来自 keySet,entrySet 返回 AutoCloseable 变体。因此,在使用完这些"Iterator"时,您应该始终关闭这些"Iterator"。

try (CloseableIterator<Map.Entry<K, V>> iterator = remoteCache.entrySet().iterator()) {

      }

如果我想要深度副本而不是后备集合,该怎么办?

之前版本的 RemoteCache 允许检索 keySet 的深度副本。您仍然可以通过新的后备映射进行这一操作,只需自行复制内容。另外,您可以使用 entrySet (在之前不支持)进行此操作。

Set<K> keysCopy = remoteCache.keySet().stream().collect(Collectors.toSet());

4.1.1. 不支持的方法

Data Grid RemoteCache API 不支持 Cache API 中的所有方法,并在调用不支持的方法时抛出 UnsupportedOperationException

大多数方法不适用于远程缓存(如侦听器管理操作),或者对应于本地缓存不支持的方法(例如,containsValue)。

RemoteCache API 不支持从 ConcurrentMap 继承的某些原子操作,例如:

boolean remove(Object key, Object value);
boolean replace(Object key, Object value);
boolean replace(Object key, Object oldValue, Object value);

但是,remoteCache 为这些原子操作提供替代的方法,它们通过网络发送版本标识符,而不是整个值对象。

4.2. 远程 Iterator API

Data Grid 提供了一个远程迭代器 API,用于检索内存资源受限制或者计划进行服务器端过滤或转换的条目。

// Retrieve all entries in batches of 1000
int batchSize = 1000;
try (CloseableIterator<Entry<Object, Object>> iterator = remoteCache.retrieveEntries(null, batchSize)) {
     while(iterator.hasNext()) {
        // Do something
     }
}

// Filter by segment
Set<Integer> segments = ...
try (CloseableIterator<Entry<Object, Object>> iterator = remoteCache.retrieveEntries(null, segments, batchSize)) {
     while(iterator.hasNext()) {
        // Do something
     }
}

// Filter by custom filter
try (CloseableIterator<Entry<Object, Object>> iterator = remoteCache.retrieveEntries("myFilterConverterFactory", segments, batchSize)) {
     while(iterator.hasNext()) {
        // Do something
     }
}

4.2.1. 将自定义过滤器部署到 Data Grid 服务器

将自定义过滤器部署到 Data Grid 服务器实例。

流程

  1. 创建扩展 KeyValueFilterConverterFactory 的工厂。

    import java.io.Serializable;
    
    import org.infinispan.filter.AbstractKeyValueFilterConverter;
    import org.infinispan.filter.KeyValueFilterConverter;
    import org.infinispan.filter.KeyValueFilterConverterFactory;
    import org.infinispan.filter.NamedFactory;
    import org.infinispan.metadata.Metadata;
    
    //@NamedFactory annotation defines the factory name
    @NamedFactory(name = "myFilterConverterFactory")
    public class MyKeyValueFilterConverterFactory implements KeyValueFilterConverterFactory {
    
       @Override
       public KeyValueFilterConverter<String, SampleEntity1, SampleEntity2> getFilterConverter() {
          return new MyKeyValueFilterConverter();
       }
       // Filter implementation. Should be serializable or externalizable for DIST caches
       static class MyKeyValueFilterConverter extends AbstractKeyValueFilterConverter<String, SampleEntity1, SampleEntity2> implements Serializable {
          @Override
          public SampleEntity2 filterAndConvert(String key, SampleEntity1 entity, Metadata metadata) {
             // returning null will case the entry to be filtered out
             // return SampleEntity2 will convert from the cache type SampleEntity1
          }
    
          @Override
          public MediaType format() {
             // returns the MediaType that data should be presented to this converter.
             // When omitted, the server will use "application/x-java-object".
             // Returning null will cause the filter/converter to be done in the storage format.
          }
       }
    }
  2. 创建一个 JAR,其中包含 META-INF/services/org.infinispan.filter.KeyValueFilterConverterFactory 文件。此文件应包含过滤器工厂类实施的完全限定类名称。

    如果过滤器使用自定义键/值类,您必须将它们包含在 JAR 文件中,以便过滤器可以正确地 unmarshall 键和/或值实例。

  3. 将 JAR 文件添加到 Data Grid 服务器安装目录的 server/lib 目录中。

4.3. MetadataValue API

MetadataValue 接口用于版本控制操作。

以下示例显示了只有在条目值版本没有改变时才发生的删除操作:

RemoteCacheManager remoteCacheManager = new RemoteCacheManager();
      RemoteCache<String, String> remoteCache = remoteCacheManager.getCache();

      remoteCache.put("car", "ferrari");
      VersionedValue valueBinary = remoteCache.getWithMetadata("car");

      assert remoteCache.remove("car", valueBinary.getVersion());
      assert !remoteCache.containsKey("car");

4.4. streaming API

Data Grid 提供了一个流 API,它实现了返回 InputStreamOutputStream 实例的方法,以便您可以在 Hot Rod 客户端和 Data Grid 服务器之间流传输大型对象。

考虑以下大型对象示例:

StreamingRemoteCache<String> streamingCache = remoteCache.streaming();
OutputStream os = streamingCache.put("a_large_object");
os.write(...);
os.close();

您可以通过流读取对象,如下所示:

StreamingRemoteCache<String> streamingCache = remoteCache.streaming();
InputStream is = streamingCache.get("a_large_object");
for(int b = is.read(); b >= 0; b = is.read()) {
   // iterate
}
is.close();
注意

Streaming API 不会 marshall 值,这意味着您无法同时使用 Streaming 和 Non-Streaming API 访问相同的条目。但是,您可以实施一个自定义 marshaller 来处理这个问题单。

RemoteStreamingCache.get (K key) 方法返回的 InputStream 实现了 VersionedMetadata 接口,以便您可以检索版本和过期信息,如下所示:

StreamingRemoteCache<String> streamingCache = remoteCache.streaming();
InputStream is = streamingCache.get("a_large_object");
long version = ((VersionedMetadata) is).getVersion();
for(int b = is.read(); b >= 0; b = is.read()) {
   // iterate
}
is.close();
注意

条件写入方法(putIfAbsent () )在值完全发送到服务器后执行实际条件检查。换句话说,当在 OutputStream 上调用 close () 方法时。

4.5. 计数器 API

CounterManager 接口是定义、检索和删除计数器的入口点。

热 Rod 客户端可以检索 CounterManager 接口,如下例所示:

// create or obtain your RemoteCacheManager
RemoteCacheManager manager = ...;

// retrieve the CounterManager
CounterManager counterManager = RemoteCounterManagerFactory.asCounterManager(manager);

4.6. 创建事件 Listeners

Java Hot Rod 客户端可以注册监听程序来接收 cache-entry 级别事件。支持创建、修改和删除的事件的缓存条目。

创建客户端监听程序与嵌入式监听程序非常相似,但使用不同的注解和事件类。以下是打印收到的每个事件的客户端监听程序示例:

import org.infinispan.client.hotrod.annotation.*;
import org.infinispan.client.hotrod.event.*;

@ClientListener(converterFactoryName = "static-converter")
public class EventPrintListener {

   @ClientCacheEntryCreated
   public void handleCreatedEvent(ClientCacheEntryCreatedEvent e) {
      System.out.println(e);
   }

   @ClientCacheEntryModified
   public void handleModifiedEvent(ClientCacheEntryModifiedEvent e) {
      System.out.println(e);
   }

   @ClientCacheEntryRemoved
   public void handleRemovedEvent(ClientCacheEntryRemovedEvent e) {
      System.out.println(e);
   }

}

ClientCacheEntryCreatedEventClientCacheEntryModifiedEvent 实例提供有关受影响密钥的信息,以及条目的版本。此版本可用于在服务器上调用条件操作,如 replaceWithVersionremoveWithVersion

只有 remove 操作成功时才会发送 ClientCacheEntryRemovedEvent 事件。换句话说,如果调用删除操作但没有找到条目,或者不应删除任何条目,则不会生成事件。有兴趣删除的事件的用户(即使没有删除条目)可以开发事件自定义逻辑来生成此类事件。如需更多信息,请参阅自定义 客户端事件部分

所有 ClientCacheEntryCreatedEvent,ClientCacheEntryModifiedEventClientCacheEntryRemovedEvent 事件实例也会提供一个 布尔值 isCommandRetried () 方法,如果因为拓扑更改而需要再次重试的写命令返回 true。这可能是此事件已被重复或另一个事件已被丢弃并替换(例如: ClientCacheEntryModifiedEvent 替换 ClientCacheEntryCreatedEvent)的符号。

创建了客户端侦听器实施后,需要向服务器注册。要做到这一点,请执行:

RemoteCache<?, ?> cache = ...
cache.addClientListener(new EventPrintListener());

4.6.1. 删除事件 Listener

当不需要客户端事件监听程序时,可以删除它:

EventPrintListener listener = ...
cache.removeClientListener(listener);

4.6.2. 过滤事件

为了避免用事件取消客户端,用户可以提供过滤功能来限制服务器为特定客户端侦听器触发的事件数量。要启用过滤,需要创建一个缓存事件过滤器工厂来生成过滤器实例:

import org.infinispan.notifications.cachelistener.filter.CacheEventFilterFactory;
import org.infinispan.filter.NamedFactory;

@NamedFactory(name = "static-filter")
public static class StaticCacheEventFilterFactory implements CacheEventFilterFactory {

   @Override
   public StaticCacheEventFilter getFilter(Object[] params) {
      return new StaticCacheEventFilter();
   }
}


// Serializable, Externalizable or marshallable with Infinispan Externalizers
// needed when running in a cluster
class StaticCacheEventFilter implements CacheEventFilter<Integer, String>, Serializable {
   @Override
   public boolean accept(Integer key, String oldValue, Metadata oldMetadata,
         String newValue, Metadata newMetadata, EventType eventType) {
      if (key.equals(1)) // static key
         return true;

      return false;
   }
}

上面定义的缓存事件过滤器工厂实例会创建过滤器实例,它们静态过滤掉除其键为 1 的所有条目。

为了能够使用此缓存事件过滤器工厂注册侦听器,必须赋予一个唯一的名称,并且 Hot Rod 服务器需要插入名称和缓存事件过滤器工厂实例。

  1. 创建包含过滤器实现的 JAR 文件。

    如果缓存使用自定义键/值类,则必须将它们包含在 JAR 中,以便可以使用正确的 unmarshalled 键和/或值实例来执行回调。如果客户端侦听器启用了 useRawData,则不需要此功能,因为回调键/值实例将以二进制格式提供。

  2. 在 JAR 文件中创建一个 META-INF/services/org.infinispan.notifications.cachelistener.filter.CacheEventFilterFactory 文件,并在其中编写过滤器类实施的完全限定类名称。
  3. 将 JAR 文件添加到 Data Grid 服务器安装目录的 server/lib 目录中。
  4. 通过将工厂名称添加到 @ClientListener 注释,将客户端监听程序链接到此缓存事件过滤器工厂:

    @ClientListener(filterFactoryName = "static-filter")
    public class EventPrintListener { ... }
  5. 使用服务器注册监听程序:

    RemoteCache<?, ?> cache = ...
    cache.addClientListener(new EventPrintListener());

您还可以根据在监听器注册时提供的参数注册动态过滤器实例。过滤器使用过滤器工厂接收的参数启用这个选项,例如:

import org.infinispan.notifications.cachelistener.filter.CacheEventFilterFactory;
import org.infinispan.notifications.cachelistener.filter.CacheEventFilter;

class DynamicCacheEventFilterFactory implements CacheEventFilterFactory {
   @Override
   public CacheEventFilter<Integer, String> getFilter(Object[] params) {
      return new DynamicCacheEventFilter(params);
   }
}

// Serializable, Externalizable or marshallable with Infinispan Externalizers
// needed when running in a cluster
class DynamicCacheEventFilter implements CacheEventFilter<Integer, String>, Serializable {
   final Object[] params;

   DynamicCacheEventFilter(Object[] params) {
      this.params = params;
   }

   @Override
   public boolean accept(Integer key, String oldValue, Metadata oldMetadata,
         String newValue, Metadata newMetadata, EventType eventType) {
      if (key.equals(params[0])) // dynamic key
         return true;

      return false;
   }
}

在注册监听器时,提供了执行过滤所需的动态参数:

RemoteCache<?, ?> cache = ...
cache.addClientListener(new EventPrintListener(), new Object[]{1}, null);
警告

当它们部署到集群中时,过滤实例必须可以被处理,以便过滤可以在生成事件的位置发生,即使即使即使在被注册了监听器的不同节点中也是如此。为了使其可以被编译,可以使它们扩展 SerializableExternalizable 或为它们提供自定义外部工具。

4.6.3. 跳过通知

在调用远程 API 方法来执行操作时,包括 SKIP_LISTENER_NOTIFICATION 标志,而无需从服务器获取事件通知。例如,要在创建或修改值时防止监听程序通知,请设置标志,如下所示:

remoteCache.withFlags(Flag.SKIP_LISTENER_NOTIFICATION).put(1, "one");

4.6.4. 自定义事件

默认情况下生成的事件仅包含足够的信息,以便使事件相关,但可以避免产生太多的信息,以降低发送它们的成本。(可选)事件中提供的信息可以自定义,使其包含更多信息,如值,或者包含较少的信息。此自定义通过 CacheEventConverter Factory 生成的 CacheEventConverterFactory 实例进行:

import org.infinispan.notifications.cachelistener.filter.CacheEventConverterFactory;
import org.infinispan.notifications.cachelistener.filter.CacheEventConverter;
import org.infinispan.filter.NamedFactory;

@NamedFactory(name = "static-converter")
class StaticConverterFactory implements CacheEventConverterFactory {
   final CacheEventConverter<Integer, String, CustomEvent> staticConverter = new StaticCacheEventConverter();
   public CacheEventConverter<Integer, String, CustomEvent> getConverter(final Object[] params) {
      return staticConverter;
   }
}

// Serializable, Externalizable or marshallable with Infinispan Externalizers
// needed when running in a cluster
class StaticCacheEventConverter implements CacheEventConverter<Integer, String, CustomEvent>, Serializable {
   public CustomEvent convert(Integer key, String oldValue, Metadata oldMetadata, String newValue, Metadata newMetadata, EventType eventType) {
      return new CustomEvent(key, newValue);
   }
}

// Needs to be Serializable, Externalizable or marshallable with Infinispan Externalizers
// regardless of cluster or local caches
static class CustomEvent implements Serializable {
   final Integer key;
   final String value;
   CustomEvent(Integer key, String value) {
      this.key = key;
      this.value = value;
   }
}

在上例中,转换器生成新的自定义事件,该事件包括值以及事件中的键。与默认事件相比,这会导致更大的事件有效负载,但如果与过滤结合使用,则可能会降低其网络带宽成本。

警告

转换程序的目标类型必须是 SerializableExternalizable。在这个特殊情况下,提供外部程序的转换器默认不起作用,因为默认的 Hot Rod 客户端 marshaller 不支持它们。

处理自定义事件需要略有不同的客户端监听程序实现。要更精确地处理 ClientCacheEntryCustomEvent 实例:

import org.infinispan.client.hotrod.annotation.*;
import org.infinispan.client.hotrod.event.*;

@ClientListener
public class CustomEventPrintListener {

   @ClientCacheEntryCreated
   @ClientCacheEntryModified
   @ClientCacheEntryRemoved
   public void handleCustomEvent(ClientCacheEntryCustomEvent<CustomEvent> e) {
      System.out.println(e);
   }

}

回调中收到的 ClientCacheEntryCustomEvent 通过 getEventData 方法公开自定义事件,getType 方法提供了有关生成的事件的信息,这是缓存条目创建、修改或删除的结果。

与过滤类似,若要使用此转换器工厂注册监听程序,必须授予唯一的名称,并且 Hot Rod 服务器需要插入名称和缓存事件转换器工厂实例。

  1. 创建一个 JAR 文件,其中带有转换器实现。

    如果缓存使用自定义键/值类,则必须将它们包含在 JAR 中,以便可以使用正确的 unmarshalled 键和/或值实例来执行回调。如果客户端侦听器启用了 useRawData,则不需要此功能,因为回调键/值实例将以二进制格式提供。

  2. 在 JAR 文件中创建一个 META-INF/services/org.infinispan.notifications.cachelistener.filter.CacheEventConverterFactory 文件,并编写转换器类实施的完全限定类名称。
  3. 将 JAR 文件添加到 Data Grid 服务器安装目录的 server/lib 目录中。
  4. 通过将工厂名称添加到 @ClientListener 注释,将客户端监听程序与这个转换器工厂连接:

    @ClientListener(converterFactoryName = "static-converter")
    public class CustomEventPrintListener { ... }
  5. 使用服务器注册监听程序:

    RemoteCache<?, ?> cache = ...
    cache.addClientListener(new CustomEventPrintListener());

也可以根据在侦听器注册时提供的参数转换的动态转换器实例。转换器使用转换器接收的参数启用此选项。例如:

import org.infinispan.notifications.cachelistener.filter.CacheEventConverterFactory;
import org.infinispan.notifications.cachelistener.filter.CacheEventConverter;

@NamedFactory(name = "dynamic-converter")
class DynamicCacheEventConverterFactory implements CacheEventConverterFactory {
   public CacheEventConverter<Integer, String, CustomEvent> getConverter(final Object[] params) {
      return new DynamicCacheEventConverter(params);
   }
}

// Serializable, Externalizable or marshallable with Infinispan Externalizers needed when running in a cluster
class DynamicCacheEventConverter implements CacheEventConverter<Integer, String, CustomEvent>, Serializable {
   final Object[] params;

   DynamicCacheEventConverter(Object[] params) {
      this.params = params;
   }

   public CustomEvent convert(Integer key, String oldValue, Metadata oldMetadata,
         String newValue, Metadata newMetadata, EventType eventType) {
      // If the key matches a key given via parameter, only send the key information
      if (params[0].equals(key))
         return new CustomEvent(key, null);

      return new CustomEvent(key, newValue);
   }
}

在注册监听器时,提供了进行转换所需的动态参数:

RemoteCache<?, ?> cache = ...
cache.addClientListener(new EventPrintListener(), null, new Object[]{1});
警告

当集群部署到集群中时,转换器实例必须可以被处理,因此即使事件被注册了监听程序的不同节点中也会发生转换。为了使其可以被编译,可以使它们扩展 SerializableExternalizable 或为它们提供自定义外部工具。

4.6.5. 过滤和自定义事件

如果要同时进行事件过滤和自定义,可以更轻松地实施 org.infinispan.notifications.cachelistener.filter.CacheEventFilterConverter,它允许过滤和自定义在一个步骤中进行。为方便起见,建议直接扩展 org.infinispan.notifications.cachelistener.filter.AbstractCacheEventFilterConverter,而不是直接实施 org.infinispan.notifications.cachelistener.filter.CacheEventFilterConverter。例如:

import org.infinispan.notifications.cachelistener.filter.CacheEventConverterFactory;
import org.infinispan.notifications.cachelistener.filter.CacheEventConverter;

@NamedFactory(name = "dynamic-filter-converter")
class DynamicCacheEventFilterConverterFactory implements CacheEventFilterConverterFactory {
   public CacheEventFilterConverter<Integer, String, CustomEvent> getFilterConverter(final Object[] params) {
      return new DynamicCacheEventFilterConverter(params);
   }
}

// Serializable, Externalizable or marshallable with Infinispan Externalizers needed when running in a cluster
//
class DynamicCacheEventFilterConverter extends AbstractCacheEventFilterConverter<Integer, String, CustomEvent>, Serializable {
   final Object[] params;

   DynamicCacheEventFilterConverter(Object[] params) {
      this.params = params;
   }

   public CustomEvent filterAndConvert(Integer key, String oldValue, Metadata oldMetadata,
         String newValue, Metadata newMetadata, EventType eventType) {
      // If the key matches a key given via parameter, only send the key information
      if (params[0].equals(key))
         return new CustomEvent(key, null);

      return new CustomEvent(key, newValue);
   }
}

与过滤器和转换器类似,要能够使用组合的 filter/converter 工厂注册监听程序,工厂必须通过 @NamedFactory 注解指定唯一名称,并且 Hot Rod 服务器需要与名称和缓存事件转换器工厂实例插入。

  1. 创建一个 JAR 文件,其中带有转换器实现。

    如果缓存使用自定义键/值类,则必须将它们包含在 JAR 中,以便可以使用正确的 unmarshalled 键和/或值实例来执行回调。如果客户端侦听器启用了 useRawData,则不需要此功能,因为回调键/值实例将以二进制格式提供。

  2. 在 JAR 文件中创建一个 META-INF/services/org.infinispan.notifications.cachelistener.filter.CacheEventFilterConverterFactory 文件,并编写转换器类实施的完全限定类名称。
  3. 将 JAR 文件添加到 Data Grid 服务器安装目录的 server/lib 目录中。

从客户端的角度来看,要使用组合过滤器和转换器类,客户端监听程序必须定义相同的过滤器工厂和转换器工厂名称,例如:

@ClientListener(filterFactoryName = "dynamic-filter-converter", converterFactoryName = "dynamic-filter-converter")
public class CustomEventPrintListener { ... }

当监听器通过 filter 或 converter 参数注册时,会提供上例中的动态参数。如果过滤器参数是非空的,则会使用这些参数,否则会使用转换器参数:

RemoteCache<?, ?> cache = ...
cache.addClientListener(new CustomEventPrintListener(), new Object[]{1}, null);

4.6.6. event Marshalling

热 Rod 服务器可以以不同的格式存储数据,但是尽管如此,Java Hot Rod 客户端用户仍然可以开发在键入的对象上运行的 CacheEventConverterCacheEventFilter 实例。默认情况下,过滤器和转换器将数据用作 POJO (application/x-java-object),但可以通过覆盖 filter/converter 中的方法 格式() 来覆盖所需的格式。如果格式返回 null,则过滤器/转换器将接收存储的数据。

热 Rod Java 客户端可以配置为使用不同的 org.infinispan.commons.marshall.Marshaller 实例。如果这样做和部署 CacheEventConverterCacheEventFilter 实例,则可以使用 Java 对象而不是 marshaller 显示过滤器/转换,服务器需要能够在对象和 marshaller 生成的二进制格式之间进行转换。

要部署 Marshaller 实例服务器端,请按照类似的方法部署 CacheEventConverterCacheEventFilter 实例:

  1. 创建一个 JAR 文件,其中带有转换器实现。
  2. 在 JAR 文件中创建 META-INF/services/org.infinispan.commons.marshall.Marshaller 文件,写入 marshaller 类实施的完全限定类名称。
  3. 将 JAR 文件添加到 Data Grid 服务器安装目录的 server/lib 目录中。

请注意,Marshaller 可以部署到单独的 jar 中,或者在与 CacheEventConverter 和/或 CacheEventFilter 实例相同的 jar 中进行部署。

4.6.6.1. 部署 Protostream Marshallers

如果缓存存储 Protobuf 内容,就像在 Hot Rod 客户端中使用 ProtoStream marshaller 时发生,则不需要部署自定义 marshaller,因为服务器已经支持格式:有 Protobuf 格式到最常见的格式,如 JSON 和 POJO。

在将过滤器/转换器与这些缓存搭配使用时,需要使用带有 Java 对象的 filter/converters 而不是二进制 Protobuf 数据时,需要配置额外的 ProtoStream marshallers,以便服务器可以在过滤/转换前处理数据。要做到这一点,您必须将所需的 SerializationContextInitializer (s) 配置为 Data Grid 服务器配置的一部分。

如需更多信息 ,请参阅缓存编码和 Marshalling

4.6.7. 侦听器状态处理

客户端侦听器注解具有可选的 includeCurrentState 属性,用于指定在添加监听程序时是否将状态发送到客户端,或者是监听器故障转移时。

默认情况下,includeCurrentState 为 false,但如果设置为 true,并且客户端监听程序添加到已包含数据的缓存中,服务器会迭代缓存内容,并将每个条目的事件作为 ClientCacheEntryCreated (如果配置)发送一个 ClientCacheEntryCreated (如果配置了自定义事件)。这允许客户端基于现有内容构建一些本地数据结构。迭代内容后,事件会正常接收,因为接收缓存更新。如果缓存被集群,则整个集群范围的内容都会迭代。

4.6.8. 侦听器故障处理

当 Hot Rod 客户端注册客户端监听程序时,它会在集群的单个节点中执行此操作。如果该节点失败,Java Hot Rod 客户端会检测到透明且在节点中注册的所有监听器失败。

在这种故障切换过程中,客户端可能会错过一些事件。为了避免缺少这些事件,client 侦听器注解包含一个名为 includeCurrentState 的可选参数,如果设为 true,则缓存内容可以迭代,并且生成 ClientCacheEntryCreated 事件(如果配置了自定义事件)。默认情况下,includeCurrentState 设置为 false。

使用回调来处理故障转移事件:

@ClientCacheFailover
public void handleFailover(ClientCacheFailoverEvent e) {
  ...
}

当客户端缓存了一些数据的用例中,这非常有用,因此,考虑到一些事件可能会丢失,它决定在收到事件失败时清除任何本地缓存的数据,了解事件故障转移后,它将收到整个缓存的内容的事件。

4.7. 热 Rod Java 客户端事务

您可以在 JTA Transactions 中配置和使用 Hot Rod 客户端。

要参与事务,Hot Rod 客户端需要与之交互的 TransactionManager,以及它是否通过同步或 XAResource 接口参与事务。https://docs.oracle.com/javaee/7/api/javax/transaction/Synchronization.html

重要

事务在准备阶段获取条目的写锁是最佳的。为了避免数据不一致,请务必阅读有关 使用 Transactions 的冲突

4.7.1. 配置服务器

服务器中的缓存还必须是事务处理,客户端才会参与 JTA 事务

需要以下服务器配置,否则只进行事务回滚:

  • 隔离级别必须是 REPEATABLE_READ
  • 建议使用 PESSIMISTIC 锁定模式,但可以使用 OPTIMISTIC
  • 事务模式应该是 NON_XANON_DURABLE_XA。热 Rod 事务不应使用 FULL_XA,因为它会降低性能。

例如:

<replicated-cache name="hotrodReplTx">
  <locking isolation="REPEATABLE_READ"/>
  <transaction mode="NON_XA" locking="PESSIMISTIC"/>
</replicated-cache>

热 Rod 事务有自己的恢复机制。

4.7.2. 配置 Hot Rod 客户端

事务的 RemoteCache 会根据缓存进行配置。例外是事务的超时,它是全局的,因为单个事务可以与多个 RemoteCaches 交互。

注意

嵌入式 Data Grid 支持 pesimistic locks,但 Hot Rod 客户端不支持。因此,在 Data Grid 服务器中使用 pessimistic locks 获得的事务结果可能与 Hot Rod 客户端获得的结果不同。

以下示例演示了如何为缓存 my-cache 配置事务 RemoteCache

org.infinispan.client.hotrod.configuration.ConfigurationBuilder cb = new org.infinispan.client.hotrod.configuration.ConfigurationBuilder();
//other client configuration parameters
cb.transactionTimeout(1, TimeUnit.MINUTES);
cb.remoteCache("my-cache")
   .transactionManagerLookup(GenericTransactionManagerLookup.getInstance())
   .transactionMode(TransactionMode.NON_XA);

有关配置参数的文档,请参阅配置 Builder 和 RemoteCache ConfigurationBuilder Javadoc。

您还可以使用属性文件配置 Java Hot Rod 客户端,如下例所示:

infinispan.client.hotrod.cache.my-cache.transaction.transaction_manager_lookup = org.infinispan.client.hotrod.transaction.lookup.GenericTransactionManagerLookup
infinispan.client.hotrod.cache.my-cache.transaction.transaction_mode = NON_XA
infinispan.client.hotrod.transaction.timeout = 60000
4.7.2.1. TransactionManagerLookup Interface

TransactionManagerLookup 提供了一个入口点,用于获取 TransactionManager

TransactionManagerLookup 的可用实现:

GenericTransactionManagerLookup
查找类,用于查找在 Java EE 应用服务器中运行的 TransactionManager。如果无法找到 TransactionManager,则默认为 RemoteTransactionManager。这是 Hot Rod Java 客户端的默认设置。
提示

在大多数情况下,GenericTransactionManagerLookup 是合适的。但是,如果您需要集成自定义 TransactionManagerLookup 接口,您可以实现 TransactionManager Lookup 接口。

RemoteTransactionManagerLookup
如果没有其他实施,则基本和易失性 TransactionManager。请注意,这个实现在处理并发事务和恢复时有很大的限制。

4.7.3. 事务模式

TransactionMode 控制 RemoteCache 如何与 TransactionManager 交互。

重要

在 Data Grid 服务器和您的客户端应用程序上配置事务模式。如果客户端试图对非事务缓存执行事务操作,则可能会出现运行时异常。

在 Data Grid 配置和客户端设置中,事务模式都相同。将以下模式与客户端搭配使用,请参阅服务器的 Data Grid 配置模式:

NONE
RemoteCache 不与 TransactionManager 交互。这是默认模式,不是事务处理模式。
NON_XA
RemoteCache 通过 同步TransactionManager 交互。
NON_DURABLE_XA
RemoteCache 通过 XAResourceTransactionManager 交互。恢复功能被禁用。
FULL_XA
RemoteCache 通过 XAResourceTransactionManager 交互。启用恢复功能。调用 XaResource.recover () 方法,以检索要恢复的事务。

4.7.4. 使用事务检测冲突

事务使用键的初始值来检测冲突。

例如,当事务开始时,"k" 的值为 "v"。在准备阶段,事务从服务器获取"k"以读取值。如果值已更改,事务会回滚以避免冲突。

注意

事务使用版本来检测更改,而不是检查值相等。

forceReturnValue 参数控制 RemoteCache 的写入操作,并帮助避免冲突。它有以下值:

  • 如果为 true,则 TransactionManager 在执行写入操作前从服务器获取最新的值。但是,forceReturnValue 参数仅适用于首次访问密钥的操作。
  • 如果为 false,则 TransactionManager 在执行写入操作前不会从服务器获取最新的值。
注意

此参数不会影响 替换 或放置 if Absent条件 写入操作,因为它们需要最新的值。

以下事务提供了一个示例,其中 forceReturnValue 参数可以防止出现冲突的写入操作:

事务 1 (TX1)

RemoteCache<String, String> cache = ...
TransactionManager tm = ...

tm.begin();
cache.put("k", "v1");
tm.commit();

事务 2 (TX2)

RemoteCache<String, String> cache = ...
TransactionManager tm = ...

tm.begin();
cache.put("k", "v2");
tm.commit();

在这个示例中,TX1 和 TX2 并行执行。"k"的初始值为 "v"。

  • 如果 forceReturnValue = true,则 cache.put () 操作从 TX1 和 TX2 的服务器获取"k"的值。首先获取"k"锁定的事务,然后提交。其他事务会在提交阶段回滚,因为事务可以检测到 "k" 的值为 "v"。
  • 如果 forceReturnValue = false,则 cache.put () 操作不会从服务器获取"k"的值并返回 null。TX1 和 TX2 都可以成功提交,这会导致冲突。这是因为事务都无法检测到初始值为 "k" 已更改。

以下事务包括 cache.get () 操作,以便在执行 cache.put () 操作前读取 "k" 的值:

事务 1 (TX1)

RemoteCache<String, String> cache = ...
TransactionManager tm = ...

tm.begin();
cache.get("k");
cache.put("k", "v1");
tm.commit();

事务 2 (TX2)

RemoteCache<String, String> cache = ...
TransactionManager tm = ...

tm.begin();
cache.get("k");
cache.put("k", "v2");
tm.commit();

在前面的示例中,TX1 和 TX2 都是读取密钥,因此 forceReturnValue 参数不会生效。一个事务提交,另一个回滚。但是 cache.get () 操作需要额外的服务器请求。如果您不要求服务器请求的 cache.put () 操作返回值效率低下。

4.7.5. 使用 Configured Transaction Manager 和 Transaction Mode

以下示例演示了如何使用在 RemoteCacheManager 中配置的 TransactionManagerTransactionMode

//Configure the transaction manager and transaction mode.
org.infinispan.client.hotrod.configuration.ConfigurationBuilder cb = new org.infinispan.client.hotrod.configuration.ConfigurationBuilder();
cb.remoteCache("my-cache")
    .transactionManagerLookup(RemoteTransactionManagerLookup.getInstance())
    .transactionMode(TransactionMode.NON_XA);

RemoteCacheManager rcm = new RemoteCacheManager(cb.build());

//The my-cache instance uses the RemoteCacheManager configuration.
RemoteCache<String, String> cache = rcm.getCache("my-cache");

//Return the transaction manager that the cache uses.
TransactionManager tm = cache.getTransactionManager();

//Perform a simple transaction.
tm.begin();
cache.put("k1", "v1");
System.out.println("K1 value is " + cache.get("k1"));
tm.commit();

4.8. 计数器 API

MultimapCacheManager 接口是获取 RemoteMultimapCache 的入口点。

热 Rod 客户端可以检索 MultimapCacheManager 接口,如下例所示:

// create or obtain your RemoteCacheManager
RemoteCacheManager manager = ...;

// retrieve the MultimapCacheManager
MultimapCacheManager multimapCacheManager = RemoteMultimapCacheManagerFactory.from(manager);

// retrieve the RemoteMultimapCache
RemoteMultimapCache<Integer, String> people = multimapCacheManager.get("people");

// add key - values
people.put("coders", "Will");
people.put("coders", "Auri");
people.put("coders", "Pedro");

// retrieve single key with multiple values
Collection<String> coders = people.get("coders").join();

法律通告

Copyright © 2024 Red Hat, Inc.
The text of and illustrations in this document are licensed by Red Hat under a Creative Commons Attribution–Share Alike 3.0 Unported license ("CC-BY-SA"). An explanation of CC-BY-SA is available at http://creativecommons.org/licenses/by-sa/3.0/. In accordance with CC-BY-SA, if you distribute this document or an adaptation of it, you must provide the URL for the original version.
Red Hat, as the licensor of this document, waives the right to enforce, and agrees not to assert, Section 4d of CC-BY-SA to the fullest extent permitted by applicable law.
Red Hat, Red Hat Enterprise Linux, the Shadowman logo, the Red Hat logo, JBoss, OpenShift, Fedora, the Infinity logo, and RHCE are trademarks of Red Hat, Inc., registered in the United States and other countries.
Linux® is the registered trademark of Linus Torvalds in the United States and other countries.
Java® is a registered trademark of Oracle and/or its affiliates.
XFS® is a trademark of Silicon Graphics International Corp. or its subsidiaries in the United States and/or other countries.
MySQL® is a registered trademark of MySQL AB in the United States, the European Union and other countries.
Node.js® is an official trademark of Joyent. Red Hat is not formally related to or endorsed by the official Joyent Node.js open source or commercial project.
The OpenStack® Word Mark and OpenStack logo are either registered trademarks/service marks or trademarks/service marks of the OpenStack Foundation, in the United States and other countries and are used with the OpenStack Foundation's permission. We are not affiliated with, endorsed or sponsored by the OpenStack Foundation, or the OpenStack community.
All other trademarks are the property of their respective owners.
Red Hat logoGithubRedditYoutube

关于红帽文档

通过我们的产品和服务,以及可以信赖的内容,帮助红帽用户创新并实现他们的目标。

让开源更具包容性

红帽致力于替换我们的代码、文档和 Web 属性中存在问题的语言。欲了解更多详情,请参阅红帽博客.

關於紅帽

我们提供强化的解决方案,使企业能够更轻松地跨平台和环境(从核心数据中心到网络边缘)工作。

© 2024 Red Hat, Inc.