40.2. 实施端点接口

实现端点的替代方法

支持以下替代端点实现模式:

事件驱动的端点实现

如果您的自定义端点符合事件驱动的模式(请参阅 第 38.1.3 节 “消费者模式和线程”),它通过扩展抽象类来实现,则 org.apache.camel.impl.DefaultEndpoint,如 例 40.2 “实施 DefaultEndpoint” 所示。

例 40.2. 实施 DefaultEndpoint

import java.util.Map;
import java.util.concurrent.BlockingQueue;

import org.apache.camel.Component;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.DefaultExchange;

public class CustomEndpoint extends DefaultEndpoint { 1

    public CustomEndpoint(String endpointUri, Component component) { 2
        super(endpointUri, component);
        // Do any other initialization...
    }

    public Producer createProducer() throws Exception { 3
        return new CustomProducer(this);
    }

    public Consumer createConsumer(Processor processor) throws Exception { 4
        return new CustomConsumer(this, processor);
    }

    public boolean isSingleton() {
        return true;
    }

    // Implement the following methods, only if you need to set exchange properties.
    //
    public Exchange createExchange() { 5
        return this.createExchange(getExchangePattern());
    }

    public Exchange createExchange(ExchangePattern pattern) {
        Exchange result = new DefaultExchange(getCamelContext(), pattern);
        // Set exchange properties
        ...
        return result;
    }
}
1
通过扩展 DefaultEndpoint 类来实施事件驱动的自定义端点 CustomEndpoint
2
您必须至少有一个构造器使用端点 URI、endpointUri 和父组件引用、组件 作为参数。
3
实施 createProducer() factory 方法来创建制作者端点。
4
实施 createConsumer() 工厂方法来创建事件驱动的消费者实例。
5
通常,不需要 覆盖 createExchange() 方法。从 DefaultEndpoint 继承的实现会默认创建一个 DefaultExchange 对象,它可以用于任何 Apache Camel 组件。然而,如果您需要在 DefaultExchange 对象中初始化一些交换属性,则最好覆盖此处的 createExchange() 方法来添加 Exchange 属性设置。
重要

不要 覆盖 createPollingConsumer() 方法。

DefaultEndpoint 类提供以下方法的默认实现,您可以在编写自定义端点代码时发现这些实施可能很有用:

  • getEndpointUri() the endpoint URI。
  • getCamelContext() 进行对 CamelContext 的引用。
  • getComponent() 3.10.0-- the return returns 引用父组件。
  • 创建PollingConsumer()&gt;_<-对称Creates 轮询消费者。创建的轮询消费者功能取决于事件驱动的消费者。如果覆盖事件驱动的消费者方法,请创建Consumer(),您可以获得轮询消费者的实施。
  • CreateExchange(Exchange e) the given exchange 对象 e to the given exchange 对象 e,指向该端点所需的类型。此方法使用覆盖的 createExchange() 端点来创建新的端点。这可确保该方法也可用于自定义交换类型。

调度轮询端点实现

如果您的自定义端点符合调度的轮询模式(请参阅 第 38.1.3 节 “消费者模式和线程”),它通过从抽象类继承来实现(请参阅 例 40.3 “ScheduledPollEndpoint 实现”,如 所示。

例 40.3. ScheduledPollEndpoint 实现

import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.impl.ScheduledPollEndpoint;

public class CustomEndpoint extends ScheduledPollEndpoint {  1

    protected CustomEndpoint(String endpointUri, CustomComponent component) { 2
        super(endpointUri, component);
        // Do any other initialization...
    }

    public Producer createProducer() throws Exception { 3
        Producer result = new CustomProducer(this);
        return result;
    }

    public Consumer createConsumer(Processor processor) throws Exception { 4
        Consumer result = new CustomConsumer(this, processor);
        configureConsumer(result); 5
        return result;
    }

    public boolean isSingleton() {
        return true;
    }

    // Implement the following methods, only if you need to set exchange properties.
    //
    public Exchange createExchange() { 6
        return this.createExchange(getExchangePattern());
    }

    public Exchange createExchange(ExchangePattern pattern) {
        Exchange result = new DefaultExchange(getCamelContext(), pattern);
        // Set exchange properties
        ...
        return result;
    }
}
1
通过扩展 ScheduledPollEndpoint 类来实施调度的轮询自定义端点 CustomEndpoint
2
您必须至少有一个构造器使用端点 URI、endpointUri 和父组件引用、组件 作为参数。
3
实施 createProducer() factory 方法来创建制作者端点。
4
实施 createConsumer() 工厂方法来创建调度的轮询消费者实例。
5
ScheduledPollEndpoint 基本类中定义的 configureConsumer() 方法负责将使用者查询选项注入使用者。请参阅 “consumer 参数注入”一节
6
通常,不需要 覆盖 createExchange() 方法。从 DefaultEndpoint 继承的实现会默认创建一个 DefaultExchange 对象,它可以用于任何 Apache Camel 组件。然而,如果您需要在 DefaultExchange 对象中初始化一些交换属性,则最好覆盖此处的 createExchange() 方法来添加 Exchange 属性设置。
重要

不要 覆盖 createPollingConsumer() 方法。

轮询端点实现

如果您的自定义端点遵循轮询消费者模式(请参阅 第 38.1.3 节 “消费者模式和线程”),它通过从抽象类继承来实现的,则 org.apache.camel.impl.DefaultPollingEndpoint,如 例 40.4 “DefaultPollingEndpoint Implementation” 所示。

例 40.4. DefaultPollingEndpoint Implementation

import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.impl.DefaultPollingEndpoint;

public class CustomEndpoint extends DefaultPollingEndpoint {
    ...
    public PollingConsumer createPollingConsumer() throws Exception {
        PollingConsumer result = new CustomConsumer(this);
        configureConsumer(result);
        return result;
    }

    // Do NOT implement createConsumer(). It is already implemented in DefaultPollingEndpoint.
    ...
}

由于此 CustomEndpoint 类是一个轮询端点,您必须实施 createPollingConsumer() 方法,而不是 createConsumer() 方法。从 createPollingConsumer() 返回的消费者实例必须从 PollingConsumer 接口继承。有关如何实施轮询消费者的详情,请参考 “轮询消费者实施”一节

除了 createPollingConsumer() 方法的实施步骤外,实施 DefaultPollingEndpoint 的步骤与实施 ScheduledPollEndpoint 的步骤类似。详情请查看 例 40.3 “ScheduledPollEndpoint 实现”

实施 BrowsableEndpoint 接口

如果要公开当前端点中待处理的交换实例列表,您可以实现 org.apache.camel.spi.BrowsableEndpoint 接口,如 例 40.5 “BrowsableEndpoint 接口” 所示。如果端点执行某种形式的传入事件,则需要实施这个接口。例如,Apache Camel SEDA 端点实施 BrowsableEndpoint 接口>_<-KUBECONFIGsee 例 40.6 “SedaEndpoint 实施”

例 40.5. BrowsableEndpoint 接口

package org.apache.camel.spi;

import java.util.List;

import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;

public interface BrowsableEndpoint extends Endpoint {
    List<Exchange> getExchanges();
}

示例

例 40.6 “SedaEndpoint 实施” 显示 SedaEndpoint 的示例实现。SEDA 端点是一个由 事件驱动的端点的示例。传入的事件存储在 FIFO 队列中( java.util.concurrent.BlockingQueue的实例)和 SEDA 使用者启动线程以读取和处理事件。事件本身由 org.apache.camel.Exchange 对象表示。

例 40.6. SedaEndpoint 实施

package org.apache.camel.component.seda;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;

import org.apache.camel.Component;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.spi.BrowsableEndpoint;

public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint { 1
    private BlockingQueue<Exchange> queue;

    public SedaEndpoint(String endpointUri, Component component, BlockingQueue<Exchange> queue) { 2
        super(endpointUri, component);
        this.queue = queue;
    }

    public SedaEndpoint(String uri, SedaComponent component, Map parameters) { 3
        this(uri, component, component.createQueue(uri, parameters));
    }

    public Producer createProducer() throws Exception { 4
        return new CollectionProducer(this, getQueue());
    }

    public Consumer createConsumer(Processor processor) throws Exception { 5
        return new SedaConsumer(this, processor);
    }

    public BlockingQueue<Exchange> getQueue() { 6
        return queue;
    }

    public boolean isSingleton() { 7
        return true;
    }

    public List<Exchange> getExchanges() { 8
        return new ArrayList<Exchange> getQueue());
    }
}
1
SedaEndpoint 类遵循通过扩展 DefaultEndpoint 类来实施事件驱动的端点的模式。SedaEndpoint 类还实施 BrowsableEndpoint 接口,它提供对队列中交换对象列表的访问。
2
按照事件驱动的消费者的常用模式,SedaEndpoint 定义了一个使用端点参数、端点Uri 以及组件引用参数 的构造器。
3
提供了另一个构造器,它将队列创建委派给父组件实例。
4
createProducer() factory 方法创建 CollectionProducer 的实例,这是向队列添加事件的生产者实现。
5
createConsumer() 工厂方法创建 SedaConsumer 的实例,它负责从队列拉取事件并进行处理。
6
getQueue() 方法返回对队列的引用。
7
isSingleton() 方法返回 true,表示应为每个唯一 URI 字符串创建一个端点实例。
8
getExchanges() 方法从 BrowsableEndpoint 实现了对应的抽象方法。