001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one or more
004 * contributor license agreements.  See the NOTICE file distributed with
005 * this work for additional information regarding copyright ownership.
006 * The ASF licenses this file to You under the Apache License, Version 2.0
007 * (the "License"); you may not use this file except in compliance with
008 * the License.  You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.activemq.camel.component;
019
020import java.util.Set;
021
022import javax.annotation.PostConstruct;
023
024import org.apache.activemq.advisory.DestinationEvent;
025import org.apache.activemq.advisory.DestinationListener;
026import org.apache.activemq.advisory.DestinationSource;
027import org.apache.activemq.command.ActiveMQDestination;
028import org.apache.activemq.command.ActiveMQQueue;
029import org.apache.activemq.command.ActiveMQTopic;
030import org.apache.camel.CamelContext;
031import org.apache.camel.CamelContextAware;
032import org.apache.camel.Endpoint;
033import org.apache.camel.component.jms.JmsEndpoint;
034import org.apache.camel.component.jms.JmsQueueEndpoint;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038/**
039 * A helper bean which populates a {@link CamelContext} with ActiveMQ Queue endpoints
040 *
041 * @org.apache.xbean.XBean
042 */
043public class CamelEndpointLoader implements CamelContextAware {
044    private static final transient Logger LOG = LoggerFactory.getLogger(CamelEndpointLoader.class);
045    private CamelContext camelContext;
046    private ActiveMQComponent component;
047    DestinationSource source;
048
049    public CamelEndpointLoader() {
050    }
051
052    public CamelEndpointLoader(CamelContext camelContext, DestinationSource source) {
053        this.camelContext = camelContext;
054        this.source = source;
055    }
056
057    /**
058     * JSR-250 callback wrapper; converts checked exceptions to runtime exceptions
059     *
060     * delegates to afterPropertiesSet, done to prevent backwards incompatible signature change
061     *
062     * fix: AMQ-4676
063     */
064    @PostConstruct
065    private void postConstruct() {
066        try {
067            afterPropertiesSet();
068        } catch (Exception ex) {
069            throw new RuntimeException(ex);
070        }
071    }
072
073    /**
074     *
075     * @throws Exception
076     * @org.apache.xbean.InitMethod
077     */
078    public void afterPropertiesSet() throws Exception {
079        if (source != null) {
080            source.setDestinationListener(new DestinationListener() {
081                @Override
082                public void onDestinationEvent(DestinationEvent event) {
083                    try {
084                        ActiveMQDestination destination = event.getDestination();
085                        if (destination instanceof ActiveMQQueue) {
086                            ActiveMQQueue queue = (ActiveMQQueue) destination;
087                            if (event.isAddOperation()) {
088                                addQueue(queue);
089                            } else {
090                                removeQueue(queue);
091                            }
092                        } else if (destination instanceof ActiveMQTopic) {
093                            ActiveMQTopic topic = (ActiveMQTopic) destination;
094                            if (event.isAddOperation()) {
095                                addTopic(topic);
096                            } else {
097                                removeTopic(topic);
098                            }
099                        }
100                    } catch (Exception e) {
101                        LOG.warn("Caught: " + e, e);
102                    }
103                }
104            });
105
106            Set<ActiveMQQueue> queues = source.getQueues();
107            for (ActiveMQQueue queue : queues) {
108                addQueue(queue);
109            }
110
111            Set<ActiveMQTopic> topics = source.getTopics();
112            for (ActiveMQTopic topic : topics) {
113                addTopic(topic);
114            }
115        }
116    }
117
118    // Properties
119    //-------------------------------------------------------------------------
120    @Override
121    public CamelContext getCamelContext() {
122        return camelContext;
123    }
124
125    @Override
126    public void setCamelContext(CamelContext camelContext) {
127        this.camelContext = camelContext;
128    }
129
130    public ActiveMQComponent getComponent() {
131        if (component == null) {
132            component = camelContext.getComponent("activemq", ActiveMQComponent.class);
133        }
134        return component;
135    }
136
137    public void setComponent(ActiveMQComponent component) {
138        this.component = component;
139    }
140
141    // Implementation methods
142    //-------------------------------------------------------------------------
143
144    protected void addQueue(ActiveMQQueue queue) throws Exception {
145        String queueUri = getQueueUri(queue);
146        ActiveMQComponent jmsComponent = getComponent();
147        Endpoint endpoint = new JmsQueueEndpoint(queueUri, jmsComponent, queue.getPhysicalName(), jmsComponent.getConfiguration());
148        camelContext.addEndpoint(queueUri, endpoint);
149    }
150
151    protected String getQueueUri(ActiveMQQueue queue) {
152        return "activemq:" + queue.getPhysicalName();
153    }
154
155    protected void removeQueue(ActiveMQQueue queue) throws Exception {
156        String queueUri = getQueueUri(queue);
157        // lur cache of endpoints so they will disappear in time
158        // this feature needs a new component api - list available endpoints
159        camelContext.removeEndpoints(queueUri);
160    }
161
162    protected void addTopic(ActiveMQTopic topic) throws Exception {
163        String topicUri = getTopicUri(topic);
164        ActiveMQComponent jmsComponent = getComponent();
165        Endpoint endpoint = new JmsEndpoint(topicUri, jmsComponent, topic.getPhysicalName(), true, jmsComponent.getConfiguration());
166        camelContext.addEndpoint(topicUri, endpoint);
167    }
168
169    protected String getTopicUri(ActiveMQTopic topic) {
170        return "activemq:topic:" + topic.getPhysicalName();
171    }
172
173    protected void removeTopic(ActiveMQTopic topic) throws Exception {
174        String topicUri = getTopicUri(topic);
175        // lur cache of endpoints so they will disappear in time
176        // this feature needs a new component api - list available endpoints
177        camelContext.removeEndpoints(topicUri);
178    }
179}