001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker; 018 019import java.net.URI; 020import java.util.Map; 021import java.util.Set; 022import java.util.concurrent.ThreadPoolExecutor; 023 024import org.apache.activemq.broker.region.Destination; 025import org.apache.activemq.broker.region.MessageReference; 026import org.apache.activemq.broker.region.Subscription; 027import org.apache.activemq.command.ActiveMQDestination; 028import org.apache.activemq.command.BrokerId; 029import org.apache.activemq.command.BrokerInfo; 030import org.apache.activemq.command.ConnectionInfo; 031import org.apache.activemq.command.ConsumerControl; 032import org.apache.activemq.command.ConsumerInfo; 033import org.apache.activemq.command.DestinationInfo; 034import org.apache.activemq.command.Message; 035import org.apache.activemq.command.MessageAck; 036import org.apache.activemq.command.MessageDispatch; 037import org.apache.activemq.command.MessageDispatchNotification; 038import org.apache.activemq.command.MessagePull; 039import org.apache.activemq.command.ProducerInfo; 040import org.apache.activemq.command.RemoveSubscriptionInfo; 041import org.apache.activemq.command.Response; 042import org.apache.activemq.command.SessionInfo; 043import org.apache.activemq.command.TransactionId; 044import org.apache.activemq.store.PListStore; 045import org.apache.activemq.thread.Scheduler; 046import org.apache.activemq.usage.Usage; 047 048/** 049 * Allows you to intercept broker operation so that features such as security 050 * can be implemented as a pluggable filter. 051 * 052 * 053 */ 054public class BrokerFilter implements Broker { 055 056 protected final Broker next; 057 058 public BrokerFilter(Broker next) { 059 this.next = next; 060 } 061 062 @Override 063 public Broker getAdaptor(Class type) { 064 if (type.isInstance(this)) { 065 return this; 066 } 067 return next.getAdaptor(type); 068 } 069 070 @Override 071 public Map<ActiveMQDestination, Destination> getDestinationMap() { 072 return next.getDestinationMap(); 073 } 074 075 @Override 076 public Map<ActiveMQDestination, Destination> getDestinationMap(ActiveMQDestination destination) { 077 return next.getDestinationMap(destination); 078 } 079 080 @Override 081 public Set <Destination>getDestinations(ActiveMQDestination destination) { 082 return next.getDestinations(destination); 083 } 084 085 @Override 086 public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { 087 next.acknowledge(consumerExchange, ack); 088 } 089 090 @Override 091 public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception { 092 return next.messagePull(context, pull); 093 } 094 095 @Override 096 public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { 097 next.addConnection(context, info); 098 } 099 100 @Override 101 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 102 return next.addConsumer(context, info); 103 } 104 105 @Override 106 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { 107 next.addProducer(context, info); 108 } 109 110 @Override 111 public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception { 112 next.commitTransaction(context, xid, onePhase); 113 } 114 115 @Override 116 public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { 117 next.removeSubscription(context, info); 118 } 119 120 @Override 121 public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception { 122 return next.getPreparedTransactions(context); 123 } 124 125 @Override 126 public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception { 127 return next.prepareTransaction(context, xid); 128 } 129 130 @Override 131 public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { 132 next.removeConnection(context, info, error); 133 } 134 135 @Override 136 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 137 next.removeConsumer(context, info); 138 } 139 140 @Override 141 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { 142 next.removeProducer(context, info); 143 } 144 145 @Override 146 public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception { 147 next.rollbackTransaction(context, xid); 148 } 149 150 @Override 151 public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { 152 next.send(producerExchange, messageSend); 153 } 154 155 @Override 156 public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception { 157 next.beginTransaction(context, xid); 158 } 159 160 @Override 161 public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception { 162 next.forgetTransaction(context, transactionId); 163 } 164 165 @Override 166 public Connection[] getClients() throws Exception { 167 return next.getClients(); 168 } 169 170 @Override 171 public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean createIfTemporary) throws Exception { 172 return next.addDestination(context, destination,createIfTemporary); 173 } 174 175 @Override 176 public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { 177 next.removeDestination(context, destination, timeout); 178 } 179 180 @Override 181 public ActiveMQDestination[] getDestinations() throws Exception { 182 return next.getDestinations(); 183 } 184 185 @Override 186 public void start() throws Exception { 187 next.start(); 188 } 189 190 @Override 191 public void stop() throws Exception { 192 next.stop(); 193 } 194 195 @Override 196 public void addSession(ConnectionContext context, SessionInfo info) throws Exception { 197 next.addSession(context, info); 198 } 199 200 @Override 201 public void removeSession(ConnectionContext context, SessionInfo info) throws Exception { 202 next.removeSession(context, info); 203 } 204 205 @Override 206 public BrokerId getBrokerId() { 207 return next.getBrokerId(); 208 } 209 210 @Override 211 public String getBrokerName() { 212 return next.getBrokerName(); 213 } 214 215 @Override 216 public void gc() { 217 next.gc(); 218 } 219 220 @Override 221 public void addBroker(Connection connection, BrokerInfo info) { 222 next.addBroker(connection, info); 223 } 224 225 @Override 226 public void removeBroker(Connection connection, BrokerInfo info) { 227 next.removeBroker(connection, info); 228 } 229 230 @Override 231 public BrokerInfo[] getPeerBrokerInfos() { 232 return next.getPeerBrokerInfos(); 233 } 234 235 @Override 236 public void preProcessDispatch(MessageDispatch messageDispatch) { 237 next.preProcessDispatch(messageDispatch); 238 } 239 240 @Override 241 public void postProcessDispatch(MessageDispatch messageDispatch) { 242 next.postProcessDispatch(messageDispatch); 243 } 244 245 @Override 246 public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { 247 next.processDispatchNotification(messageDispatchNotification); 248 } 249 250 @Override 251 public boolean isStopped() { 252 return next.isStopped(); 253 } 254 255 @Override 256 public Set<ActiveMQDestination> getDurableDestinations() { 257 return next.getDurableDestinations(); 258 } 259 260 @Override 261 public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { 262 next.addDestinationInfo(context, info); 263 } 264 265 @Override 266 public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { 267 next.removeDestinationInfo(context, info); 268 } 269 270 @Override 271 public boolean isFaultTolerantConfiguration() { 272 return next.isFaultTolerantConfiguration(); 273 } 274 275 @Override 276 public ConnectionContext getAdminConnectionContext() { 277 return next.getAdminConnectionContext(); 278 } 279 280 @Override 281 public void setAdminConnectionContext(ConnectionContext adminConnectionContext) { 282 next.setAdminConnectionContext(adminConnectionContext); 283 } 284 285 @Override 286 public PListStore getTempDataStore() { 287 return next.getTempDataStore(); 288 } 289 290 @Override 291 public URI getVmConnectorURI() { 292 return next.getVmConnectorURI(); 293 } 294 295 @Override 296 public void brokerServiceStarted() { 297 next.brokerServiceStarted(); 298 } 299 300 @Override 301 public BrokerService getBrokerService() { 302 return next.getBrokerService(); 303 } 304 305 @Override 306 public boolean isExpired(MessageReference messageReference) { 307 return next.isExpired(messageReference); 308 } 309 310 @Override 311 public void messageExpired(ConnectionContext context, MessageReference message, Subscription subscription) { 312 next.messageExpired(context, message, subscription); 313 } 314 315 @Override 316 public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, 317 Subscription subscription, Throwable poisonCause) { 318 return next.sendToDeadLetterQueue(context, messageReference, subscription, poisonCause); 319 } 320 321 @Override 322 public Broker getRoot() { 323 return next.getRoot(); 324 } 325 326 @Override 327 public long getBrokerSequenceId() { 328 return next.getBrokerSequenceId(); 329 } 330 331 332 @Override 333 public void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination destination) { 334 next.fastProducer(context, producerInfo, destination); 335 } 336 337 @Override 338 public void isFull(ConnectionContext context,Destination destination, Usage usage) { 339 next.isFull(context,destination, usage); 340 } 341 342 @Override 343 public void messageConsumed(ConnectionContext context,MessageReference messageReference) { 344 next.messageConsumed(context, messageReference); 345 } 346 347 @Override 348 public void messageDelivered(ConnectionContext context,MessageReference messageReference) { 349 next.messageDelivered(context, messageReference); 350 } 351 352 @Override 353 public void messageDiscarded(ConnectionContext context,Subscription sub, MessageReference messageReference) { 354 next.messageDiscarded(context, sub, messageReference); 355 } 356 357 @Override 358 public void slowConsumer(ConnectionContext context, Destination destination,Subscription subs) { 359 next.slowConsumer(context, destination,subs); 360 } 361 362 @Override 363 public void nowMasterBroker() { 364 next.nowMasterBroker(); 365 } 366 367 @Override 368 public void processConsumerControl(ConsumerBrokerExchange consumerExchange, 369 ConsumerControl control) { 370 next.processConsumerControl(consumerExchange, control); 371 } 372 373 @Override 374 public void reapplyInterceptor() { 375 next.reapplyInterceptor(); 376 } 377 378 @Override 379 public Scheduler getScheduler() { 380 return next.getScheduler(); 381 } 382 383 @Override 384 public ThreadPoolExecutor getExecutor() { 385 return next.getExecutor(); 386 } 387 388 @Override 389 public void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex, String remoteIp) { 390 next.networkBridgeStarted(brokerInfo, createdByDuplex, remoteIp); 391 } 392 393 @Override 394 public void networkBridgeStopped(BrokerInfo brokerInfo) { 395 next.networkBridgeStopped(brokerInfo); 396 } 397}