/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.plugin;

import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.virtual.*;
import org.apache.activemq.schema.core.DtoVirtualDestinationInterceptor;
import org.apache.activemq.schema.core.DtoVirtualTopic;
import org.apache.activemq.schema.core.DtoCompositeTopic;
import org.apache.activemq.schema.core.DtoCompositeQueue;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Configuration processor for {@link DtoVirtualDestinationInterceptor} elements.
 *
 * <p>Both {@link #addNew(Object)} and {@link #remove(Object)} do no work inline: each wraps
 * its change in a {@link Runnable} and adds it to {@code plugin.addDestinationWork}. When
 * that work is actually executed is decided outside this class.
 */
public class VirtualDestinationInterceptorProcessor extends DefaultConfigurationProcessor {

    /**
     * @param plugin             the runtime configuration broker, passed straight to the superclass
     * @param configurationClass the configuration class, passed straight to the superclass
     */
    public VirtualDestinationInterceptorProcessor(RuntimeConfigurationBroker plugin, Class configurationClass) {
        super(plugin, configurationClass);
    }

    /**
     * Schedules the virtual destinations described by {@code o} to be applied to the broker.
     *
     * @param o the new configuration element; cast to {@link DtoVirtualDestinationInterceptor}
     *          immediately, before the work item is queued
     */
    @Override
    public void addNew(Object o) {
        final DtoVirtualDestinationInterceptor dto = (DtoVirtualDestinationInterceptor) o;
        plugin.addDestinationWork.add(new Runnable() {
            @Override
            public void run() {
                applyVirtualDestinations(dto);
            }
        });
    }

    /**
     * Schedules removal of every {@link VirtualDestinationInterceptor} from the broker's
     * interceptor list.
     *
     * @param o the removed configuration element; not consulted by this implementation
     */
    @Override
    public void remove(Object o) {
        // whack it
        plugin.addDestinationWork.add(new Runnable() {
            @Override
            public void run() {
                dropVirtualDestinationInterceptors();
            }
        });
    }

    /**
     * Pushes the destinations from {@code dto} into every {@link VirtualDestinationInterceptor}
     * already registered on the broker service. When none is registered, a fresh one is
     * appended to the interceptor list instead. In both cases the region broker is asked to
     * reapply its interceptor afterwards.
     */
    private void applyVirtualDestinations(DtoVirtualDestinationInterceptor dto) {
        RegionBroker regionBroker = (RegionBroker) plugin.getBrokerService().getRegionBroker();
        boolean refreshedExisting = false;

        for (DestinationInterceptor candidate : plugin.getBrokerService().getDestinationInterceptors()) {
            if (!(candidate instanceof VirtualDestinationInterceptor)) {
                continue;
            }
            // update existing interceptor
            final VirtualDestinationInterceptor existing = (VirtualDestinationInterceptor) candidate;
            existing.setVirtualDestinations(fromDto(dto));
            plugin.info("applied updates to: " + existing);
            refreshedExisting = true;
        }

        if (!refreshedExisting) {
            // nothing to update, so append a brand new interceptor
            VirtualDestinationInterceptor created = new VirtualDestinationInterceptor();
            created.setVirtualDestinations(fromDto(dto));

            List<DestinationInterceptor> combined = new ArrayList<DestinationInterceptor>(
                    Arrays.asList(plugin.getBrokerService().getDestinationInterceptors()));
            combined.add(created);

            DestinationInterceptor[] installed = combined.toArray(new DestinationInterceptor[combined.size()]);
            // the same array is handed to both the broker service and the region broker's composite
            plugin.getBrokerService().setDestinationInterceptors(installed);
            CompositeDestinationInterceptor composite =
                    (CompositeDestinationInterceptor) regionBroker.getDestinationInterceptor();
            composite.setInterceptors(installed);
            plugin.info("applied new: " + combined);
        }

        regionBroker.reapplyInterceptor();
    }

    /**
     * Rebuilds the interceptor list without any {@link VirtualDestinationInterceptor} and
     * installs it on both the broker service and the region broker's composite interceptor.
     */
    private void dropVirtualDestinationInterceptors() {
        List<DestinationInterceptor> survivors = new ArrayList<DestinationInterceptor>();
        for (DestinationInterceptor candidate : plugin.getBrokerService().getDestinationInterceptors()) {
            if (candidate instanceof VirtualDestinationInterceptor) {
                continue;
            }
            survivors.add(candidate);
        }

        DestinationInterceptor[] installed = survivors.toArray(new DestinationInterceptor[survivors.size()]);
        plugin.getBrokerService().setDestinationInterceptors(installed);

        RegionBroker regionBroker = (RegionBroker) plugin.getBrokerService().getRegionBroker();
        CompositeDestinationInterceptor composite =
                (CompositeDestinationInterceptor) regionBroker.getDestinationInterceptor();
        composite.setInterceptors(installed);
        // NOTE(review): unlike the add/update path, reapplyInterceptor() is not called here -- confirm intentional
        plugin.info("removed VirtualDestinationInterceptor from: " + survivors);
    }

    /**
     * Converts the DTO into broker-side virtual destinations.
     *
     * <p>For each {@code VirtualDestinations} child of the DTO, virtual topics are converted
     * first, then composite topics, then composite queues; the result keeps that order.
     *
     * @param virtualDestinationInterceptor the DTO to convert
     * @return the converted destinations; empty when the DTO yields none
     */
    private VirtualDestination[] fromDto(DtoVirtualDestinationInterceptor virtualDestinationInterceptor) {
        List<VirtualDestination> converted = new ArrayList<VirtualDestination>();
        for (Object group : filter(virtualDestinationInterceptor, DtoVirtualDestinationInterceptor.VirtualDestinations.class)) {
            for (Object virtualTopic : filter(group, DtoVirtualTopic.class)) {
                converted.add(fromDto(virtualTopic, new VirtualTopic()));
            }
            for (Object compositeTopic : filter(group, DtoCompositeTopic.class)) {
                converted.add(fromDto(compositeTopic, new CompositeTopic()));
            }
            for (Object compositeQueue : filter(group, DtoCompositeQueue.class)) {
                converted.add(fromDto(compositeQueue, new CompositeQueue()));
            }
        }
        return converted.toArray(new VirtualDestination[converted.size()]);
    }
}