001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.leveldb.replicated.groups; 018 019import java.io.UnsupportedEncodingException; 020import java.lang.reflect.Field; 021import java.lang.reflect.Method; 022import java.util.*; 023import java.util.concurrent.BlockingQueue; 024import java.util.concurrent.CopyOnWriteArrayList; 025import java.util.concurrent.LinkedBlockingQueue; 026import java.util.concurrent.TimeoutException; 027import java.util.concurrent.atomic.AtomicBoolean; 028 029import org.apache.zookeeper.CreateMode; 030import org.apache.zookeeper.KeeperException; 031import org.apache.zookeeper.WatchedEvent; 032import org.apache.zookeeper.Watcher; 033import org.apache.zookeeper.ZooDefs; 034import org.apache.zookeeper.data.ACL; 035import org.apache.zookeeper.data.Id; 036import org.apache.zookeeper.data.Stat; 037import org.linkedin.util.clock.Clock; 038import org.linkedin.util.clock.SystemClock; 039import org.linkedin.util.clock.Timespan; 040import org.linkedin.util.concurrent.ConcurrentUtils; 041import org.linkedin.util.io.PathUtils; 042import org.linkedin.zookeeper.client.ChrootedZKClient; 043import org.linkedin.zookeeper.client.IZooKeeper; 044import org.linkedin.zookeeper.client.IZooKeeperFactory; 045import org.linkedin.zookeeper.client.LifecycleListener; 046import org.linkedin.zookeeper.client.ZooKeeperFactory; 047import org.osgi.framework.InvalidSyntaxException; 048import org.osgi.service.cm.ConfigurationException; 049import org.slf4j.Logger; 050 051public class ZKClient extends org.linkedin.zookeeper.client.AbstractZKClient implements Watcher { 052 053 private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(ZKClient.class.getName()); 054 055 private Map<String, String> acls; 056 private String password; 057 058 059 public void start() throws Exception { 060 // Grab the lock to make sure that the registration of the ManagedService 061 // won't be updated immediately but that the initial update will happen first 062 synchronized (_lock) { 063 _stateChangeDispatcher.setDaemon(true); 064 _stateChangeDispatcher.start(); 065 066 doStart(); 067 } 068 } 069 070 public void setACLs(Map<String, String> acls) { 071 this.acls = acls; 072 } 073 074 public void setPassword(String password) { 075 this.password = password; 076 } 077 078 protected void doStart() throws InvalidSyntaxException, ConfigurationException, UnsupportedEncodingException { 079 connect(); 080 } 081 082 @Override 083 public void close() { 084 if (_stateChangeDispatcher != null) { 085 _stateChangeDispatcher.end(); 086 try { 087 _stateChangeDispatcher.join(1000); 088 } catch(Exception e) { 089 LOG.debug("ignored exception", e); 090 } 091 } 092 synchronized(_lock) { 093 if (_zk != null) { 094 try { 095 changeState(State.NONE); 096 _zk.close(); 097 // We try to avoid a NPE when shutting down fabric: 098 // java.lang.NullPointerException 099 // at org.apache.felix.framework.BundleWiringImpl.findClassOrResourceByDelegation(BundleWiringImpl.java:1433) 100 // at org.apache.felix.framework.BundleWiringImpl.access$400(BundleWiringImpl.java:73) 101 // at org.apache.felix.framework.BundleWiringImpl$BundleClassLoader.loadClass(BundleWiringImpl.java:1844) 102 // at java.lang.ClassLoader.loadClass(ClassLoader.java:247) 103 // at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1089) 104 Thread th = getSendThread(); 105 if (th != null) { 106 th.join(1000); 107 } 108 _zk = null; 109 } catch(Exception e) { 110 LOG.debug("ignored exception", e); 111 } 112 } 113 } 114 } 115 116 protected Thread getSendThread() { 117 try { 118 return (Thread) getField(_zk, "_zk", "cnxn", "sendThread"); 119 } catch (Throwable e) { 120 return null; 121 } 122 } 123 124 protected Object getField(Object obj, String... names) throws Exception { 125 for (String name : names) { 126 obj = getField(obj, name); 127 } 128 return obj; 129 } 130 131 protected Object getField(Object obj, String name) throws Exception { 132 Class clazz = obj.getClass(); 133 while (clazz != null) { 134 for (Field f : clazz.getDeclaredFields()) { 135 if (f.getName().equals(name)) { 136 f.setAccessible(true); 137 return f.get(obj); 138 } 139 } 140 } 141 throw new NoSuchFieldError(name); 142 } 143 144 protected void changeState(State newState) { 145 synchronized (_lock) { 146 State oldState = _state; 147 if (oldState != newState) { 148 _stateChangeDispatcher.addEvent(oldState, newState); 149 _state = newState; 150 _lock.notifyAll(); 151 } 152 } 153 } 154 155 public void testGenerateConnectionLoss() throws Exception { 156 waitForConnected(); 157 Object clientCnxnSocket = getField(_zk, "_zk", "cnxn", "sendThread", "clientCnxnSocket"); 158 callMethod(clientCnxnSocket, "testableCloseSocket"); 159 } 160 161 protected Object callMethod(Object obj, String name, Object... args) throws Exception { 162 Class clazz = obj.getClass(); 163 while (clazz != null) { 164 for (Method m : clazz.getDeclaredMethods()) { 165 if (m.getName().equals(name)) { 166 m.setAccessible(true); 167 return m.invoke(obj, args); 168 } 169 } 170 } 171 throw new NoSuchMethodError(name); 172 } 173 174 protected void tryConnect() { 175 synchronized (_lock) { 176 try { 177 connect(); 178 } catch (Throwable e) { 179 LOG.warn("Error while restarting:", e); 180 if (_expiredSessionRecovery == null) { 181 _expiredSessionRecovery = new ExpiredSessionRecovery(); 182 _expiredSessionRecovery.setDaemon(true); 183 _expiredSessionRecovery.start(); 184 } 185 } 186 } 187 } 188 189 public void connect() throws UnsupportedEncodingException { 190 synchronized (_lock) { 191 changeState(State.CONNECTING); 192 _zk = _factory.createZooKeeper(this); 193 if (password != null) { 194 _zk.addAuthInfo("digest", ("fabric:" + password).getBytes("UTF-8")); 195 } 196 } 197 } 198 199 public void process(WatchedEvent event) { 200 if (event.getState() != null) { 201 LOG.debug("event: {}", event.getState()); 202 synchronized (_lock) { 203 switch(event.getState()) 204 { 205 case SyncConnected: 206 changeState(State.CONNECTED); 207 break; 208 209 case Disconnected: 210 if(_state != State.NONE) { 211 changeState(State.RECONNECTING); 212 } 213 break; 214 215 case Expired: 216 // when expired, the zookeeper object is invalid and we need to recreate a new one 217 _zk = null; 218 LOG.warn("Expiration detected: trying to restart..."); 219 tryConnect(); 220 break; 221 default: 222 LOG.warn("unprocessed event state: {}", event.getState()); 223 } 224 } 225 } 226 } 227 228 @Override 229 protected IZooKeeper getZk() { 230 State state = _state; 231 if (state == State.NONE) { 232 throw new IllegalStateException("ZooKeeper client has not been configured yet. You need to either create an ensemble or join one."); 233 } else if (state != State.CONNECTED) { 234 try { 235 waitForConnected(); 236 } catch (Exception e) { 237 throw new IllegalStateException("Error waiting for ZooKeeper connection", e); 238 } 239 } 240 IZooKeeper zk = _zk; 241 if (zk == null) { 242 throw new IllegalStateException("No ZooKeeper connection available"); 243 } 244 return zk; 245 } 246 247 public void waitForConnected(Timespan timeout) throws InterruptedException, TimeoutException { 248 waitForState(State.CONNECTED, timeout); 249 } 250 251 public void waitForConnected() throws InterruptedException, TimeoutException { 252 waitForConnected(null); 253 } 254 255 public void waitForState(State state, Timespan timeout) throws TimeoutException, InterruptedException { 256 long endTime = (timeout == null ? sessionTimeout : timeout).futureTimeMillis(_clock); 257 if (_state != state) { 258 synchronized (_lock) { 259 while (_state != state) { 260 ConcurrentUtils.awaitUntil(_clock, _lock, endTime); 261 } 262 } 263 } 264 } 265 266 @Override 267 public void registerListener(LifecycleListener listener) { 268 if (listener == null) { 269 throw new IllegalStateException("listener is null"); 270 } 271 if (!_listeners.contains(listener)) { 272 _listeners.add(listener); 273 274 } 275 if (_state == State.CONNECTED) { 276 listener.onConnected(); 277 //_stateChangeDispatcher.addEvent(null, State.CONNECTED); 278 } 279 } 280 281 @Override 282 public void removeListener(LifecycleListener listener) { 283 if (listener == null) { 284 throw new IllegalStateException("listener is null"); 285 } 286 _listeners.remove(listener); 287 } 288 289 @Override 290 public org.linkedin.zookeeper.client.IZKClient chroot(String path) { 291 return new ChrootedZKClient(this, adjustPath(path)); 292 } 293 294 @Override 295 public boolean isConnected() { 296 return _state == State.CONNECTED; 297 } 298 299 public boolean isConfigured() { 300 return _state != State.NONE; 301 } 302 303 @Override 304 public String getConnectString() { 305 return _factory.getConnectString(); 306 } 307 308 public static enum State { 309 NONE, 310 CONNECTING, 311 CONNECTED, 312 RECONNECTING 313 } 314 315 private final static String CHARSET = "UTF-8"; 316 317 private final Clock _clock = SystemClock.instance(); 318 private final List<LifecycleListener> _listeners = new CopyOnWriteArrayList<LifecycleListener>(); 319 320 protected final Object _lock = new Object(); 321 protected volatile State _state = State.NONE; 322 323 private final StateChangeDispatcher _stateChangeDispatcher = new StateChangeDispatcher(); 324 325 protected IZooKeeperFactory _factory; 326 protected IZooKeeper _zk; 327 protected Timespan _reconnectTimeout = Timespan.parse("20s"); 328 protected Timespan sessionTimeout = new Timespan(30, Timespan.TimeUnit.SECOND); 329 330 private ExpiredSessionRecovery _expiredSessionRecovery = null; 331 332 private class StateChangeDispatcher extends Thread { 333 private final AtomicBoolean _running = new AtomicBoolean(true); 334 private final BlockingQueue<Boolean> _events = new LinkedBlockingQueue<Boolean>(); 335 336 private StateChangeDispatcher() { 337 super("ZooKeeper state change dispatcher thread"); 338 } 339 340 @Override 341 public void run() { 342 Map<Object, Boolean> history = new IdentityHashMap<Object, Boolean>(); 343 LOG.info("Starting StateChangeDispatcher"); 344 while (_running.get()) { 345 Boolean isConnectedEvent; 346 try { 347 isConnectedEvent = _events.take(); 348 } catch (InterruptedException e) { 349 continue; 350 } 351 if (!_running.get() || isConnectedEvent == null) { 352 continue; 353 } 354 Map<Object, Boolean> newHistory = callListeners(history, isConnectedEvent); 355 // we save which event each listener has seen last 356 // we don't update the map in place because we need to get rid of unregistered listeners 357 history = newHistory; 358 } 359 LOG.info("StateChangeDispatcher terminated."); 360 } 361 362 public void end() { 363 _running.set(false); 364 _events.add(false); 365 } 366 367 public void addEvent(ZKClient.State oldState, ZKClient.State newState) { 368 LOG.debug("addEvent: {} => {}", oldState, newState); 369 if (newState == ZKClient.State.CONNECTED) { 370 _events.add(true); 371 } else if (oldState == ZKClient.State.CONNECTED) { 372 _events.add(false); 373 } 374 } 375 } 376 377 protected Map<Object, Boolean> callListeners(Map<Object, Boolean> history, Boolean connectedEvent) { 378 Map<Object, Boolean> newHistory = new IdentityHashMap<Object, Boolean>(); 379 for (LifecycleListener listener : _listeners) { 380 Boolean previousEvent = history.get(listener); 381 // we propagate the event only if it was not already sent 382 if (previousEvent == null || previousEvent != connectedEvent) { 383 try { 384 if (connectedEvent) { 385 listener.onConnected(); 386 } else { 387 listener.onDisconnected(); 388 } 389 } catch (Throwable e) { 390 LOG.warn("Exception while executing listener (ignored)", e); 391 } 392 } 393 newHistory.put(listener, connectedEvent); 394 } 395 return newHistory; 396 } 397 398 private class ExpiredSessionRecovery extends Thread { 399 private ExpiredSessionRecovery() { 400 super("ZooKeeper expired session recovery thread"); 401 } 402 403 @Override 404 public void run() { 405 LOG.info("Entering recovery mode"); 406 synchronized(_lock) { 407 try { 408 int count = 0; 409 while (_state == ZKClient.State.NONE) { 410 try { 411 count++; 412 LOG.warn("Recovery mode: trying to reconnect to zookeeper [" + count + "]"); 413 ZKClient.this.connect(); 414 } catch (Throwable e) { 415 LOG.warn("Recovery mode: reconnect attempt failed [" + count + "]... waiting for " + _reconnectTimeout, e); 416 try { 417 _lock.wait(_reconnectTimeout.getDurationInMilliseconds()); 418 } catch(InterruptedException e1) { 419 throw new RuntimeException("Recovery mode: wait interrupted... bailing out", e1); 420 } 421 } 422 } 423 } finally { 424 _expiredSessionRecovery = null; 425 LOG.info("Exiting recovery mode."); 426 } 427 } 428 } 429 } 430 431 /** 432 * Constructor 433 */ 434 public ZKClient(String connectString, Timespan sessionTimeout, Watcher watcher) 435 { 436 this(new ZooKeeperFactory(connectString, sessionTimeout, watcher)); 437 } 438 439 /** 440 * Constructor 441 */ 442 public ZKClient(IZooKeeperFactory factory) 443 { 444 this(factory, null); 445 } 446 447 /** 448 * Constructor 449 */ 450 public ZKClient(IZooKeeperFactory factory, String chroot) 451 { 452 super(chroot); 453 _factory = factory; 454 Map<String, String> acls = new HashMap<String, String>(); 455 acls.put("/", "world:anyone:acdrw"); 456 setACLs(acls); 457 } 458 459 static private int getPermFromString(String permString) { 460 int perm = 0; 461 for (int i = 0; i < permString.length(); i++) { 462 switch (permString.charAt(i)) { 463 case 'r': 464 perm |= ZooDefs.Perms.READ; 465 break; 466 case 'w': 467 perm |= ZooDefs.Perms.WRITE; 468 break; 469 case 'c': 470 perm |= ZooDefs.Perms.CREATE; 471 break; 472 case 'd': 473 perm |= ZooDefs.Perms.DELETE; 474 break; 475 case 'a': 476 perm |= ZooDefs.Perms.ADMIN; 477 break; 478 default: 479 System.err 480 .println("Unknown perm type: " + permString.charAt(i)); 481 } 482 } 483 return perm; 484 } 485 486 private static List<ACL> parseACLs(String aclString) { 487 List<ACL> acl; 488 String acls[] = aclString.split(","); 489 acl = new ArrayList<ACL>(); 490 for (String a : acls) { 491 int firstColon = a.indexOf(':'); 492 int lastColon = a.lastIndexOf(':'); 493 if (firstColon == -1 || lastColon == -1 || firstColon == lastColon) { 494 System.err 495 .println(a + " does not have the form scheme:id:perm"); 496 continue; 497 } 498 ACL newAcl = new ACL(); 499 newAcl.setId(new Id(a.substring(0, firstColon), a.substring( 500 firstColon + 1, lastColon))); 501 newAcl.setPerms(getPermFromString(a.substring(lastColon + 1))); 502 acl.add(newAcl); 503 } 504 return acl; 505 } 506 507 public Stat createOrSetByteWithParents(String path, byte[] data, List<ACL> acl, CreateMode createMode) throws InterruptedException, KeeperException { 508 if (exists(path) != null) { 509 return setByteData(path, data); 510 } 511 try { 512 createBytesNodeWithParents(path, data, acl, createMode); 513 return null; 514 } catch(KeeperException.NodeExistsException e) { 515 // this should not happen very often (race condition) 516 return setByteData(path, data); 517 } 518 } 519 520 public String create(String path, CreateMode createMode) throws InterruptedException, KeeperException { 521 return create(path, (byte[]) null, createMode); 522 } 523 524 public String create(String path, String data, CreateMode createMode) throws InterruptedException, KeeperException { 525 return create(path, toByteData(data), createMode); 526 } 527 528 public String create(String path, byte[] data, CreateMode createMode) throws InterruptedException, KeeperException { 529 return getZk().create(adjustPath(path), data, getNodeACLs(path), createMode); 530 } 531 532 public String createWithParents(String path, CreateMode createMode) throws InterruptedException, KeeperException { 533 return createWithParents(path, (byte[]) null, createMode); 534 } 535 536 public String createWithParents(String path, String data, CreateMode createMode) throws InterruptedException, KeeperException { 537 return createWithParents(path, toByteData(data), createMode); 538 } 539 540 public String createWithParents(String path, byte[] data, CreateMode createMode) throws InterruptedException, KeeperException { 541 createParents(path); 542 return create(path, data, createMode); 543 } 544 545 public Stat createOrSetWithParents(String path, String data, CreateMode createMode) throws InterruptedException, KeeperException { 546 return createOrSetWithParents(path, toByteData(data), createMode); 547 } 548 549 public Stat createOrSetWithParents(String path, byte[] data, CreateMode createMode) throws InterruptedException, KeeperException { 550 if (exists(path) != null) { 551 return setByteData(path, data); 552 } 553 try { 554 createWithParents(path, data, createMode); 555 return null; 556 } catch (KeeperException.NodeExistsException e) { 557 // this should not happen very often (race condition) 558 return setByteData(path, data); 559 } 560 } 561 562 public void fixACLs(String path, boolean recursive) throws InterruptedException, KeeperException { 563 if (exists(path) != null) { 564 doFixACLs(path, recursive); 565 } 566 } 567 568 private void doFixACLs(String path, boolean recursive) throws KeeperException, InterruptedException { 569 setACL(path, getNodeACLs(path), -1); 570 if (recursive) { 571 for (String child : getChildren(path)) { 572 doFixACLs(path.equals("/") ? "/" + child : path + "/" + child, recursive); 573 } 574 } 575 } 576 577 private List<ACL> getNodeACLs(String path) { 578 String acl = doGetNodeACLs(adjustPath(path)); 579 if (acl == null) { 580 throw new IllegalStateException("Could not find matching ACLs for " + path); 581 } 582 return parseACLs(acl); 583 } 584 585 protected String doGetNodeACLs(String path) { 586 String longestPath = ""; 587 for (String acl : acls.keySet()) { 588 if (acl.length() > longestPath.length() && path.startsWith(acl)) { 589 longestPath = acl; 590 } 591 } 592 return acls.get(longestPath); 593 } 594 595 private void createParents(String path) throws InterruptedException, KeeperException { 596 path = PathUtils.getParentPath(adjustPath(path)); 597 path = PathUtils.removeTrailingSlash(path); 598 List<String> paths = new ArrayList<String>(); 599 while(!path.equals("") && getZk().exists(path, false) == null) { 600 paths.add(path); 601 path = PathUtils.getParentPath(path); 602 path = PathUtils.removeTrailingSlash(path); 603 } 604 Collections.reverse(paths); 605 for(String p : paths) { 606 try { 607 getZk().create(p, 608 null, 609 getNodeACLs(p), 610 CreateMode.PERSISTENT); 611 } catch(KeeperException.NodeExistsException e) { 612 // ok we continue... 613 if (LOG.isDebugEnabled()) { 614 LOG.debug("parent already exists " + p); 615 } 616 } 617 } 618 } 619 620 private byte[] toByteData(String data) { 621 if (data == null) { 622 return null; 623 } else { 624 try { 625 return data.getBytes(CHARSET); 626 } catch(UnsupportedEncodingException e) { 627 throw new RuntimeException(e); 628 } 629 } 630 } 631}