40.2. 实施端点接口

实施端点的替代方法

支持以下替代端点实现模式:

事件驱动的端点实现

如果您的自定义端点符合事件驱动的模式(请参阅 第 38.1.3 节 “消费者模式和线程”),则通过扩展抽象类 org.apache.camel.impl.DefaultEndpoint 来实现,如 例 40.2 “实施 defaultEndpoint” 所示。

例 40.2. 实施 defaultEndpoint

import java.util.Map;
import java.util.concurrent.BlockingQueue;

import org.apache.camel.Component;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.DefaultExchange;

public class CustomEndpoint extends DefaultEndpoint { 1

    public CustomEndpoint(String endpointUri, Component component) { 2
        super(endpointUri, component);
        // Do any other initialization...
    }

    public Producer createProducer() throws Exception { 3
        return new CustomProducer(this);
    }

    public Consumer createConsumer(Processor processor) throws Exception { 4
        return new CustomConsumer(this, processor);
    }

    public boolean isSingleton() {
        return true;
    }

    // Implement the following methods, only if you need to set exchange properties.
    //
    public Exchange createExchange() { 5
        return this.createExchange(getExchangePattern());
    }

    public Exchange createExchange(ExchangePattern pattern) {
        Exchange result = new DefaultExchange(getCamelContext(), pattern);
        // Set exchange properties
        ...
        return result;
    }
}
1
通过扩展 DefaultEndpoint 类,实施事件驱动的自定义端点 CustomEndpoint
2
您必须至少有一个构造器,用于将端点 URI、endpointUri 和父 组件 引用 作为参数。
3
实施 createProducer () factory 方法来创建制作者端点。
4
实施 createConsumer () factory 方法来创建事件驱动的消费者实例。
5
通常,不需要 覆盖 createExchange () 方法。默认情况下,从 DefaultEndpoint 继承的实现会创建一个 DefaultExchange 对象,它可用于任何 Apache Camel 组件。但是,如果您需要在 DefaultExchange 对象中初始化一些交换属性,则适合在此处覆盖 createExchange () 方法,以添加 Exchange 属性设置。
重要

不要 覆盖 createPollingConsumer () 方法。

DefaultEndpoint 类提供以下方法的默认实现,在编写自定义端点代码时可能会很有用:

  • getEndpointUri () 方式-确保端点 URI.
  • getCamelContext () WWN-将引用返回对 CamelContext 的参考。
  • getComponent () WWN-sandboxedReturns 对父组件的引用。
  • 创建PollingConsumer () 方式-确保创建 polling consumer。创建的轮询消费者的功能基于事件驱动的消费者。如果您覆盖事件驱动的消费者方法,createConsumer (),则可获得轮询消费者实施。
  • 为这个端点所需的类型,createExchange (Exchange e ) )将给定交换对象 everts 设置为此端点所需的类型。此方法使用覆盖的 createExchange () 端点创建新端点。这可确保方法可用于自定义交换类型。

调度的轮询端点实现

如果您的自定义端点符合调度的轮询模式(请参阅 第 38.1.3 节 “消费者模式和线程”),则通过继承抽象类 org.apache.camel.impl.ScheduledPollEndpoint 来实现。例 40.3 “ScheduledPollEndpoint 实现”

例 40.3. ScheduledPollEndpoint 实现

import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.impl.ScheduledPollEndpoint;

public class CustomEndpoint extends ScheduledPollEndpoint {  1

    protected CustomEndpoint(String endpointUri, CustomComponent component) { 2
        super(endpointUri, component);
        // Do any other initialization...
    }

    public Producer createProducer() throws Exception { 3
        Producer result = new CustomProducer(this);
        return result;
    }

    public Consumer createConsumer(Processor processor) throws Exception { 4
        Consumer result = new CustomConsumer(this, processor);
        configureConsumer(result); 5
        return result;
    }

    public boolean isSingleton() {
        return true;
    }

    // Implement the following methods, only if you need to set exchange properties.
    //
    public Exchange createExchange() { 6
        return this.createExchange(getExchangePattern());
    }

    public Exchange createExchange(ExchangePattern pattern) {
        Exchange result = new DefaultExchange(getCamelContext(), pattern);
        // Set exchange properties
        ...
        return result;
    }
}
1
通过扩展 ScheduledPollEndpoint 类,实施调度的 poll 自定义端点 CustomEndpoint
2
您必须至少有一个构造器将端点 URI、endpointUri 和父组件引用(组件引用)用作参数。
3
实施 createProducer () factory 方法来创建制作者端点。
4
实施 createConsumer () factory 方法来创建调度的轮询使用者实例。
5
configureConsumer () 方法在 ScheduledPollEndpoint 基础类中定义,负责将消费者查询选项注入到消费者中。请参阅 “consumer 参数注入”一节
6
通常,不需要 覆盖 createExchange () 方法。默认情况下,从 DefaultEndpoint 继承的实现会创建一个 DefaultExchange 对象,它可用于任何 Apache Camel 组件。但是,如果您需要在 DefaultExchange 对象中初始化一些交换属性,则适合在此处覆盖 createExchange () 方法,以添加 Exchange 属性设置。
重要

不要 覆盖 createPollingConsumer () 方法。

轮询端点实施

如果您的自定义端点符合轮询消费者模式(请参阅 第 38.1.3 节 “消费者模式和线程”),则通过继承抽象类 org.apache.camel.impl.DefaultPollingEndpoint 来实现,如 例 40.4 “DefaultPollingEndpoint 实现” 所示。

例 40.4. DefaultPollingEndpoint 实现

import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.impl.DefaultPollingEndpoint;

public class CustomEndpoint extends DefaultPollingEndpoint {
    ...
    public PollingConsumer createPollingConsumer() throws Exception {
        PollingConsumer result = new CustomConsumer(this);
        configureConsumer(result);
        return result;
    }

    // Do NOT implement createConsumer(). It is already implemented in DefaultPollingEndpoint.
    ...
}

由于此 CustomEndpoint 类是轮询端点,您必须实施 createPollingConsumer () 方法,而不是 createConsumer () 方法。从 createPollingConsumer () 返回的使用者实例必须从轮询Consumer 接口继承。有关如何实施轮询消费者的详情,请参考 “轮询消费者实施”一节

除了实施 createPollingConsumer () 方法外,实施 DefaultPollingEndpoint 的步骤也类似于实施 ScheduledPollEndpoint 的步骤。详情请查看 例 40.3 “ScheduledPollEndpoint 实现”

实施 BrowsableEndpoint 接口

如果要公开当前端点中待处理的交换实例列表,您可以实现 org.apache.camel.spi.BrowsableEndpoint 接口,如 例 40.5 “BrowsableEndpoint Interface” 所示。如果端点执行某种形式的传入事件,则实施此接口有意义。例如,Apache Camel SEDA 端点实施 BrowsableEndpoint interface©-strategy-see 例 40.6 “SedaEndpoint 实现”

例 40.5. BrowsableEndpoint Interface

package org.apache.camel.spi;

import java.util.List;

import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;

public interface BrowsableEndpoint extends Endpoint {
    List<Exchange> getExchanges();
}

Example

例 40.6 “SedaEndpoint 实现” 显示 SedaEndpoint 的示例实现。SEDA 端点是 事件驱动的端点 示例。传入的事件存储在 FIFO 队列中( java.util.concurrent.BlockingQueue)和一个 SEDA 消费者启动线程来读取和处理事件。事件本身由 org.apache.camel.Exchange 对象表示。

例 40.6. SedaEndpoint 实现

package org.apache.camel.component.seda;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;

import org.apache.camel.Component;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.spi.BrowsableEndpoint;

public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint { 1
    private BlockingQueue<Exchange> queue;

    public SedaEndpoint(String endpointUri, Component component, BlockingQueue<Exchange> queue) { 2
        super(endpointUri, component);
        this.queue = queue;
    }

    public SedaEndpoint(String uri, SedaComponent component, Map parameters) { 3
        this(uri, component, component.createQueue(uri, parameters));
    }

    public Producer createProducer() throws Exception { 4
        return new CollectionProducer(this, getQueue());
    }

    public Consumer createConsumer(Processor processor) throws Exception { 5
        return new SedaConsumer(this, processor);
    }

    public BlockingQueue<Exchange> getQueue() { 6
        return queue;
    }

    public boolean isSingleton() { 7
        return true;
    }

    public List<Exchange> getExchanges() { 8
        return new ArrayList<Exchange> getQueue());
    }
}
1
SedaEndpoint 类遵循通过扩展 DefaultEndpoint 类来实施事件驱动的端点的模式。SedaEndpoint 类还实现了 BrowsableEndpoint 接口,后者提供对队列中交换对象列表的访问。
2
遵循事件驱动的消费者的常规模式,SedaEndpoint 定义了采用端点参数、endpointUri 和组件引用参数的构造器。
3
提供了另一个构造器,将队列创建委托给父组件实例。
4
createProducer () factory 方法创建一个 CollectionProducer 实例,这是向队列添加事件的制作者实施。
5
createConsumer () factory 方法创建一个 SedaConsumer 实例,它负责从队列拉取事件并进行处理。
6
getQueue () 方法返回对队列的引用。
7
isSingleton () 方法返回 true,表明应为每个唯一 URI 字符串创建一个端点实例。
8
getExchanges () 方法实现 BrowsableEndpoint 中对应的抽象方法。