001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.leveldb.replicated.groups;
018
019import java.io.UnsupportedEncodingException;
020import java.lang.reflect.Field;
021import java.lang.reflect.Method;
022import java.util.*;
023import java.util.concurrent.BlockingQueue;
024import java.util.concurrent.CopyOnWriteArrayList;
025import java.util.concurrent.LinkedBlockingQueue;
026import java.util.concurrent.TimeoutException;
027import java.util.concurrent.atomic.AtomicBoolean;
028
029import org.apache.zookeeper.CreateMode;
030import org.apache.zookeeper.KeeperException;
031import org.apache.zookeeper.WatchedEvent;
032import org.apache.zookeeper.Watcher;
033import org.apache.zookeeper.ZooDefs;
034import org.apache.zookeeper.data.ACL;
035import org.apache.zookeeper.data.Id;
036import org.apache.zookeeper.data.Stat;
037import org.linkedin.util.clock.Clock;
038import org.linkedin.util.clock.SystemClock;
039import org.linkedin.util.clock.Timespan;
040import org.linkedin.util.concurrent.ConcurrentUtils;
041import org.linkedin.util.io.PathUtils;
042import org.linkedin.zookeeper.client.ChrootedZKClient;
043import org.linkedin.zookeeper.client.IZooKeeper;
044import org.linkedin.zookeeper.client.IZooKeeperFactory;
045import org.linkedin.zookeeper.client.LifecycleListener;
046import org.linkedin.zookeeper.client.ZooKeeperFactory;
047import org.osgi.framework.InvalidSyntaxException;
048import org.osgi.service.cm.ConfigurationException;
049import org.slf4j.Logger;
050
051public class ZKClient extends org.linkedin.zookeeper.client.AbstractZKClient implements Watcher {
052
053    private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(ZKClient.class.getName());
054
055    private Map<String, String> acls;
056    private String password;
057
058
059    public void start() throws Exception {
060        // Grab the lock to make sure that the registration of the ManagedService
061        // won't be updated immediately but that the initial update will happen first
062        synchronized (_lock) {
063            _stateChangeDispatcher.setDaemon(true);
064            _stateChangeDispatcher.start();
065
066            doStart();
067        }
068    }
069
070    public void setACLs(Map<String, String> acls) {
071        this.acls = acls;
072    }
073
074    public void setPassword(String password) {
075        this.password = password;
076    }
077
078    protected void doStart() throws InvalidSyntaxException, ConfigurationException, UnsupportedEncodingException {
079        connect();
080    }
081
082    @Override
083    public void close() {
084        if (_stateChangeDispatcher != null) {
085            _stateChangeDispatcher.end();
086            try {
087                _stateChangeDispatcher.join(1000);
088            } catch(Exception e) {
089                LOG.debug("ignored exception", e);
090            }
091        }
092        synchronized(_lock) {
093            if (_zk != null) {
094                try {
095                    changeState(State.NONE);
096                    _zk.close();
097                    // We try to avoid a NPE when shutting down fabric:
098                    // java.lang.NullPointerException
099                    //     at org.apache.felix.framework.BundleWiringImpl.findClassOrResourceByDelegation(BundleWiringImpl.java:1433)
100                    //     at org.apache.felix.framework.BundleWiringImpl.access$400(BundleWiringImpl.java:73)
101                    //     at org.apache.felix.framework.BundleWiringImpl$BundleClassLoader.loadClass(BundleWiringImpl.java:1844)
102                    //     at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
103                    //     at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1089)
104                    Thread th = getSendThread();
105                    if (th != null) {
106                        th.join(1000);
107                    }
108                    _zk = null;
109                } catch(Exception e) {
110                    LOG.debug("ignored exception", e);
111                }
112            }
113        }
114    }
115
116    protected Thread getSendThread() {
117        try {
118            return (Thread) getField(_zk, "_zk", "cnxn", "sendThread");
119        } catch (Throwable e) {
120            return null;
121        }
122    }
123
124    protected Object getField(Object obj, String... names) throws Exception {
125        for (String name : names) {
126            obj = getField(obj, name);
127        }
128        return obj;
129    }
130
131    protected Object getField(Object obj, String name) throws Exception {
132        Class clazz = obj.getClass();
133        while (clazz != null) {
134            for (Field f : clazz.getDeclaredFields()) {
135                if (f.getName().equals(name)) {
136                    f.setAccessible(true);
137                    return f.get(obj);
138                }
139            }
140        }
141        throw new NoSuchFieldError(name);
142    }
143
144    protected void changeState(State newState) {
145        synchronized (_lock) {
146            State oldState = _state;
147            if (oldState != newState) {
148                _stateChangeDispatcher.addEvent(oldState, newState);
149                _state = newState;
150                _lock.notifyAll();
151            }
152        }
153    }
154
155    public void testGenerateConnectionLoss() throws Exception {
156        waitForConnected();
157        Object clientCnxnSocket  = getField(_zk, "_zk", "cnxn", "sendThread", "clientCnxnSocket");
158        callMethod(clientCnxnSocket, "testableCloseSocket");
159    }
160
161    protected Object callMethod(Object obj, String name, Object... args) throws Exception {
162        Class clazz = obj.getClass();
163        while (clazz != null) {
164            for (Method m : clazz.getDeclaredMethods()) {
165                if (m.getName().equals(name)) {
166                    m.setAccessible(true);
167                    return m.invoke(obj, args);
168                }
169            }
170        }
171        throw new NoSuchMethodError(name);
172    }
173
174    protected void tryConnect() {
175        synchronized (_lock) {
176            try {
177                connect();
178            } catch (Throwable e) {
179                LOG.warn("Error while restarting:", e);
180                if (_expiredSessionRecovery == null) {
181                    _expiredSessionRecovery = new ExpiredSessionRecovery();
182                    _expiredSessionRecovery.setDaemon(true);
183                    _expiredSessionRecovery.start();
184                }
185            }
186        }
187    }
188
189    public void connect() throws UnsupportedEncodingException {
190        synchronized (_lock) {
191            changeState(State.CONNECTING);
192            _zk = _factory.createZooKeeper(this);
193            if (password != null) {
194                _zk.addAuthInfo("digest", ("fabric:" + password).getBytes("UTF-8"));
195            }
196        }
197    }
198
199    public void process(WatchedEvent event) {
200        if (event.getState() != null) {
201            LOG.debug("event: {}", event.getState());
202            synchronized (_lock) {
203                switch(event.getState())
204                {
205                    case SyncConnected:
206                        changeState(State.CONNECTED);
207                        break;
208
209                    case Disconnected:
210                        if(_state != State.NONE) {
211                            changeState(State.RECONNECTING);
212                        }
213                        break;
214
215                    case Expired:
216                        // when expired, the zookeeper object is invalid and we need to recreate a new one
217                        _zk = null;
218                        LOG.warn("Expiration detected: trying to restart...");
219                        tryConnect();
220                        break;
221                    default:
222                        LOG.warn("unprocessed event state: {}", event.getState());
223                }
224            }
225        }
226    }
227
228    @Override
229    protected IZooKeeper getZk() {
230        State state = _state;
231        if (state == State.NONE) {
232            throw new IllegalStateException("ZooKeeper client has not been configured yet. You need to either create an ensemble or join one.");
233        } else if (state != State.CONNECTED) {
234            try {
235                waitForConnected();
236            } catch (Exception e) {
237                throw new IllegalStateException("Error waiting for ZooKeeper connection", e);
238            }
239        }
240        IZooKeeper zk = _zk;
241        if (zk == null) {
242            throw new IllegalStateException("No ZooKeeper connection available");
243        }
244        return zk;
245    }
246
247    public void waitForConnected(Timespan timeout) throws InterruptedException, TimeoutException {
248        waitForState(State.CONNECTED, timeout);
249    }
250
251    public void waitForConnected() throws InterruptedException, TimeoutException {
252        waitForConnected(null);
253    }
254
255    public void waitForState(State state, Timespan timeout) throws TimeoutException, InterruptedException {
256        long endTime = (timeout == null ? sessionTimeout : timeout).futureTimeMillis(_clock);
257        if (_state != state) {
258            synchronized (_lock) {
259                while (_state != state) {
260                    ConcurrentUtils.awaitUntil(_clock, _lock, endTime);
261                }
262            }
263        }
264    }
265
266    @Override
267    public void registerListener(LifecycleListener listener) {
268        if (listener == null) {
269            throw new IllegalStateException("listener is null");
270        }
271        if (!_listeners.contains(listener)) {
272            _listeners.add(listener);
273
274        }
275        if (_state == State.CONNECTED) {
276            listener.onConnected();
277            //_stateChangeDispatcher.addEvent(null, State.CONNECTED);
278        }
279    }
280
281    @Override
282    public void removeListener(LifecycleListener listener) {
283        if (listener == null) {
284            throw new IllegalStateException("listener is null");
285        }
286        _listeners.remove(listener);
287    }
288
289    @Override
290    public org.linkedin.zookeeper.client.IZKClient chroot(String path) {
291        return new ChrootedZKClient(this, adjustPath(path));
292    }
293
294    @Override
295    public boolean isConnected() {
296        return _state == State.CONNECTED;
297    }
298
299    public boolean isConfigured() {
300        return _state != State.NONE;
301    }
302
303    @Override
304    public String getConnectString() {
305        return _factory.getConnectString();
306    }
307
308    public static enum State {
309        NONE,
310        CONNECTING,
311        CONNECTED,
312        RECONNECTING
313    }
314
315    private final static String CHARSET = "UTF-8";
316
317    private final Clock _clock = SystemClock.instance();
318    private final List<LifecycleListener> _listeners = new CopyOnWriteArrayList<LifecycleListener>();
319
320    protected final Object _lock = new Object();
321    protected volatile State _state = State.NONE;
322
323    private final StateChangeDispatcher _stateChangeDispatcher = new StateChangeDispatcher();
324
325    protected IZooKeeperFactory _factory;
326    protected IZooKeeper _zk;
327    protected Timespan _reconnectTimeout = Timespan.parse("20s");
328    protected Timespan sessionTimeout = new Timespan(30, Timespan.TimeUnit.SECOND);
329
330    private ExpiredSessionRecovery _expiredSessionRecovery = null;
331
332    private class StateChangeDispatcher extends Thread {
333        private final AtomicBoolean _running = new AtomicBoolean(true);
334        private final BlockingQueue<Boolean> _events = new LinkedBlockingQueue<Boolean>();
335
336        private StateChangeDispatcher() {
337            super("ZooKeeper state change dispatcher thread");
338        }
339
340        @Override
341        public void run() {
342            Map<Object, Boolean> history = new IdentityHashMap<Object, Boolean>();
343            LOG.info("Starting StateChangeDispatcher");
344            while (_running.get()) {
345                Boolean isConnectedEvent;
346                try {
347                    isConnectedEvent = _events.take();
348                } catch (InterruptedException e) {
349                    continue;
350                }
351                if (!_running.get() || isConnectedEvent == null) {
352                    continue;
353                }
354                Map<Object, Boolean> newHistory = callListeners(history, isConnectedEvent);
355                // we save which event each listener has seen last
356                // we don't update the map in place because we need to get rid of unregistered listeners
357                history = newHistory;
358            }
359            LOG.info("StateChangeDispatcher terminated.");
360        }
361
362        public void end() {
363            _running.set(false);
364            _events.add(false);
365        }
366
367        public void addEvent(ZKClient.State oldState, ZKClient.State newState) {
368            LOG.debug("addEvent: {} => {}", oldState, newState);
369            if (newState == ZKClient.State.CONNECTED) {
370                _events.add(true);
371            } else if (oldState == ZKClient.State.CONNECTED) {
372                _events.add(false);
373            }
374        }
375    }
376
377    protected Map<Object, Boolean> callListeners(Map<Object, Boolean> history, Boolean connectedEvent) {
378        Map<Object, Boolean> newHistory = new IdentityHashMap<Object, Boolean>();
379        for (LifecycleListener listener : _listeners) {
380            Boolean previousEvent = history.get(listener);
381            // we propagate the event only if it was not already sent
382            if (previousEvent == null || previousEvent != connectedEvent) {
383                try {
384                    if (connectedEvent) {
385                        listener.onConnected();
386                    } else {
387                        listener.onDisconnected();
388                    }
389                } catch (Throwable e) {
390                    LOG.warn("Exception while executing listener (ignored)", e);
391                }
392            }
393            newHistory.put(listener, connectedEvent);
394        }
395        return newHistory;
396    }
397
398    private class ExpiredSessionRecovery extends Thread {
399        private ExpiredSessionRecovery() {
400            super("ZooKeeper expired session recovery thread");
401        }
402
403        @Override
404        public void run() {
405            LOG.info("Entering recovery mode");
406            synchronized(_lock) {
407                try {
408                    int count = 0;
409                    while (_state == ZKClient.State.NONE) {
410                        try {
411                            count++;
412                            LOG.warn("Recovery mode: trying to reconnect to zookeeper [" + count + "]");
413                            ZKClient.this.connect();
414                        } catch (Throwable e) {
415                            LOG.warn("Recovery mode: reconnect attempt failed [" + count + "]... waiting for " + _reconnectTimeout, e);
416                            try {
417                                _lock.wait(_reconnectTimeout.getDurationInMilliseconds());
418                            } catch(InterruptedException e1) {
419                                throw new RuntimeException("Recovery mode: wait interrupted... bailing out", e1);
420                            }
421                        }
422                    }
423                } finally {
424                    _expiredSessionRecovery = null;
425                    LOG.info("Exiting recovery mode.");
426                }
427            }
428        }
429    }
430
431    /**
432     * Constructor
433     */
434    public ZKClient(String connectString, Timespan sessionTimeout, Watcher watcher)
435    {
436        this(new ZooKeeperFactory(connectString, sessionTimeout, watcher));
437    }
438
439    /**
440     * Constructor
441     */
442    public ZKClient(IZooKeeperFactory factory)
443    {
444        this(factory, null);
445    }
446
447    /**
448     * Constructor
449     */
450    public ZKClient(IZooKeeperFactory factory, String chroot)
451    {
452        super(chroot);
453        _factory = factory;
454        Map<String, String> acls = new HashMap<String, String>();
455        acls.put("/", "world:anyone:acdrw");
456        setACLs(acls);
457    }
458
459    static private int getPermFromString(String permString) {
460        int perm = 0;
461        for (int i = 0; i < permString.length(); i++) {
462            switch (permString.charAt(i)) {
463                case 'r':
464                    perm |= ZooDefs.Perms.READ;
465                    break;
466                case 'w':
467                    perm |= ZooDefs.Perms.WRITE;
468                    break;
469                case 'c':
470                    perm |= ZooDefs.Perms.CREATE;
471                    break;
472                case 'd':
473                    perm |= ZooDefs.Perms.DELETE;
474                    break;
475                case 'a':
476                    perm |= ZooDefs.Perms.ADMIN;
477                    break;
478                default:
479                    System.err
480                            .println("Unknown perm type: " + permString.charAt(i));
481            }
482        }
483        return perm;
484    }
485
486    private static List<ACL> parseACLs(String aclString) {
487        List<ACL> acl;
488        String acls[] = aclString.split(",");
489        acl = new ArrayList<ACL>();
490        for (String a : acls) {
491            int firstColon = a.indexOf(':');
492            int lastColon = a.lastIndexOf(':');
493            if (firstColon == -1 || lastColon == -1 || firstColon == lastColon) {
494                System.err
495                        .println(a + " does not have the form scheme:id:perm");
496                continue;
497            }
498            ACL newAcl = new ACL();
499            newAcl.setId(new Id(a.substring(0, firstColon), a.substring(
500                    firstColon + 1, lastColon)));
501            newAcl.setPerms(getPermFromString(a.substring(lastColon + 1)));
502            acl.add(newAcl);
503        }
504        return acl;
505    }
506
507    public Stat createOrSetByteWithParents(String path, byte[] data, List<ACL> acl, CreateMode createMode) throws InterruptedException, KeeperException {
508        if (exists(path) != null) {
509            return setByteData(path, data);
510        }
511        try {
512            createBytesNodeWithParents(path, data, acl, createMode);
513            return null;
514        } catch(KeeperException.NodeExistsException e) {
515            // this should not happen very often (race condition)
516            return setByteData(path, data);
517        }
518    }
519
520    public String create(String path, CreateMode createMode) throws InterruptedException, KeeperException {
521        return create(path, (byte[]) null, createMode);
522    }
523
524    public String create(String path, String data, CreateMode createMode) throws InterruptedException, KeeperException {
525        return create(path, toByteData(data), createMode);
526    }
527
528    public String create(String path, byte[] data, CreateMode createMode) throws InterruptedException, KeeperException {
529        return getZk().create(adjustPath(path), data, getNodeACLs(path), createMode);
530    }
531
532    public String createWithParents(String path, CreateMode createMode) throws InterruptedException, KeeperException {
533        return createWithParents(path, (byte[]) null, createMode);
534    }
535
536    public String createWithParents(String path, String data, CreateMode createMode) throws InterruptedException, KeeperException {
537        return createWithParents(path, toByteData(data), createMode);
538    }
539
540    public String createWithParents(String path, byte[] data, CreateMode createMode) throws InterruptedException, KeeperException {
541        createParents(path);
542        return create(path, data, createMode);
543    }
544
545    public Stat createOrSetWithParents(String path, String data, CreateMode createMode) throws InterruptedException, KeeperException {
546        return createOrSetWithParents(path, toByteData(data), createMode);
547    }
548
549    public Stat createOrSetWithParents(String path, byte[] data, CreateMode createMode) throws InterruptedException, KeeperException {
550        if (exists(path) != null) {
551            return setByteData(path, data);
552        }
553        try {
554            createWithParents(path, data, createMode);
555            return null;
556        } catch (KeeperException.NodeExistsException e) {
557            // this should not happen very often (race condition)
558            return setByteData(path, data);
559        }
560    }
561
562    public void fixACLs(String path, boolean recursive) throws InterruptedException, KeeperException {
563        if (exists(path) != null) {
564            doFixACLs(path, recursive);
565        }
566    }
567
568    private void doFixACLs(String path, boolean recursive) throws KeeperException, InterruptedException {
569        setACL(path, getNodeACLs(path), -1);
570        if (recursive) {
571            for (String child : getChildren(path)) {
572                doFixACLs(path.equals("/") ? "/" + child : path + "/" + child, recursive);
573            }
574        }
575    }
576
577    private List<ACL> getNodeACLs(String path) {
578        String acl = doGetNodeACLs(adjustPath(path));
579        if (acl == null) {
580            throw new IllegalStateException("Could not find matching ACLs for " + path);
581        }
582        return parseACLs(acl);
583    }
584
585    protected String doGetNodeACLs(String path) {
586        String longestPath = "";
587        for (String acl : acls.keySet()) {
588            if (acl.length() > longestPath.length() && path.startsWith(acl)) {
589                longestPath = acl;
590            }
591        }
592        return acls.get(longestPath);
593    }
594
595    private void createParents(String path) throws InterruptedException, KeeperException {
596        path = PathUtils.getParentPath(adjustPath(path));
597        path = PathUtils.removeTrailingSlash(path);
598        List<String> paths = new ArrayList<String>();
599        while(!path.equals("") && getZk().exists(path, false) == null) {
600            paths.add(path);
601            path = PathUtils.getParentPath(path);
602            path = PathUtils.removeTrailingSlash(path);
603        }
604        Collections.reverse(paths);
605        for(String p : paths) {
606            try {
607                getZk().create(p,
608                        null,
609                        getNodeACLs(p),
610                        CreateMode.PERSISTENT);
611            } catch(KeeperException.NodeExistsException e) {
612                // ok we continue...
613                if (LOG.isDebugEnabled()) {
614                    LOG.debug("parent already exists " + p);
615                }
616            }
617        }
618    }
619
620    private byte[] toByteData(String data) {
621        if (data == null) {
622            return null;
623        } else {
624            try {
625                return data.getBytes(CHARSET);
626            } catch(UnsupportedEncodingException e) {
627                throw new RuntimeException(e);
628            }
629        }
630    }
631}