001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.plugin; 018 019import org.apache.activemq.broker.region.*; 020import org.apache.activemq.broker.region.policy.PolicyEntry; 021import org.apache.activemq.broker.region.policy.PolicyMap; 022 023import java.util.Set; 024 025public class PolicyEntryProcessor extends DefaultConfigurationProcessor { 026 027 public PolicyEntryProcessor(RuntimeConfigurationBroker plugin, Class configurationClass) { 028 super(plugin, configurationClass); 029 } 030 031 @Override 032 public void addNew(Object o) { 033 PolicyEntry addition = fromDto(o, new PolicyEntry()); 034 PolicyMap existingMap = plugin.getBrokerService().getDestinationPolicy(); 035 existingMap.put(addition.getDestination(), addition); 036 applyRetrospectively(addition); 037 plugin.info("added policy for: " + addition.getDestination()); 038 } 039 040 @Override 041 public void modify(Object existing, Object candidate) { 042 PolicyMap existingMap = plugin.getBrokerService().getDestinationPolicy(); 043 044 PolicyEntry updatedEntry = fromDto(candidate, new PolicyEntry()); 045 046 Set existingEntry = existingMap.get(updatedEntry.getDestination()); 047 if (existingEntry.size() == 1) { 048 updatedEntry = fromDto(candidate, (PolicyEntry) existingEntry.iterator().next()); 049 applyRetrospectively(updatedEntry); 050 plugin.info("updated policy for: " + updatedEntry.getDestination()); 051 } else { 052 plugin.info("cannot modify policy matching multiple destinations: " + existingEntry + ", destination:" + updatedEntry.getDestination()); 053 } 054 } 055 056 protected void applyRetrospectively(PolicyEntry updatedEntry) { 057 RegionBroker regionBroker = (RegionBroker) plugin.getBrokerService().getRegionBroker(); 058 for (Destination destination : regionBroker.getDestinations(updatedEntry.getDestination())) { 059 Destination target = destination; 060 if (destination instanceof DestinationFilter) { 061 target = ((DestinationFilter)destination).getNext(); 062 } 063 if (target.getActiveMQDestination().isQueue()) { 064 updatedEntry.update((Queue) target); 065 } else if (target.getActiveMQDestination().isTopic()) { 066 updatedEntry.update((Topic) target); 067 } 068 plugin.debug("applied update to:" + target); 069 } 070 } 071}