001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.network; 018 019import java.util.HashMap; 020import java.util.Map; 021import java.util.concurrent.ConcurrentHashMap; 022 023import javax.management.ObjectName; 024 025import org.apache.activemq.broker.BrokerService; 026import org.apache.activemq.broker.jmx.AnnotatedMBean; 027import org.apache.activemq.broker.jmx.BrokerMBeanSupport; 028import org.apache.activemq.broker.jmx.NetworkBridgeView; 029import org.apache.activemq.broker.jmx.NetworkDestinationView; 030import org.apache.activemq.command.ActiveMQDestination; 031import org.apache.activemq.command.Message; 032import org.apache.activemq.thread.Scheduler; 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035 036public class MBeanBridgeDestination { 037 private static final Logger LOG = LoggerFactory.getLogger(MBeanBridgeDestination.class); 038 private final BrokerService brokerService; 039 private final NetworkBridge bridge; 040 private final NetworkBridgeView networkBridgeView; 041 private final NetworkBridgeConfiguration networkBridgeConfiguration; 042 private final Scheduler scheduler; 043 private final Runnable purgeInactiveDestinationViewTask; 044 private Map<ActiveMQDestination, ObjectName> destinationObjectNameMap = new ConcurrentHashMap<ActiveMQDestination, ObjectName>(); 045 private Map<ActiveMQDestination, NetworkDestinationView> outboundDestinationViewMap = new ConcurrentHashMap<ActiveMQDestination, NetworkDestinationView>(); 046 private Map<ActiveMQDestination, NetworkDestinationView> inboundDestinationViewMap = new ConcurrentHashMap<ActiveMQDestination, NetworkDestinationView>(); 047 048 public MBeanBridgeDestination(BrokerService brokerService, NetworkBridgeConfiguration networkBridgeConfiguration, NetworkBridge bridge, NetworkBridgeView networkBridgeView) { 049 this.brokerService = brokerService; 050 this.networkBridgeConfiguration = networkBridgeConfiguration; 051 this.bridge = bridge; 052 this.networkBridgeView = networkBridgeView; 053 this.scheduler = brokerService.getScheduler(); 054 purgeInactiveDestinationViewTask = new Runnable() { 055 public void run() { 056 purgeInactiveDestinationViews(); 057 } 058 }; 059 } 060 061 062 public void onOutboundMessage(Message message) { 063 ActiveMQDestination destination = message.getDestination(); 064 NetworkDestinationView networkDestinationView = outboundDestinationViewMap.get(destination); 065 if (networkDestinationView == null) { 066 synchronized (destinationObjectNameMap) { 067 if ((networkDestinationView = outboundDestinationViewMap.get(destination)) == null) { 068 ObjectName bridgeObjectName = bridge.getMbeanObjectName(); 069 try { 070 ObjectName objectName = BrokerMBeanSupport.createNetworkOutBoundDestinationObjectName(bridgeObjectName, destination); 071 networkDestinationView = new NetworkDestinationView(networkBridgeView, destination.getPhysicalName()); 072 AnnotatedMBean.registerMBean(brokerService.getManagementContext(), networkDestinationView, objectName); 073 destinationObjectNameMap.put(destination, objectName); 074 outboundDestinationViewMap.put(destination, networkDestinationView); 075 076 } catch (Exception e) { 077 LOG.warn("Failed to register " + destination, e); 078 } 079 } 080 } 081 } 082 networkDestinationView.messageSent(); 083 } 084 085 086 public void onInboundMessage(Message message) { 087 ActiveMQDestination destination = message.getDestination(); 088 NetworkDestinationView networkDestinationView = inboundDestinationViewMap.get(destination); 089 if (networkDestinationView == null) { 090 synchronized (destinationObjectNameMap) { 091 if ((networkDestinationView = inboundDestinationViewMap.get(destination)) == null) { 092 ObjectName bridgeObjectName = bridge.getMbeanObjectName(); 093 try { 094 ObjectName objectName = BrokerMBeanSupport.createNetworkInBoundDestinationObjectName(bridgeObjectName, destination); 095 networkDestinationView = new NetworkDestinationView(networkBridgeView, destination.getPhysicalName()); 096 networkBridgeView.addNetworkDestinationView(networkDestinationView); 097 AnnotatedMBean.registerMBean(brokerService.getManagementContext(), networkDestinationView, objectName); 098 destinationObjectNameMap.put(destination, objectName); 099 inboundDestinationViewMap.put(destination, networkDestinationView); 100 } catch (Exception e) { 101 LOG.warn("Failed to register " + destination, e); 102 } 103 } 104 } 105 } 106 networkDestinationView.messageSent(); 107 } 108 109 public void start() { 110 if (networkBridgeConfiguration.isGcDestinationViews()) { 111 long period = networkBridgeConfiguration.getGcSweepTime(); 112 if (period > 0) { 113 scheduler.executePeriodically(purgeInactiveDestinationViewTask, period); 114 } 115 } 116 } 117 118 public void stop() { 119 if (!brokerService.isUseJmx()) { 120 return; 121 } 122 123 scheduler.cancel(purgeInactiveDestinationViewTask); 124 for (ObjectName objectName : destinationObjectNameMap.values()) { 125 try { 126 if (objectName != null) { 127 brokerService.getManagementContext().unregisterMBean(objectName); 128 } 129 } catch (Throwable e) { 130 LOG.debug("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e); 131 } 132 } 133 destinationObjectNameMap.clear(); 134 outboundDestinationViewMap.clear(); 135 inboundDestinationViewMap.clear(); 136 } 137 138 private void purgeInactiveDestinationViews() { 139 if (!brokerService.isUseJmx()) { 140 return; 141 } 142 purgeInactiveDestinationView(inboundDestinationViewMap); 143 purgeInactiveDestinationView(outboundDestinationViewMap); 144 } 145 146 private void purgeInactiveDestinationView(Map<ActiveMQDestination, NetworkDestinationView> map) { 147 long time = System.currentTimeMillis() - networkBridgeConfiguration.getGcSweepTime(); 148 for (Map.Entry<ActiveMQDestination, NetworkDestinationView> entry : map.entrySet()) { 149 if (entry.getValue().getLastAccessTime() <= time) { 150 synchronized (destinationObjectNameMap) { 151 map.remove(entry.getKey()); 152 ObjectName objectName = destinationObjectNameMap.remove(entry.getKey()); 153 if (objectName != null) { 154 try { 155 if (objectName != null) { 156 brokerService.getManagementContext().unregisterMBean(objectName); 157 } 158 } catch (Throwable e) { 159 LOG.debug("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e); 160 } 161 } 162 entry.getValue().close(); 163 } 164 } 165 } 166 } 167}