001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one or more 004 * contributor license agreements. See the NOTICE file distributed with 005 * this work for additional information regarding copyright ownership. 006 * The ASF licenses this file to You under the Apache License, Version 2.0 007 * (the "License"); you may not use this file except in compliance with 008 * the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.activemq.camel.component; 019 020import java.util.Set; 021 022import javax.annotation.PostConstruct; 023 024import org.apache.activemq.advisory.DestinationEvent; 025import org.apache.activemq.advisory.DestinationListener; 026import org.apache.activemq.advisory.DestinationSource; 027import org.apache.activemq.command.ActiveMQDestination; 028import org.apache.activemq.command.ActiveMQQueue; 029import org.apache.activemq.command.ActiveMQTopic; 030import org.apache.camel.CamelContext; 031import org.apache.camel.CamelContextAware; 032import org.apache.camel.Endpoint; 033import org.apache.camel.component.jms.JmsEndpoint; 034import org.apache.camel.component.jms.JmsQueueEndpoint; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038/** 039 * A helper bean which populates a {@link CamelContext} with ActiveMQ Queue endpoints 040 * 041 * @org.apache.xbean.XBean 042 */ 043public class CamelEndpointLoader implements CamelContextAware { 044 private static final transient Logger LOG = LoggerFactory.getLogger(CamelEndpointLoader.class); 045 private CamelContext camelContext; 046 private ActiveMQComponent component; 047 DestinationSource source; 048 049 public CamelEndpointLoader() { 050 } 051 052 public CamelEndpointLoader(CamelContext camelContext, DestinationSource source) { 053 this.camelContext = camelContext; 054 this.source = source; 055 } 056 057 /** 058 * JSR-250 callback wrapper; converts checked exceptions to runtime exceptions 059 * 060 * delegates to afterPropertiesSet, done to prevent backwards incompatible signature change 061 * 062 * fix: AMQ-4676 063 */ 064 @PostConstruct 065 private void postConstruct() { 066 try { 067 afterPropertiesSet(); 068 } catch (Exception ex) { 069 throw new RuntimeException(ex); 070 } 071 } 072 073 /** 074 * 075 * @throws Exception 076 * @org.apache.xbean.InitMethod 077 */ 078 public void afterPropertiesSet() throws Exception { 079 if (source != null) { 080 source.setDestinationListener(new DestinationListener() { 081 @Override 082 public void onDestinationEvent(DestinationEvent event) { 083 try { 084 ActiveMQDestination destination = event.getDestination(); 085 if (destination instanceof ActiveMQQueue) { 086 ActiveMQQueue queue = (ActiveMQQueue) destination; 087 if (event.isAddOperation()) { 088 addQueue(queue); 089 } else { 090 removeQueue(queue); 091 } 092 } else if (destination instanceof ActiveMQTopic) { 093 ActiveMQTopic topic = (ActiveMQTopic) destination; 094 if (event.isAddOperation()) { 095 addTopic(topic); 096 } else { 097 removeTopic(topic); 098 } 099 } 100 } catch (Exception e) { 101 LOG.warn("Caught: " + e, e); 102 } 103 } 104 }); 105 106 Set<ActiveMQQueue> queues = source.getQueues(); 107 for (ActiveMQQueue queue : queues) { 108 addQueue(queue); 109 } 110 111 Set<ActiveMQTopic> topics = source.getTopics(); 112 for (ActiveMQTopic topic : topics) { 113 addTopic(topic); 114 } 115 } 116 } 117 118 // Properties 119 //------------------------------------------------------------------------- 120 @Override 121 public CamelContext getCamelContext() { 122 return camelContext; 123 } 124 125 @Override 126 public void setCamelContext(CamelContext camelContext) { 127 this.camelContext = camelContext; 128 } 129 130 public ActiveMQComponent getComponent() { 131 if (component == null) { 132 component = camelContext.getComponent("activemq", ActiveMQComponent.class); 133 } 134 return component; 135 } 136 137 public void setComponent(ActiveMQComponent component) { 138 this.component = component; 139 } 140 141 // Implementation methods 142 //------------------------------------------------------------------------- 143 144 protected void addQueue(ActiveMQQueue queue) throws Exception { 145 String queueUri = getQueueUri(queue); 146 ActiveMQComponent jmsComponent = getComponent(); 147 Endpoint endpoint = new JmsQueueEndpoint(queueUri, jmsComponent, queue.getPhysicalName(), jmsComponent.getConfiguration()); 148 camelContext.addEndpoint(queueUri, endpoint); 149 } 150 151 protected String getQueueUri(ActiveMQQueue queue) { 152 return "activemq:" + queue.getPhysicalName(); 153 } 154 155 protected void removeQueue(ActiveMQQueue queue) throws Exception { 156 String queueUri = getQueueUri(queue); 157 // lur cache of endpoints so they will disappear in time 158 // this feature needs a new component api - list available endpoints 159 camelContext.removeEndpoints(queueUri); 160 } 161 162 protected void addTopic(ActiveMQTopic topic) throws Exception { 163 String topicUri = getTopicUri(topic); 164 ActiveMQComponent jmsComponent = getComponent(); 165 Endpoint endpoint = new JmsEndpoint(topicUri, jmsComponent, topic.getPhysicalName(), true, jmsComponent.getConfiguration()); 166 camelContext.addEndpoint(topicUri, endpoint); 167 } 168 169 protected String getTopicUri(ActiveMQTopic topic) { 170 return "activemq:topic:" + topic.getPhysicalName(); 171 } 172 173 protected void removeTopic(ActiveMQTopic topic) throws Exception { 174 String topicUri = getTopicUri(topic); 175 // lur cache of endpoints so they will disappear in time 176 // this feature needs a new component api - list available endpoints 177 camelContext.removeEndpoints(topicUri); 178 } 179}