001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker;
018
019import java.net.URI;
020import java.util.Map;
021import java.util.Set;
022import java.util.concurrent.ThreadPoolExecutor;
023import java.util.concurrent.atomic.AtomicReference;
024
025import org.apache.activemq.broker.region.Destination;
026import org.apache.activemq.broker.region.MessageReference;
027import org.apache.activemq.broker.region.Subscription;
028import org.apache.activemq.command.ActiveMQDestination;
029import org.apache.activemq.command.BrokerId;
030import org.apache.activemq.command.BrokerInfo;
031import org.apache.activemq.command.ConnectionInfo;
032import org.apache.activemq.command.ConsumerControl;
033import org.apache.activemq.command.ConsumerInfo;
034import org.apache.activemq.command.DestinationInfo;
035import org.apache.activemq.command.Message;
036import org.apache.activemq.command.MessageAck;
037import org.apache.activemq.command.MessageDispatch;
038import org.apache.activemq.command.MessageDispatchNotification;
039import org.apache.activemq.command.MessagePull;
040import org.apache.activemq.command.ProducerInfo;
041import org.apache.activemq.command.RemoveSubscriptionInfo;
042import org.apache.activemq.command.Response;
043import org.apache.activemq.command.SessionInfo;
044import org.apache.activemq.command.TransactionId;
045import org.apache.activemq.store.PListStore;
046import org.apache.activemq.thread.Scheduler;
047import org.apache.activemq.usage.Usage;
048
049/**
050 * Like a BrokerFilter but it allows you to switch the getNext().broker. This
051 * has more overhead than a BrokerFilter since access to the getNext().broker
052 * has to synchronized since it is mutable
053 *
054 *
055 */
056public class MutableBrokerFilter implements Broker {
057
058    protected AtomicReference<Broker> next = new AtomicReference<Broker>();
059
060    public MutableBrokerFilter(Broker next) {
061        this.next.set(next);
062    }
063
064    @Override
065    public Broker getAdaptor(Class type) {
066        if (type.isInstance(this)) {
067            return this;
068        }
069        return next.get().getAdaptor(type);
070    }
071
072    public Broker getNext() {
073        return next.get();
074    }
075
076    public void setNext(Broker next) {
077        this.next.set(next);
078    }
079
080    @Override
081    public Map<ActiveMQDestination, Destination> getDestinationMap() {
082        return getNext().getDestinationMap();
083    }
084
085    @Override
086    public Map<ActiveMQDestination, Destination> getDestinationMap(ActiveMQDestination destination) {
087        return getNext().getDestinationMap(destination);
088    }
089
090    @Override
091    public Set getDestinations(ActiveMQDestination destination) {
092        return getNext().getDestinations(destination);
093    }
094
095    @Override
096    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
097        getNext().acknowledge(consumerExchange, ack);
098    }
099
100    @Override
101    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
102        getNext().addConnection(context, info);
103    }
104
105    @Override
106    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
107        return getNext().addConsumer(context, info);
108    }
109
110    @Override
111    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
112        getNext().addProducer(context, info);
113    }
114
115    @Override
116    public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
117        getNext().commitTransaction(context, xid, onePhase);
118    }
119
120    @Override
121    public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
122        getNext().removeSubscription(context, info);
123    }
124
125    @Override
126    public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
127        return getNext().getPreparedTransactions(context);
128    }
129
130    @Override
131    public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
132        return getNext().prepareTransaction(context, xid);
133    }
134
135    @Override
136    public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
137        getNext().removeConnection(context, info, error);
138    }
139
140    @Override
141    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
142        getNext().removeConsumer(context, info);
143    }
144
145    @Override
146    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
147        getNext().removeProducer(context, info);
148    }
149
150    @Override
151    public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
152        getNext().rollbackTransaction(context, xid);
153    }
154
155    @Override
156    public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
157        getNext().send(producerExchange, messageSend);
158    }
159
160    @Override
161    public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
162        getNext().beginTransaction(context, xid);
163    }
164
165    @Override
166    public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
167        getNext().forgetTransaction(context, transactionId);
168    }
169
170    @Override
171    public Connection[] getClients() throws Exception {
172        return getNext().getClients();
173    }
174
175    @Override
176    public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean createIfTemporary) throws Exception {
177        return getNext().addDestination(context, destination,createIfTemporary);
178    }
179
180    @Override
181    public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
182        getNext().removeDestination(context, destination, timeout);
183    }
184
185    @Override
186    public ActiveMQDestination[] getDestinations() throws Exception {
187        return getNext().getDestinations();
188    }
189
190    @Override
191    public void start() throws Exception {
192        getNext().start();
193    }
194
195    @Override
196    public void stop() throws Exception {
197        getNext().stop();
198    }
199
200    @Override
201    public void addSession(ConnectionContext context, SessionInfo info) throws Exception {
202        getNext().addSession(context, info);
203    }
204
205    @Override
206    public void removeSession(ConnectionContext context, SessionInfo info) throws Exception {
207        getNext().removeSession(context, info);
208    }
209
210    @Override
211    public BrokerId getBrokerId() {
212        return getNext().getBrokerId();
213    }
214
215    @Override
216    public String getBrokerName() {
217        return getNext().getBrokerName();
218    }
219
220    @Override
221    public void gc() {
222        getNext().gc();
223    }
224
225    @Override
226    public void addBroker(Connection connection, BrokerInfo info) {
227        getNext().addBroker(connection, info);
228    }
229
230    @Override
231    public void removeBroker(Connection connection, BrokerInfo info) {
232        getNext().removeBroker(connection, info);
233    }
234
235    @Override
236    public BrokerInfo[] getPeerBrokerInfos() {
237        return getNext().getPeerBrokerInfos();
238    }
239
240    @Override
241    public void preProcessDispatch(MessageDispatch messageDispatch) {
242        getNext().preProcessDispatch(messageDispatch);
243    }
244
245    @Override
246    public void postProcessDispatch(MessageDispatch messageDispatch) {
247        getNext().postProcessDispatch(messageDispatch);
248    }
249
250    @Override
251    public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
252        getNext().processDispatchNotification(messageDispatchNotification);
253    }
254
255    @Override
256    public boolean isStopped() {
257        return getNext().isStopped();
258    }
259
260    @Override
261    public Set<ActiveMQDestination> getDurableDestinations() {
262        return getNext().getDurableDestinations();
263    }
264
265    @Override
266    public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
267        getNext().addDestinationInfo(context, info);
268
269    }
270
271    @Override
272    public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
273        getNext().removeDestinationInfo(context, info);
274
275    }
276
277    @Override
278    public boolean isFaultTolerantConfiguration() {
279        return getNext().isFaultTolerantConfiguration();
280    }
281
282    @Override
283    public ConnectionContext getAdminConnectionContext() {
284        return getNext().getAdminConnectionContext();
285    }
286
287    @Override
288    public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
289        getNext().setAdminConnectionContext(adminConnectionContext);
290    }
291
292    @Override
293    public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
294        return getNext().messagePull(context, pull);
295    }
296
297    @Override
298    public PListStore getTempDataStore() {
299        return getNext().getTempDataStore();
300    }
301
302    @Override
303    public URI getVmConnectorURI() {
304        return getNext().getVmConnectorURI();
305    }
306
307    @Override
308    public void brokerServiceStarted() {
309        getNext().brokerServiceStarted();
310    }
311
312    @Override
313    public BrokerService getBrokerService() {
314        return getNext().getBrokerService();
315    }
316
317    @Override
318    public boolean isExpired(MessageReference messageReference) {
319        return getNext().isExpired(messageReference);
320    }
321
322    @Override
323    public void messageExpired(ConnectionContext context, MessageReference message, Subscription subscription) {
324        getNext().messageExpired(context, message, subscription);
325    }
326
327    @Override
328    public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
329                                         Subscription subscription, Throwable poisonCause) {
330        return getNext().sendToDeadLetterQueue(context, messageReference, subscription, poisonCause);
331    }
332
333    @Override
334    public Broker getRoot() {
335        return getNext().getRoot();
336    }
337
338    @Override
339    public long getBrokerSequenceId() {
340        return getNext().getBrokerSequenceId();
341    }
342
343    @Override
344    public void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination destination) {
345        getNext().fastProducer(context, producerInfo, destination);
346    }
347
348    @Override
349    public void isFull(ConnectionContext context,Destination destination, Usage usage) {
350        getNext().isFull(context,destination, usage);
351    }
352
353    @Override
354    public void messageConsumed(ConnectionContext context,MessageReference messageReference) {
355        getNext().messageConsumed(context, messageReference);
356    }
357
358    @Override
359    public void messageDelivered(ConnectionContext context,MessageReference messageReference) {
360        getNext().messageDelivered(context, messageReference);
361    }
362
363    @Override
364    public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
365        getNext().messageDiscarded(context, sub, messageReference);
366    }
367
368    @Override
369    public void slowConsumer(ConnectionContext context, Destination dest, Subscription subs) {
370        getNext().slowConsumer(context, dest,subs);
371    }
372
373    @Override
374    public void nowMasterBroker() {
375       getNext().nowMasterBroker();
376    }
377
378    @Override
379    public void processConsumerControl(ConsumerBrokerExchange consumerExchange,
380            ConsumerControl control) {
381        getNext().processConsumerControl(consumerExchange, control);
382    }
383
384    @Override
385    public void reapplyInterceptor() {
386        getNext().reapplyInterceptor();
387    }
388
389    @Override
390    public Scheduler getScheduler() {
391       return getNext().getScheduler();
392    }
393
394    @Override
395    public ThreadPoolExecutor getExecutor() {
396       return getNext().getExecutor();
397    }
398
399    @Override
400    public void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex, String remoteIp) {
401        getNext().networkBridgeStarted(brokerInfo, createdByDuplex, remoteIp);
402    }
403
404    @Override
405    public void networkBridgeStopped(BrokerInfo brokerInfo) {
406        getNext().networkBridgeStopped(brokerInfo);
407    }
408}