/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
017package org.apache.activemq.network;
018
019import java.util.HashMap;
020import java.util.Map;
021import java.util.concurrent.ConcurrentHashMap;
022
023import javax.management.ObjectName;
024
025import org.apache.activemq.broker.BrokerService;
026import org.apache.activemq.broker.jmx.AnnotatedMBean;
027import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
028import org.apache.activemq.broker.jmx.NetworkBridgeView;
029import org.apache.activemq.broker.jmx.NetworkDestinationView;
030import org.apache.activemq.command.ActiveMQDestination;
031import org.apache.activemq.command.Message;
032import org.apache.activemq.thread.Scheduler;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036public class MBeanBridgeDestination {
037    private static final Logger LOG = LoggerFactory.getLogger(MBeanBridgeDestination.class);
038    private final BrokerService brokerService;
039    private final NetworkBridge bridge;
040    private final NetworkBridgeView networkBridgeView;
041    private final NetworkBridgeConfiguration networkBridgeConfiguration;
042    private final Scheduler scheduler;
043    private final Runnable purgeInactiveDestinationViewTask;
044    private Map<ActiveMQDestination, ObjectName> destinationObjectNameMap = new ConcurrentHashMap<ActiveMQDestination, ObjectName>();
045    private Map<ActiveMQDestination, NetworkDestinationView> outboundDestinationViewMap = new ConcurrentHashMap<ActiveMQDestination, NetworkDestinationView>();
046    private Map<ActiveMQDestination, NetworkDestinationView> inboundDestinationViewMap = new ConcurrentHashMap<ActiveMQDestination, NetworkDestinationView>();
047
048    public MBeanBridgeDestination(BrokerService brokerService, NetworkBridgeConfiguration networkBridgeConfiguration, NetworkBridge bridge, NetworkBridgeView networkBridgeView) {
049        this.brokerService = brokerService;
050        this.networkBridgeConfiguration = networkBridgeConfiguration;
051        this.bridge = bridge;
052        this.networkBridgeView = networkBridgeView;
053        this.scheduler = brokerService.getScheduler();
054        purgeInactiveDestinationViewTask = new Runnable() {
055            public void run() {
056                purgeInactiveDestinationViews();
057            }
058        };
059    }
060
061
062    public void onOutboundMessage(Message message) {
063        ActiveMQDestination destination = message.getDestination();
064        NetworkDestinationView networkDestinationView = outboundDestinationViewMap.get(destination);
065        if (networkDestinationView == null) {
066            synchronized (destinationObjectNameMap) {
067                if ((networkDestinationView = outboundDestinationViewMap.get(destination)) == null) {
068                    ObjectName bridgeObjectName = bridge.getMbeanObjectName();
069                    try {
070                        ObjectName objectName = BrokerMBeanSupport.createNetworkOutBoundDestinationObjectName(bridgeObjectName, destination);
071                        networkDestinationView = new NetworkDestinationView(networkBridgeView, destination.getPhysicalName());
072                        AnnotatedMBean.registerMBean(brokerService.getManagementContext(), networkDestinationView, objectName);
073                        destinationObjectNameMap.put(destination, objectName);
074                        outboundDestinationViewMap.put(destination, networkDestinationView);
075
076                    } catch (Exception e) {
077                        LOG.warn("Failed to register " + destination, e);
078                    }
079                }
080            }
081        }
082        networkDestinationView.messageSent();
083    }
084
085
086    public void onInboundMessage(Message message) {
087        ActiveMQDestination destination = message.getDestination();
088        NetworkDestinationView networkDestinationView = inboundDestinationViewMap.get(destination);
089        if (networkDestinationView == null) {
090            synchronized (destinationObjectNameMap) {
091                if ((networkDestinationView = inboundDestinationViewMap.get(destination)) == null) {
092                    ObjectName bridgeObjectName = bridge.getMbeanObjectName();
093                    try {
094                        ObjectName objectName = BrokerMBeanSupport.createNetworkInBoundDestinationObjectName(bridgeObjectName, destination);
095                        networkDestinationView = new NetworkDestinationView(networkBridgeView, destination.getPhysicalName());
096                        networkBridgeView.addNetworkDestinationView(networkDestinationView);
097                        AnnotatedMBean.registerMBean(brokerService.getManagementContext(), networkDestinationView, objectName);
098                        destinationObjectNameMap.put(destination, objectName);
099                        inboundDestinationViewMap.put(destination, networkDestinationView);
100                    } catch (Exception e) {
101                        LOG.warn("Failed to register " + destination, e);
102                    }
103                }
104            }
105        }
106        networkDestinationView.messageSent();
107    }
108
109    public void start() {
110        if (networkBridgeConfiguration.isGcDestinationViews()) {
111            long period = networkBridgeConfiguration.getGcSweepTime();
112            if (period > 0) {
113                scheduler.executePeriodically(purgeInactiveDestinationViewTask, period);
114            }
115        }
116    }
117
118    public void stop() {
119        if (!brokerService.isUseJmx()) {
120            return;
121        }
122
123        scheduler.cancel(purgeInactiveDestinationViewTask);
124        for (ObjectName objectName : destinationObjectNameMap.values()) {
125            try {
126                if (objectName != null) {
127                    brokerService.getManagementContext().unregisterMBean(objectName);
128                }
129            } catch (Throwable e) {
130                LOG.debug("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e);
131            }
132        }
133        destinationObjectNameMap.clear();
134        outboundDestinationViewMap.clear();
135        inboundDestinationViewMap.clear();
136    }
137
138    private void purgeInactiveDestinationViews() {
139        if (!brokerService.isUseJmx()) {
140            return;
141        }
142        purgeInactiveDestinationView(inboundDestinationViewMap);
143        purgeInactiveDestinationView(outboundDestinationViewMap);
144    }
145
146    private void purgeInactiveDestinationView(Map<ActiveMQDestination, NetworkDestinationView> map) {
147        long time = System.currentTimeMillis() - networkBridgeConfiguration.getGcSweepTime();
148        for (Map.Entry<ActiveMQDestination, NetworkDestinationView> entry : map.entrySet()) {
149            if (entry.getValue().getLastAccessTime() <= time) {
150                synchronized (destinationObjectNameMap) {
151                    map.remove(entry.getKey());
152                    ObjectName objectName = destinationObjectNameMap.remove(entry.getKey());
153                    if (objectName != null) {
154                        try {
155                            if (objectName != null) {
156                                brokerService.getManagementContext().unregisterMBean(objectName);
157                            }
158                        } catch (Throwable e) {
159                            LOG.debug("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e);
160                        }
161                    }
162                    entry.getValue().close();
163                }
164            }
165        }
166    }
167}