001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport;
018
019import java.io.IOException;
020
021import org.apache.activemq.command.WireFormatInfo;
022import org.apache.activemq.wireformat.WireFormat;
023import org.slf4j.Logger;
024import org.slf4j.LoggerFactory;
025
026/**
027 * Used to make sure that commands are arriving periodically from the peer of
028 * the transport.
029 */
030public class InactivityMonitor extends AbstractInactivityMonitor {
031
032    private static final Logger LOG = LoggerFactory.getLogger(InactivityMonitor.class);
033
034    private WireFormatInfo localWireFormatInfo;
035    private WireFormatInfo remoteWireFormatInfo;
036
037    private boolean ignoreRemoteWireFormat = false;
038    private boolean ignoreAllWireFormatInfo = false;
039
040    public InactivityMonitor(Transport next, WireFormat wireFormat) {
041        super(next, wireFormat);
042        if (this.wireFormat == null) {
043            this.ignoreAllWireFormatInfo = true;
044        }
045    }
046
047    @Override
048    public void start() throws Exception {
049        startConnectCheckTask();
050        super.start();
051    }
052
053    @Override
054    protected void processInboundWireFormatInfo(WireFormatInfo info) throws IOException {
055        stopConnectCheckTask();
056        IOException error = null;
057        remoteWireFormatInfo = info;
058        try {
059            startMonitorThreads();
060        } catch (IOException e) {
061            error = e;
062        }
063        if (error != null) {
064            onException(error);
065        }
066    }
067
068    @Override
069    protected void processOutboundWireFormatInfo(WireFormatInfo info) throws IOException{
070        localWireFormatInfo = info;
071        startMonitorThreads();
072    }
073
074    @Override
075    protected synchronized void startMonitorThreads() throws IOException {
076        if (isMonitorStarted()) {
077            return;
078        }
079
080        long readCheckTime = getReadCheckTime();
081
082        if (readCheckTime > 0) {
083            setWriteCheckTime(writeCheckValueFromReadCheck(readCheckTime));
084        }
085
086        super.startMonitorThreads();
087    }
088
089    private long writeCheckValueFromReadCheck(long readCheckTime) {
090        return readCheckTime>3 ? readCheckTime/3 : readCheckTime;
091    }
092
093    @Override
094    protected boolean configuredOk() throws IOException {
095        boolean configured = false;
096        if (ignoreAllWireFormatInfo) {
097            configured = true;
098        } else if (localWireFormatInfo != null && remoteWireFormatInfo != null) {
099            if (!ignoreRemoteWireFormat) {
100                if (LOG.isDebugEnabled()) {
101                    LOG.debug("Using min of local: " + localWireFormatInfo + " and remote: " + remoteWireFormatInfo);
102                }
103
104                long readCheckTime = Math.min(localWireFormatInfo.getMaxInactivityDuration(), remoteWireFormatInfo.getMaxInactivityDuration());
105                long writeCheckTime = writeCheckValueFromReadCheck(readCheckTime);
106
107                setReadCheckTime(readCheckTime);
108                setInitialDelayTime(Math.min(localWireFormatInfo.getMaxInactivityDurationInitalDelay(), remoteWireFormatInfo.getMaxInactivityDurationInitalDelay()));
109                setWriteCheckTime(writeCheckTime);
110
111            } else {
112                if (LOG.isDebugEnabled()) {
113                    LOG.debug("Using local: " + localWireFormatInfo);
114                }
115
116                long readCheckTime = localWireFormatInfo.getMaxInactivityDuration();
117                long writeCheckTime = writeCheckValueFromReadCheck(readCheckTime);
118
119                setReadCheckTime(readCheckTime);
120                setInitialDelayTime(localWireFormatInfo.getMaxInactivityDurationInitalDelay());
121                setWriteCheckTime(writeCheckTime);
122            }
123            configured = true;
124        }
125
126        return configured;
127    }
128
129    public boolean isIgnoreAllWireFormatInfo() {
130        return ignoreAllWireFormatInfo;
131    }
132
133    public void setIgnoreAllWireFormatInfo(boolean ignoreAllWireFormatInfo) {
134        this.ignoreAllWireFormatInfo = ignoreAllWireFormatInfo;
135    }
136
137    public boolean isIgnoreRemoteWireFormat() {
138        return ignoreRemoteWireFormat;
139    }
140
141    public void setIgnoreRemoteWireFormat(boolean ignoreRemoteWireFormat) {
142        this.ignoreRemoteWireFormat = ignoreRemoteWireFormat;
143    }
144}