001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.partition;
018
019import java.net.InetSocketAddress;
020import java.net.Socket;
021import java.net.SocketAddress;
022import java.util.Arrays;
023import java.util.Collections;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.Map;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.ConcurrentMap;
029
030import org.apache.activemq.broker.Broker;
031import org.apache.activemq.broker.BrokerFilter;
032import org.apache.activemq.broker.ConnectionContext;
033import org.apache.activemq.broker.ProducerBrokerExchange;
034import org.apache.activemq.broker.TransportConnection;
035import org.apache.activemq.command.ActiveMQDestination;
036import org.apache.activemq.command.ConnectionControl;
037import org.apache.activemq.command.ConnectionId;
038import org.apache.activemq.command.ConnectionInfo;
039import org.apache.activemq.command.Message;
040import org.apache.activemq.partition.dto.Partitioning;
041import org.apache.activemq.partition.dto.Target;
042import org.apache.activemq.state.ConsumerState;
043import org.apache.activemq.state.SessionState;
044import org.apache.activemq.transport.Transport;
045import org.apache.activemq.util.LRUCache;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049/**
050 * A BrokerFilter which partitions client connections over a cluster of brokers.
051 *
052 * It can use a client identifier like client id, authenticated user name, source ip
053 * address or even destination being used by the connection to figure out which
054 * is the best broker in the cluster that the connection should be using and then
055 * redirects failover clients to that broker.
056 */
057public class PartitionBroker extends BrokerFilter {
058
059    protected static final Logger LOG = LoggerFactory.getLogger(PartitionBroker.class);
060    protected final PartitionBrokerPlugin plugin;
061    protected boolean reloadConfigOnPoll = true;
062
063    public PartitionBroker(Broker broker, PartitionBrokerPlugin plugin) {
064        super(broker);
065        this.plugin = plugin;
066    }
067
068    @Override
069    public void start() throws Exception {
070        super.start();
071        getExecutor().execute(new Runnable() {
072            @Override
073            public void run() {
074                Thread.currentThread().setName("Partition Monitor");
075                onMonitorStart();
076                try {
077                    runPartitionMonitor();
078                } catch (Exception e) {
079                    onMonitorStop();
080                }
081            }
082        });
083    }
084
085    protected void onMonitorStart() {
086    }
087    protected void onMonitorStop() {
088    }
089
090    protected void runPartitionMonitor() {
091        while( !isStopped() ) {
092            try {
093                monitorWait();
094            } catch (InterruptedException e) {
095                break;
096            }
097
098            if(reloadConfigOnPoll) {
099                try {
100                    reloadConfiguration();
101                } catch (Exception e) {
102                    continue;
103                }
104            }
105
106            for( ConnectionMonitor monitor: monitors.values()) {
107                checkTarget(monitor);
108            }
109        }
110    }
111
112    protected void monitorWait() throws InterruptedException {
113        synchronized (this) {
114            this.wait(1000);
115        }
116    }
117
118    protected void monitorWakeup()  {
119        synchronized (this) {
120            this.notifyAll();
121        }
122    }
123
124    protected void reloadConfiguration() throws Exception {
125    }
126
127    protected void checkTarget(ConnectionMonitor monitor) {
128
129        // can we find a preferred target for the connection?
130        Target targetDTO = pickBestBroker(monitor);
131        if( targetDTO == null || targetDTO.ids==null) {
132            LOG.debug("No partition target found for connection: "+monitor.context.getConnectionId());
133            return;
134        }
135
136        // Are we one the the targets?
137        if( targetDTO.ids.contains(getBrokerName()) ) {
138            LOG.debug("We are a partition target for connection: "+monitor.context.getConnectionId());
139            return;
140        }
141
142        // Then we need to move the connection over.
143        String connectionString = getConnectionString(targetDTO.ids);
144        if( connectionString==null ) {
145            LOG.debug("Could not convert to partition targets to connection string: " + targetDTO.ids);
146            return;
147        }
148
149        LOG.info("Redirecting connection to: " + connectionString);
150        TransportConnection connection = (TransportConnection)monitor.context.getConnection();
151        ConnectionControl cc = new ConnectionControl();
152        cc.setConnectedBrokers(connectionString);
153        cc.setRebalanceConnection(true);
154        connection.dispatchAsync(cc);
155    }
156
157    protected String getConnectionString(HashSet<String> ids) {
158        StringBuilder rc = new StringBuilder();
159        for (String id : ids) {
160            String url = plugin.getBrokerURL(this, id);
161            if( url!=null ) {
162                if( rc.length()!=0 ) {
163                    rc.append(',');
164                }
165                rc.append(url);
166            }
167        }
168        if( rc.length()==0 )
169            return null;
170        return rc.toString();
171    }
172
173    static private class Score {
174        int value;
175    }
176
177    protected Target pickBestBroker(ConnectionMonitor monitor) {
178
179        if( getConfig() ==null )
180            return null;
181
182        if( getConfig().bySourceIp !=null && !getConfig().bySourceIp.isEmpty() ) {
183            TransportConnection connection = (TransportConnection)monitor.context.getConnection();
184            Transport transport = connection.getTransport();
185            Socket socket = transport.narrow(Socket.class);
186            if( socket !=null ) {
187                SocketAddress address = socket.getRemoteSocketAddress();
188                if( address instanceof InetSocketAddress) {
189                    String ip = ((InetSocketAddress) address).getAddress().getHostAddress();
190                    Target targetDTO = getConfig().bySourceIp.get(ip);
191                    if( targetDTO!=null ) {
192                        return targetDTO;
193                    }
194                }
195            }
196        }
197
198        if( getConfig().byUserName !=null && !getConfig().byUserName.isEmpty() ) {
199            String userName = monitor.context.getUserName();
200            if( userName !=null ) {
201                Target targetDTO = getConfig().byUserName.get(userName);
202                if( targetDTO!=null ) {
203                    return targetDTO;
204                }
205            }
206        }
207
208        if( getConfig().byClientId !=null && !getConfig().byClientId.isEmpty() ) {
209            String clientId = monitor.context.getClientId();
210            if( clientId!=null ) {
211                Target targetDTO = getConfig().byClientId.get(clientId);
212                if( targetDTO!=null ) {
213                    return targetDTO;
214                }
215            }
216        }
217
218        if(
219             (getConfig().byQueue !=null && !getConfig().byQueue.isEmpty())
220          || (getConfig().byTopic !=null && !getConfig().byTopic.isEmpty())
221          ) {
222
223            // Collect the destinations the connection is consuming from...
224            HashSet<ActiveMQDestination> dests = new HashSet<ActiveMQDestination>();
225            for (SessionState session : monitor.context.getConnectionState().getSessionStates()) {
226                for (ConsumerState consumer : session.getConsumerStates()) {
227                    ActiveMQDestination destination = consumer.getInfo().getDestination();
228                    if( destination.isComposite() ) {
229                        dests.addAll(Arrays.asList(destination.getCompositeDestinations()));
230                    } else {
231                        dests.addAll(Collections.singletonList(destination));
232                    }
233                }
234            }
235
236            // Group them by the partitioning target for the destinations and score them..
237            HashMap<Target, Score> targetScores = new HashMap<Target, Score>();
238            for (ActiveMQDestination dest : dests) {
239                Target target = getTarget(dest);
240                if( target!=null ) {
241                    Score score = targetScores.get(target);
242                    if( score == null ) {
243                        score = new Score();
244                        targetScores.put(target, score);
245                    }
246                    score.value++;
247                }
248            }
249
250            // The target with largest score wins..
251            if( !targetScores.isEmpty() ) {
252                Target bestTarget = null;
253                int bestScore=0;
254                for (Map.Entry<Target, Score> entry : targetScores.entrySet()) {
255                    if( entry.getValue().value > bestScore ) {
256                        bestTarget = entry.getKey();
257                    }
258                }
259                return bestTarget;
260            }
261
262            // If we get here is because there were no consumers, or the destinations for those
263            // consumers did not have an assigned destination..  So partition based on producer
264            // usage.
265            Target best = monitor.findBestProducerTarget(this);
266            if( best!=null ) {
267                return best;
268            }
269        }
270        return null;
271    }
272
273    protected Target getTarget(ActiveMQDestination dest) {
274        Partitioning config = getConfig();
275        if( dest.isQueue() && config.byQueue !=null && !config.byQueue.isEmpty() ) {
276            return config.byQueue.get(dest.getPhysicalName());
277        } else if( dest.isTopic() && config.byTopic !=null && !config.byTopic.isEmpty() ) {
278            return config.byTopic.get(dest.getPhysicalName());
279        }
280        return null;
281    }
282
283    protected final ConcurrentMap<ConnectionId, ConnectionMonitor> monitors = new ConcurrentHashMap<ConnectionId, ConnectionMonitor>();
284
285    @Override
286    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
287        if( info.isFaultTolerant() ) {
288            ConnectionMonitor monitor = new ConnectionMonitor(context);
289            monitors.put(info.getConnectionId(), monitor);
290            super.addConnection(context, info);
291            checkTarget(monitor);
292        } else {
293            super.addConnection(context, info);
294        }
295    }
296
297    @Override
298    public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
299        super.removeConnection(context, info, error);
300        if( info.isFaultTolerant() ) {
301            monitors.remove(info.getConnectionId());
302        }
303    }
304
305    @Override
306    public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
307        ConnectionMonitor monitor = monitors.get(producerExchange.getConnectionContext().getConnectionId());
308        if( monitor!=null ) {
309            monitor.onSend(producerExchange, messageSend);
310        }
311    }
312
313    protected Partitioning getConfig() {
314        return plugin.getConfig();
315    }
316
317
318    static class Traffic {
319        long messages;
320        long bytes;
321    }
322
323    static class ConnectionMonitor {
324
325        final ConnectionContext context;
326        LRUCache<ActiveMQDestination, Traffic> trafficPerDestination =  new LRUCache<ActiveMQDestination, Traffic>();
327
328        public ConnectionMonitor(ConnectionContext context) {
329            this.context = context;
330        }
331
332        synchronized public Target findBestProducerTarget(PartitionBroker broker) {
333            Target best = null;
334            long bestSize = 0 ;
335            for (Map.Entry<ActiveMQDestination, Traffic> entry : trafficPerDestination.entrySet()) {
336                Traffic t = entry.getValue();
337                // Once we get enough messages...
338                if( t.messages < broker.plugin.getMinTransferCount()) {
339                    continue;
340                }
341                if( t.bytes > bestSize) {
342                    bestSize = t.bytes;
343                    Target target = broker.getTarget(entry.getKey());
344                    if( target!=null ) {
345                        best = target;
346                    }
347                }
348            }
349            return best;
350        }
351
352        synchronized public void onSend(ProducerBrokerExchange producerExchange, Message message) {
353            ActiveMQDestination dest = message.getDestination();
354            Traffic traffic = trafficPerDestination.get(dest);
355            if( traffic == null ) {
356                traffic = new Traffic();
357                trafficPerDestination.put(dest, traffic);
358            }
359            traffic.messages += 1;
360            traffic.bytes += message.getSize();
361        }
362
363
364    }
365
366}