001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.jmx;
018
019import java.io.IOException;
020
021import javax.management.ObjectName;
022
023import org.apache.activemq.broker.Broker;
024import org.apache.activemq.broker.TransportConnection;
025import org.apache.activemq.broker.TransportConnector;
026import org.apache.activemq.command.ConnectionInfo;
027import org.apache.activemq.command.Response;
028import org.apache.activemq.thread.TaskRunnerFactory;
029import org.apache.activemq.transport.Transport;
030import org.apache.activemq.util.IOExceptionSupport;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034/**
035 * A managed transport connection
036 */
037public class ManagedTransportConnection extends TransportConnection {
038    private static final Logger LOG = LoggerFactory.getLogger(ManagedTransportConnection.class);
039
040    private final ManagementContext managementContext;
041    private final ObjectName connectorName;
042    private final ConnectionViewMBean mbean;
043
044    private ObjectName byClientIdName;
045    private ObjectName byAddressName;
046
047    private final boolean populateUserName;
048
049    public ManagedTransportConnection(TransportConnector connector, Transport transport, Broker broker,
050                                      TaskRunnerFactory factory, TaskRunnerFactory stopFactory,
051                                      ManagementContext context, ObjectName connectorName)
052        throws IOException {
053        super(connector, transport, broker, factory, stopFactory);
054        this.managementContext = context;
055        this.connectorName = connectorName;
056        this.mbean = new ConnectionView(this, managementContext);
057        this.populateUserName = broker.getBrokerService().isPopulateUserNameInMBeans();
058        if (managementContext.isAllowRemoteAddressInMBeanNames()) {
059            byAddressName = createObjectName("remoteAddress", transport.getRemoteAddress());
060            registerMBean(byAddressName);
061        }
062    }
063
064    @Override
065    public void stopAsync() {
066        if (!isStopping()) {
067            synchronized (this) {
068                unregisterMBean(byClientIdName);
069                unregisterMBean(byAddressName);
070                byClientIdName = null;
071                byAddressName = null;
072            }
073        }
074        super.stopAsync();
075    }
076
077    @Override
078    public Response processAddConnection(ConnectionInfo info) throws Exception {
079        Response answer = super.processAddConnection(info);
080        String clientId = info.getClientId();
081        if (populateUserName) {
082            ((ConnectionView) mbean).setUserName(info.getUserName());
083        }
084        if (clientId != null) {
085            if (byClientIdName == null) {
086                byClientIdName = createObjectName("clientId", clientId);
087                registerMBean(byClientIdName);
088            }
089        }
090        return answer;
091    }
092
093    // Implementation methods
094    // -------------------------------------------------------------------------
095    protected void registerMBean(ObjectName name) {
096        if (name != null) {
097            try {
098                AnnotatedMBean.registerMBean(managementContext, mbean, name);
099            } catch (Throwable e) {
100                LOG.warn("Failed to register MBean {}", name);
101                LOG.debug("Failure reason: ", e);
102            }
103        }
104    }
105
106    protected void unregisterMBean(ObjectName name) {
107        if (name != null) {
108            try {
109                managementContext.unregisterMBean(name);
110            } catch (Throwable e) {
111                LOG.warn("Failed to unregister MBean {}", name);
112                LOG.debug("Failure reason: ", e);
113            }
114        }
115    }
116
117    protected ObjectName createObjectName(String type, String value) throws IOException {
118        try {
119            return BrokerMBeanSupport.createConnectionViewByType(connectorName, type, value);
120        } catch (Throwable e) {
121            throw IOExceptionSupport.create(e);
122        }
123    }
124
125}