001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region.virtual; 018 019import java.util.ArrayList; 020import java.util.Arrays; 021import java.util.Iterator; 022import java.util.List; 023import java.util.Set; 024 025import org.apache.activemq.broker.Broker; 026import org.apache.activemq.broker.ConnectionContext; 027import org.apache.activemq.broker.ProducerBrokerExchange; 028import org.apache.activemq.broker.region.Destination; 029import org.apache.activemq.broker.region.DestinationFilter; 030import org.apache.activemq.broker.region.DestinationInterceptor; 031import org.apache.activemq.command.ActiveMQDestination; 032import org.apache.activemq.command.Message; 033import org.apache.activemq.filter.DestinationMap; 034 035/** 036 * Implements <a 037 * href="http://activemq.apache.org/virtual-destinations.html">Virtual 038 * Topics</a>. 039 * 040 * @org.apache.xbean.XBean 041 * 042 */ 043public class VirtualDestinationInterceptor implements DestinationInterceptor { 044 045 private DestinationMap destinationMap = new DestinationMap(); 046 private DestinationMap mappedDestinationMap = new DestinationMap(); 047 048 private VirtualDestination[] virtualDestinations; 049 050 @Override 051 public Destination intercept(Destination destination) { 052 final ActiveMQDestination activeMQDestination = destination.getActiveMQDestination(); 053 Set matchingDestinations = destinationMap.get(activeMQDestination); 054 List<Destination> destinations = new ArrayList<Destination>(); 055 for (Iterator iter = matchingDestinations.iterator(); iter.hasNext();) { 056 VirtualDestination virtualDestination = (VirtualDestination) iter.next(); 057 Destination newDestination = virtualDestination.intercept(destination); 058 destinations.add(newDestination); 059 } 060 if (!destinations.isEmpty()) { 061 if (destinations.size() == 1) { 062 return destinations.get(0); 063 } else { 064 // should rarely be used but here just in case 065 return createCompositeDestination(destination, destinations); 066 } 067 } 068 // check if the destination instead matches any mapped destinations 069 Set mappedDestinations = mappedDestinationMap.get(activeMQDestination); 070 assert mappedDestinations.size() < 2; 071 if (!mappedDestinations.isEmpty()) { 072 // create a mapped destination interceptor 073 VirtualDestination virtualDestination = (VirtualDestination) 074 mappedDestinations.toArray(new VirtualDestination[mappedDestinations.size()])[0]; 075 return virtualDestination.interceptMappedDestination(destination); 076 } 077 078 return destination; 079 } 080 081 @Override 082 public synchronized void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) throws Exception { 083 for (VirtualDestination virt : virtualDestinations) { 084 virt.create(broker, context, destination); 085 } 086 } 087 088 @Override 089 public synchronized void remove(Destination destination) { 090 } 091 092 public VirtualDestination[] getVirtualDestinations() { 093 return virtualDestinations; 094 } 095 096 public void setVirtualDestinations(VirtualDestination[] virtualDestinations) { 097 destinationMap = new DestinationMap(); 098 mappedDestinationMap = new DestinationMap(); 099 this.virtualDestinations = virtualDestinations; 100 for (int i = 0; i < virtualDestinations.length; i++) { 101 VirtualDestination virtualDestination = virtualDestinations[i]; 102 destinationMap.put(virtualDestination.getVirtualDestination(), virtualDestination); 103 mappedDestinationMap.put(virtualDestination.getMappedDestinations(), virtualDestination); 104 } 105 } 106 107 protected Destination createCompositeDestination(Destination destination, final List<Destination> destinations) { 108 return new DestinationFilter(destination) { 109 @Override 110 public void send(ProducerBrokerExchange context, Message messageSend) throws Exception { 111 for (Iterator<Destination> iter = destinations.iterator(); iter.hasNext();) { 112 Destination destination = iter.next(); 113 destination.send(context, messageSend); 114 } 115 } 116 }; 117 } 118 119 @Override 120 public String toString() { 121 return "VirtualDestinationInterceptor" + Arrays.asList(virtualDestinations); 122 } 123}