001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.partition;
018
019import org.apache.activemq.broker.Broker;
020import org.apache.activemq.leveldb.replicated.groups.ZKClient;
021import org.apache.activemq.partition.dto.Partitioning;
022import org.apache.zookeeper.WatchedEvent;
023import org.apache.zookeeper.Watcher;
024import org.apache.zookeeper.data.Stat;
025import org.linkedin.util.clock.Timespan;
026import org.slf4j.Logger;
027import org.slf4j.LoggerFactory;
028
029import java.util.concurrent.CountDownLatch;
030import java.util.concurrent.TimeUnit;
031
032/**
033 */
034public class ZooKeeperPartitionBroker extends PartitionBroker {
035
036    protected static final Logger LOG = LoggerFactory.getLogger(ZooKeeperPartitionBroker.class);
037
038    protected volatile ZKClient zk_client = null;
039    protected volatile Partitioning config;
040    protected final CountDownLatch configAcquired = new CountDownLatch(1);
041
042    public ZooKeeperPartitionBroker(Broker broker, ZooKeeperPartitionBrokerPlugin plugin) {
043        super(broker, plugin);
044    }
045
046    @Override
047    public void start() throws Exception {
048        super.start();
049        // Lets block a bit until we get our config.. Otherwise just keep
050        // on going.. not a big deal if we get our config later.  Perhaps
051        // ZK service is not having a good day.
052        configAcquired.await(5, TimeUnit.SECONDS);
053    }
054
055    @Override
056    protected void onMonitorStop() {
057        zkDisconnect();
058    }
059
060    @Override
061    protected Partitioning getConfig() {
062        return config;
063    }
064
065    protected ZooKeeperPartitionBrokerPlugin plugin() {
066        return (ZooKeeperPartitionBrokerPlugin)plugin;
067    }
068
069    protected void zkConnect() throws Exception {
070        zk_client = new ZKClient(plugin().getZkAddress(), Timespan.parse(plugin().getZkSessionTmeout()), null);
071        if( plugin().getZkPassword()!=null ) {
072            zk_client.setPassword(plugin().getZkPassword());
073        }
074        zk_client.start();
075        zk_client.waitForConnected(Timespan.parse("30s"));
076    }
077
078    protected void zkDisconnect() {
079        if( zk_client!=null ) {
080            zk_client.close();
081            zk_client = null;
082        }
083    }
084
085    protected void reloadConfiguration() throws Exception {
086        if( zk_client==null )  {
087            LOG.debug("Connecting to ZooKeeper");
088            try {
089                zkConnect();
090                LOG.debug("Connected to ZooKeeper");
091            } catch (Exception e) {
092                LOG.debug("Connection to ZooKeeper failed: "+e);
093                zkDisconnect();
094                throw e;
095            }
096        }
097
098        byte[] data = null;
099        try {
100            Stat stat = new Stat();
101            data = zk_client.getData(plugin().getZkPath(), new Watcher() {
102                @Override
103                public void process(WatchedEvent watchedEvent) {
104                    try {
105                        reloadConfiguration();
106                    } catch (Exception e) {
107                    }
108                    monitorWakeup();
109                }
110            }, stat);
111            configAcquired.countDown();
112            reloadConfigOnPoll = false;
113        } catch (Exception e) {
114            LOG.warn("Could load partitioning configuration: " + e, e);
115            reloadConfigOnPoll = true;
116        }
117
118        try {
119            config = Partitioning.MAPPER.readValue(data, Partitioning.class);
120        } catch (Exception e) {
121            LOG.warn("Invalid partitioning configuration: " + e, e);
122        }
123    }
124
125}