001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.partition; 018 019import org.apache.activemq.broker.Broker; 020import org.apache.activemq.leveldb.replicated.groups.ZKClient; 021import org.apache.activemq.partition.dto.Partitioning; 022import org.apache.zookeeper.WatchedEvent; 023import org.apache.zookeeper.Watcher; 024import org.apache.zookeeper.data.Stat; 025import org.linkedin.util.clock.Timespan; 026import org.slf4j.Logger; 027import org.slf4j.LoggerFactory; 028 029import java.util.concurrent.CountDownLatch; 030import java.util.concurrent.TimeUnit; 031 032/** 033 */ 034public class ZooKeeperPartitionBroker extends PartitionBroker { 035 036 protected static final Logger LOG = LoggerFactory.getLogger(ZooKeeperPartitionBroker.class); 037 038 protected volatile ZKClient zk_client = null; 039 protected volatile Partitioning config; 040 protected final CountDownLatch configAcquired = new CountDownLatch(1); 041 042 public ZooKeeperPartitionBroker(Broker broker, ZooKeeperPartitionBrokerPlugin plugin) { 043 super(broker, plugin); 044 } 045 046 @Override 047 public void start() throws Exception { 048 super.start(); 049 // Lets block a bit until we get our config.. Otherwise just keep 050 // on going.. not a big deal if we get our config later. Perhaps 051 // ZK service is not having a good day. 052 configAcquired.await(5, TimeUnit.SECONDS); 053 } 054 055 @Override 056 protected void onMonitorStop() { 057 zkDisconnect(); 058 } 059 060 @Override 061 protected Partitioning getConfig() { 062 return config; 063 } 064 065 protected ZooKeeperPartitionBrokerPlugin plugin() { 066 return (ZooKeeperPartitionBrokerPlugin)plugin; 067 } 068 069 protected void zkConnect() throws Exception { 070 zk_client = new ZKClient(plugin().getZkAddress(), Timespan.parse(plugin().getZkSessionTmeout()), null); 071 if( plugin().getZkPassword()!=null ) { 072 zk_client.setPassword(plugin().getZkPassword()); 073 } 074 zk_client.start(); 075 zk_client.waitForConnected(Timespan.parse("30s")); 076 } 077 078 protected void zkDisconnect() { 079 if( zk_client!=null ) { 080 zk_client.close(); 081 zk_client = null; 082 } 083 } 084 085 protected void reloadConfiguration() throws Exception { 086 if( zk_client==null ) { 087 LOG.debug("Connecting to ZooKeeper"); 088 try { 089 zkConnect(); 090 LOG.debug("Connected to ZooKeeper"); 091 } catch (Exception e) { 092 LOG.debug("Connection to ZooKeeper failed: "+e); 093 zkDisconnect(); 094 throw e; 095 } 096 } 097 098 byte[] data = null; 099 try { 100 Stat stat = new Stat(); 101 data = zk_client.getData(plugin().getZkPath(), new Watcher() { 102 @Override 103 public void process(WatchedEvent watchedEvent) { 104 try { 105 reloadConfiguration(); 106 } catch (Exception e) { 107 } 108 monitorWakeup(); 109 } 110 }, stat); 111 configAcquired.countDown(); 112 reloadConfigOnPoll = false; 113 } catch (Exception e) { 114 LOG.warn("Could load partitioning configuration: " + e, e); 115 reloadConfigOnPoll = true; 116 } 117 118 try { 119 config = Partitioning.MAPPER.readValue(data, Partitioning.class); 120 } catch (Exception e) { 121 LOG.warn("Invalid partitioning configuration: " + e, e); 122 } 123 } 124 125}