001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.plugin;
018
019import org.apache.activemq.broker.region.*;
020import org.apache.activemq.broker.region.policy.PolicyEntry;
021import org.apache.activemq.broker.region.policy.PolicyMap;
022
023import java.util.Set;
024
025public class PolicyEntryProcessor extends DefaultConfigurationProcessor {
026
027    public PolicyEntryProcessor(RuntimeConfigurationBroker plugin, Class configurationClass) {
028        super(plugin, configurationClass);
029    }
030
031    @Override
032    public void addNew(Object o) {
033        PolicyEntry addition = fromDto(o, new PolicyEntry());
034        PolicyMap existingMap = plugin.getBrokerService().getDestinationPolicy();
035        existingMap.put(addition.getDestination(), addition);
036        applyRetrospectively(addition);
037        plugin.info("added policy for: " + addition.getDestination());
038    }
039
040    @Override
041    public void modify(Object existing, Object candidate) {
042        PolicyMap existingMap = plugin.getBrokerService().getDestinationPolicy();
043
044        PolicyEntry updatedEntry = fromDto(candidate, new PolicyEntry());
045
046        Set existingEntry = existingMap.get(updatedEntry.getDestination());
047        if (existingEntry.size() == 1) {
048            updatedEntry = fromDto(candidate, (PolicyEntry) existingEntry.iterator().next());
049            applyRetrospectively(updatedEntry);
050            plugin.info("updated policy for: " + updatedEntry.getDestination());
051        } else {
052            plugin.info("cannot modify policy matching multiple destinations: " + existingEntry + ", destination:" + updatedEntry.getDestination());
053        }
054    }
055
056    protected void applyRetrospectively(PolicyEntry updatedEntry) {
057        RegionBroker regionBroker = (RegionBroker) plugin.getBrokerService().getRegionBroker();
058        for (Destination destination : regionBroker.getDestinations(updatedEntry.getDestination())) {
059            Destination target = destination;
060            if (destination instanceof DestinationFilter) {
061                target = ((DestinationFilter)destination).getNext();
062            }
063            if (target.getActiveMQDestination().isQueue()) {
064                updatedEntry.update((Queue) target);
065            } else if (target.getActiveMQDestination().isTopic()) {
066                updatedEntry.update((Topic) target);
067            }
068            plugin.debug("applied update to:" + target);
069        }
070    }
071}