001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.camel.component.broker; 018 019import java.util.List; 020import java.util.concurrent.CopyOnWriteArrayList; 021 022import org.apache.activemq.broker.ProducerBrokerExchange; 023import org.apache.activemq.broker.inteceptor.MessageInterceptor; 024import org.apache.activemq.broker.inteceptor.MessageInterceptorRegistry; 025import org.apache.activemq.command.ActiveMQDestination; 026import org.apache.activemq.command.Message; 027import org.apache.camel.Consumer; 028import org.apache.camel.MultipleConsumersSupport; 029import org.apache.camel.Processor; 030import org.apache.camel.Producer; 031import org.apache.camel.Service; 032import org.apache.camel.api.management.ManagedResource; 033import org.apache.camel.impl.DefaultEndpoint; 034import org.apache.camel.spi.Metadata; 035import org.apache.camel.spi.UriEndpoint; 036import org.apache.camel.spi.UriParam; 037import org.apache.camel.spi.UriPath; 038import org.apache.camel.util.UnsafeUriCharactersEncoder; 039 040@ManagedResource(description = "Managed Camel Broker Endpoint") 041@UriEndpoint(scheme = "broker", syntax = "broker:destination", consumerClass = BrokerConsumer.class, title = "Broker", label = "messaging") 042public class BrokerEndpoint extends DefaultEndpoint implements MultipleConsumersSupport, Service { 043 044 static final String PRODUCER_BROKER_EXCHANGE = "producerBrokerExchange"; 045 046 private MessageInterceptorRegistry messageInterceptorRegistry; 047 private List<MessageInterceptor> messageInterceptorList = new CopyOnWriteArrayList<MessageInterceptor>(); 048 049 @UriPath(name = "destination") @Metadata(required = "true") 050 private String destinationName; 051 private final ActiveMQDestination destination; 052 @UriParam 053 private final BrokerConfiguration configuration; 054 055 public BrokerEndpoint(String uri, BrokerComponent component, String destinationName, ActiveMQDestination destination, BrokerConfiguration configuration) { 056 super(UnsafeUriCharactersEncoder.encode(uri), component); 057 this.destinationName = destinationName; 058 this.destination = destination; 059 this.configuration = configuration; 060 } 061 062 @Override 063 public Producer createProducer() throws Exception { 064 BrokerProducer producer = new BrokerProducer(this); 065 return producer; 066 } 067 068 @Override 069 public Consumer createConsumer(Processor processor) throws Exception { 070 BrokerConsumer consumer = new BrokerConsumer(this, processor); 071 configureConsumer(consumer); 072 return consumer; 073 } 074 075 @Override 076 public boolean isSingleton() { 077 return false; 078 } 079 080 @Override 081 public boolean isMultipleConsumersSupported() { 082 return true; 083 } 084 085 public ActiveMQDestination getDestination() { 086 return destination; 087 } 088 089 /** 090 * The name of the JMS destination 091 */ 092 public String getDestinationName() { 093 return destinationName; 094 } 095 096 @Override 097 protected void doStart() throws Exception { 098 super.doStart(); 099 messageInterceptorRegistry = MessageInterceptorRegistry.getInstance().get(configuration.getBrokerName()); 100 for (MessageInterceptor messageInterceptor : messageInterceptorList) { 101 addMessageInterceptor(messageInterceptor); 102 } 103 messageInterceptorList.clear(); 104 } 105 106 @Override 107 protected void doStop() throws Exception { 108 super.doStop(); 109 } 110 111 protected void addMessageInterceptor(MessageInterceptor messageInterceptor) { 112 if (isStarted()) { 113 messageInterceptorRegistry.addMessageInterceptor(destination, messageInterceptor); 114 } else { 115 messageInterceptorList.add(messageInterceptor); 116 } 117 } 118 119 protected void removeMessageInterceptor(MessageInterceptor messageInterceptor) { 120 messageInterceptorRegistry.removeMessageInterceptor(destination, messageInterceptor); 121 } 122 123 protected void inject(ProducerBrokerExchange producerBrokerExchange, Message message) throws Exception { 124 ProducerBrokerExchange pbe = producerBrokerExchange; 125 if (message != null) { 126 message.setDestination(destination); 127 if (producerBrokerExchange != null && producerBrokerExchange.getRegionDestination() != null){ 128 if (!producerBrokerExchange.getRegionDestination().getActiveMQDestination().equals(destination)){ 129 //The message broker will create a new ProducerBrokerExchange with the 130 //correct region broker set 131 pbe = null; 132 } 133 } 134 135 messageInterceptorRegistry.injectMessage(pbe, message); 136 } 137 } 138}