001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.camel.component.broker;
018
019import java.util.List;
020import java.util.concurrent.CopyOnWriteArrayList;
021
022import org.apache.activemq.broker.ProducerBrokerExchange;
023import org.apache.activemq.broker.inteceptor.MessageInterceptor;
024import org.apache.activemq.broker.inteceptor.MessageInterceptorRegistry;
025import org.apache.activemq.command.ActiveMQDestination;
026import org.apache.activemq.command.Message;
027import org.apache.camel.Consumer;
028import org.apache.camel.MultipleConsumersSupport;
029import org.apache.camel.Processor;
030import org.apache.camel.Producer;
031import org.apache.camel.Service;
032import org.apache.camel.api.management.ManagedResource;
033import org.apache.camel.impl.DefaultEndpoint;
034import org.apache.camel.spi.Metadata;
035import org.apache.camel.spi.UriEndpoint;
036import org.apache.camel.spi.UriParam;
037import org.apache.camel.spi.UriPath;
038import org.apache.camel.util.UnsafeUriCharactersEncoder;
039
040@ManagedResource(description = "Managed Camel Broker Endpoint")
041@UriEndpoint(scheme = "broker", syntax = "broker:destination", consumerClass = BrokerConsumer.class, title = "Broker", label = "messaging")
042public class BrokerEndpoint extends DefaultEndpoint implements MultipleConsumersSupport, Service {
043
044    static final String PRODUCER_BROKER_EXCHANGE = "producerBrokerExchange";
045
046    private MessageInterceptorRegistry messageInterceptorRegistry;
047    private List<MessageInterceptor> messageInterceptorList = new CopyOnWriteArrayList<MessageInterceptor>();
048
049    @UriPath(name = "destination") @Metadata(required = "true")
050    private String destinationName;
051    private final ActiveMQDestination destination;
052    @UriParam
053    private final BrokerConfiguration configuration;
054
055    public BrokerEndpoint(String uri, BrokerComponent component, String destinationName, ActiveMQDestination destination, BrokerConfiguration configuration) {
056        super(UnsafeUriCharactersEncoder.encode(uri), component);
057        this.destinationName = destinationName;
058        this.destination = destination;
059        this.configuration = configuration;
060    }
061
062    @Override
063    public Producer createProducer() throws Exception {
064        BrokerProducer producer = new BrokerProducer(this);
065        return producer;
066    }
067
068    @Override
069    public Consumer createConsumer(Processor processor) throws Exception {
070        BrokerConsumer consumer = new BrokerConsumer(this, processor);
071        configureConsumer(consumer);
072        return consumer;
073    }
074
075    @Override
076    public boolean isSingleton() {
077        return false;
078    }
079
080    @Override
081    public boolean isMultipleConsumersSupported() {
082        return true;
083    }
084
085    public ActiveMQDestination getDestination() {
086        return destination;
087    }
088
089    /**
090     * The name of the JMS destination
091     */
092    public String getDestinationName() {
093        return destinationName;
094    }
095
096    @Override
097    protected void doStart() throws Exception {
098        super.doStart();
099        messageInterceptorRegistry = MessageInterceptorRegistry.getInstance().get(configuration.getBrokerName());
100        for (MessageInterceptor messageInterceptor : messageInterceptorList) {
101            addMessageInterceptor(messageInterceptor);
102        }
103        messageInterceptorList.clear();
104    }
105
106    @Override
107    protected void doStop() throws Exception {
108        super.doStop();
109    }
110
111    protected void addMessageInterceptor(MessageInterceptor messageInterceptor) {
112        if (isStarted()) {
113            messageInterceptorRegistry.addMessageInterceptor(destination, messageInterceptor);
114        } else {
115            messageInterceptorList.add(messageInterceptor);
116        }
117    }
118
119    protected void removeMessageInterceptor(MessageInterceptor messageInterceptor) {
120        messageInterceptorRegistry.removeMessageInterceptor(destination, messageInterceptor);
121    }
122
123    protected void inject(ProducerBrokerExchange producerBrokerExchange, Message message) throws Exception {
124        ProducerBrokerExchange pbe = producerBrokerExchange;
125        if (message != null) {
126            message.setDestination(destination);
127            if (producerBrokerExchange != null && producerBrokerExchange.getRegionDestination() != null){
128                if (!producerBrokerExchange.getRegionDestination().getActiveMQDestination().equals(destination)){
129                     //The message broker will create a new ProducerBrokerExchange with the
130                     //correct region broker set
131                     pbe = null;
132                }
133            }
134
135            messageInterceptorRegistry.injectMessage(pbe, message);
136        }
137    }
138}