001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.apache.activemq.camel.camelplugin;
019
020import org.apache.activemq.broker.Broker;
021import org.apache.activemq.broker.BrokerContext;
022import org.apache.activemq.broker.BrokerFilter;
023import org.apache.activemq.broker.ConnectionContext;
024import org.apache.activemq.broker.ConsumerBrokerExchange;
025import org.apache.activemq.broker.ProducerBrokerExchange;
026import org.apache.activemq.broker.region.Destination;
027import org.apache.activemq.broker.region.MessageReference;
028import org.apache.activemq.broker.region.Subscription;
029import org.apache.activemq.command.ConsumerControl;
030import org.apache.activemq.command.Message;
031import org.apache.activemq.command.MessageAck;
032import org.apache.activemq.command.MessageDispatch;
033import org.apache.activemq.command.MessagePull;
034import org.apache.activemq.command.Response;
035import org.apache.activemq.command.TransactionId;
036import org.apache.activemq.spring.Utils;
037import org.apache.activemq.usage.Usage;
038import org.apache.camel.impl.DefaultCamelContext;
039import org.apache.camel.model.RouteDefinition;
040import org.apache.camel.model.RoutesDefinition;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043import org.springframework.core.io.Resource;
044
045import java.io.File;
046import java.io.InputStream;
047import java.util.List;
048import java.util.concurrent.CountDownLatch;
049
050/**
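 * A broker filter that embeds a Camel context in the broker and loads its
 * routes from an XML routes file. When no routes file is configured, a file
 * named "routes.xml" next to the broker's configuration resource is used.
 * The file's last-modified time is checked periodically and the routes are
 * reloaded when it changes; intercepted broker operations are paused while
 * a (re)load is in progress.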
054 *
055 */
056public class CamelRoutesBroker extends BrokerFilter {
057    private static Logger LOG = LoggerFactory.getLogger(CamelRoutesBroker.class);
058    private String routesFile = "";
059    private int checkPeriod = 1000;
060    private Resource theRoutes;
061    private DefaultCamelContext camelContext;
062    private long lastRoutesModified = -1;
063    private CountDownLatch countDownLatch;
064
065    /**
066     * Overide methods to pause the broker whilst camel routes are loaded
067     */
068    @Override
069    public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
070        blockWhileLoadingCamelRoutes();
071        super.send(producerExchange, message);
072    }
073
074    @Override
075    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
076        blockWhileLoadingCamelRoutes();
077        super.acknowledge(consumerExchange, ack);
078    }
079
080    @Override
081    public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
082        blockWhileLoadingCamelRoutes();
083        return super.messagePull(context, pull);
084    }
085
086    @Override
087    public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) {
088        blockWhileLoadingCamelRoutes();
089        super.processConsumerControl(consumerExchange, control);
090    }
091
092    @Override
093    public void reapplyInterceptor() {
094        blockWhileLoadingCamelRoutes();
095        super.reapplyInterceptor();
096    }
097
098    @Override
099    public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
100        blockWhileLoadingCamelRoutes();
101        super.beginTransaction(context, xid);
102    }
103
104    @Override
105    public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
106        blockWhileLoadingCamelRoutes();
107        return super.prepareTransaction(context, xid);
108    }
109
110    @Override
111    public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
112        blockWhileLoadingCamelRoutes();
113        super.rollbackTransaction(context, xid);
114    }
115
116    @Override
117    public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
118        blockWhileLoadingCamelRoutes();
119        super.commitTransaction(context, xid, onePhase);
120    }
121
122    @Override
123    public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
124        blockWhileLoadingCamelRoutes();
125        super.forgetTransaction(context, transactionId);
126    }
127
128    @Override
129    public void preProcessDispatch(MessageDispatch messageDispatch) {
130        blockWhileLoadingCamelRoutes();
131        super.preProcessDispatch(messageDispatch);
132    }
133
134    @Override
135    public void postProcessDispatch(MessageDispatch messageDispatch) {
136        blockWhileLoadingCamelRoutes();
137        super.postProcessDispatch(messageDispatch);
138    }
139
140    @Override
141    public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription, Throwable poisonCause) {
142        blockWhileLoadingCamelRoutes();
143        return super.sendToDeadLetterQueue(context, messageReference, subscription, poisonCause);
144    }
145
146    @Override
147    public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
148        blockWhileLoadingCamelRoutes();
149        super.messageConsumed(context, messageReference);
150    }
151
152    @Override
153    public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
154        blockWhileLoadingCamelRoutes();
155        super.messageDelivered(context, messageReference);
156    }
157
158    @Override
159    public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
160        blockWhileLoadingCamelRoutes();
161        super.messageDiscarded(context, sub, messageReference);
162    }
163
164    @Override
165    public void isFull(ConnectionContext context, Destination destination, Usage usage) {
166        blockWhileLoadingCamelRoutes();
167        super.isFull(context, destination, usage);
168    }
169
170    @Override
171    public void nowMasterBroker() {
172        blockWhileLoadingCamelRoutes();
173        super.nowMasterBroker();
174    }
175
176    /*
177     * Properties
178     */
179
180    public String getRoutesFile() {
181        return routesFile;
182    }
183
184    public void setRoutesFile(String routesFile) {
185        this.routesFile = routesFile;
186    }
187
188    public int getCheckPeriod() {
189        return checkPeriod;
190    }
191
192    public void setCheckPeriod(int checkPeriod) {
193        this.checkPeriod = checkPeriod;
194    }
195
196    public CamelRoutesBroker(Broker next) {
197        super(next);
198    }
199
200    @Override
201    public void start() throws Exception {
202        super.start();
203        LOG.info("Starting CamelRoutesBroker");
204
205        camelContext = new DefaultCamelContext();
206        camelContext.setName("EmbeddedCamel-" + getBrokerName());
207        camelContext.start();
208
209        getBrokerService().getScheduler().executePeriodically(new Runnable() {
210            @Override
211            public void run() {
212                try {
213                    loadCamelRoutes();
214                } catch (Throwable e) {
215                    LOG.error("Failed to load Camel Routes", e);
216                }
217
218            }
219        }, getCheckPeriod());
220    }
221
222
223
224    @Override
225    public void stop() throws Exception {
226        CountDownLatch latch = this.countDownLatch;
227        if (latch != null){
228            latch.countDown();
229        }
230        if (camelContext != null){
231            camelContext.stop();
232        }
233        super.stop();
234    }
235
236    private void loadCamelRoutes() throws Exception{
237        if (theRoutes == null) {
238            String fileToUse = getRoutesFile();
239            if (fileToUse == null || fileToUse.trim().isEmpty()) {
240                BrokerContext brokerContext = getBrokerService().getBrokerContext();
241                if (brokerContext != null) {
242                    String uri = brokerContext.getConfigurationUrl();
243                    Resource resource = Utils.resourceFromString(uri);
244                    if (resource.exists()) {
245                        fileToUse = resource.getFile().getParent();
246                        fileToUse += File.separator;
247                        fileToUse += "routes.xml";
248                    }
249                }
250            }
251            if (fileToUse != null && !fileToUse.isEmpty()){
252                theRoutes = Utils.resourceFromString(fileToUse);
253                setRoutesFile(theRoutes.getFile().getAbsolutePath());
254            }
255        }
256        if (!isStopped() && camelContext != null && theRoutes != null && theRoutes.exists()){
257            long lastModified = theRoutes.lastModified();
258            if (lastModified != lastRoutesModified){
259                CountDownLatch latch = new CountDownLatch(1);
260                this.countDownLatch = latch;
261                lastRoutesModified = lastModified;
262
263                List<RouteDefinition> currentRoutes = camelContext.getRouteDefinitions();
264                for (RouteDefinition rd:currentRoutes){
265                    camelContext.stopRoute(rd);
266                    camelContext.removeRouteDefinition(rd);
267                }
268                InputStream is = theRoutes.getInputStream();
269                RoutesDefinition routesDefinition = camelContext.loadRoutesDefinition(is);
270
271                for (RouteDefinition rd: routesDefinition.getRoutes()){
272                    camelContext.startRoute(rd);
273                }
274                is.close();
275                latch.countDown();
276                this.countDownLatch=null;
277            }
278
279
280        }
281    }
282
283    private void blockWhileLoadingCamelRoutes(){
284        CountDownLatch latch = this.countDownLatch;
285        if (latch != null){
286            try {
287                latch.await();
288            } catch (InterruptedException e) {
289                Thread.currentThread().interrupt();
290            }
291        }
292    }
293
294}