001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.camel.component.broker;
018
019import org.apache.activemq.broker.ProducerBrokerExchange;
020import org.apache.activemq.broker.inteceptor.MessageInterceptor;
021import org.apache.activemq.command.Message;
022import org.apache.camel.Endpoint;
023import org.apache.camel.Exchange;
024import org.apache.camel.ExchangePattern;
025import org.apache.camel.Processor;
026import org.apache.camel.component.jms.JmsBinding;
027import org.apache.camel.impl.DefaultConsumer;
028
029public class BrokerConsumer extends DefaultConsumer implements MessageInterceptor {
030    private final JmsBinding jmsBinding = new JmsBinding();
031
032    public BrokerConsumer(Endpoint endpoint, Processor processor) {
033        super(endpoint, processor);
034    }
035
036    @Override
037    protected void doStart() throws Exception {
038        super.doStart();
039        ((BrokerEndpoint) getEndpoint()).addMessageInterceptor(this);
040    }
041
042    @Override
043    protected void doStop() throws Exception {
044        ((BrokerEndpoint) getEndpoint()).removeMessageInterceptor(this);
045        super.doStop();
046    }
047
048    @Override
049    public void intercept(ProducerBrokerExchange producerExchange, Message message) {
050        Exchange exchange = getEndpoint().createExchange(ExchangePattern.InOnly);
051
052        exchange.setIn(new BrokerJmsMessage((javax.jms.Message) message, jmsBinding));
053        exchange.setProperty(Exchange.BINDING, jmsBinding);
054        exchange.setProperty(BrokerEndpoint.PRODUCER_BROKER_EXCHANGE, producerExchange);
055        try {
056            getProcessor().process(exchange);
057        } catch (Exception e) {
058            exchange.setException(e);
059        }
060
061        if (exchange.getException() != null) {
062            getExceptionHandler().handleException("Error processing intercepted message: " + message, exchange, exchange.getException());
063        }
064    }
065
066}