19 package org.sleuthkit.autopsy.coordinationservice;
21 import java.io.IOException;
22 import java.util.Collection;
23 import java.util.Iterator;
24 import java.util.List;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.TimeUnit;
28 import java.util.logging.Level;
29 import java.util.logging.Logger;
30 import javax.annotation.concurrent.GuardedBy;
31 import javax.annotation.concurrent.ThreadSafe;
32 import org.apache.curator.RetryPolicy;
33 import org.apache.curator.framework.CuratorFramework;
34 import org.apache.curator.framework.CuratorFrameworkFactory;
35 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
36 import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
37 import org.apache.curator.retry.ExponentialBackoffRetry;
38 import org.apache.zookeeper.CreateMode;
39 import org.apache.zookeeper.KeeperException;
40 import org.apache.zookeeper.KeeperException.NoNodeException;
41 import org.apache.zookeeper.WatchedEvent;
42 import org.apache.zookeeper.ZooDefs;
43 import org.apache.zookeeper.ZooKeeper;
44 import org.openide.util.Lookup;
61 @GuardedBy(
"CoordinationService.class")
// Connectivity probe fragment (presumably the body of isZooKeeperAccessible(); the
// method header, the derivation of connectString and the closing braces are not visible here).
// The outcome flag starts out false; where it is set to true is not visible in this fragment.
77 boolean result =
false;
// Monitor used to pass the "an event arrived" signal from the ZooKeeper watcher to this thread.
78 Object workerThreadWaitNotifyLock =
new Object();
// Opens a raw ZooKeeper session; the lambda is the watcher callback handed to the constructor.
81 ZooKeeper zooKeeper =
new ZooKeeper(connectString, ZOOKEEPER_SESSION_TIMEOUT_MILLIS,
82 (WatchedEvent event) -> {
// Any watched event, whatever its type, wakes one thread waiting on the monitor.
83 synchronized (workerThreadWaitNotifyLock) {
84 workerThreadWaitNotifyLock.notify();
// Block until the watcher notifies or the connection timeout elapses, whichever comes first.
87 synchronized (workerThreadWaitNotifyLock) {
88 workerThreadWaitNotifyLock.wait(ZOOKEEPER_CONNECTION_TIMEOUT_MILLIS);
// The session state read after the wait decides the outcome, so a timeout or an early
// wake-up is harmless: only CONNECTED or CONNECTEDREADONLY enters the branch below.
// NOTE(review): no zooKeeper.close() is visible in this fragment - confirm the session is released.
90 ZooKeeper.States state = zooKeeper.getState();
91 if (state == ZooKeeper.States.CONNECTED || state == ZooKeeper.States.CONNECTEDREADONLY) {
// Instance-creation fragment (presumably from getInstance(); header and the lines
// between the numbered statements are not visible here).
// Iterates the available CoordinationServiceNamespace providers.
111 Iterator<? extends CoordinationServiceNamespace> it = providers.iterator();
// The namespace root is taken from the next provider the iterator yields.
// NOTE(review): a hasNext() guard is presumably on the missing line before this one - confirm.
113 rootNode = it.next().getNamespaceRoot();
// Every failure type listed below is rewrapped in a CoordinationServiceException with the
// original exception kept as the cause; this includes a CoordinationServiceException
// raised inside the try block, which therefore ends up nested.
119 }
catch (IOException | InterruptedException | KeeperException | CoordinationServiceException ex) {
120 throw new CoordinationServiceException(
"Failed to create coordination service", ex);
// Constructor fragment: builds the Curator client and creates the category nodes
// (header, loop header and closing braces are not visible here).
// Retry policy for Curator operations; per the Curator API the arguments are
// (baseSleepTimeMs, maxRetries), i.e. a 1000 ms base sleep and at most 3 retries.
144 RetryPolicy retryPolicy =
new ExponentialBackoffRetry(1000, 3);
// Creates the client with the session and connection timeouts and the policy above.
// NOTE(review): the curator.start() call is presumably on a line missing from this view.
147 curator = CuratorFrameworkFactory.newClient(connectString, SESSION_TIMEOUT_MILLISECONDS, CONNECTION_TIMEOUT_MILLISECONDS, retryPolicy);
// Work on a local copy so the root name can be normalised.
153 String rootNode = rootNodeName;
// ZooKeeper paths are absolute: prepend "/" when the supplied root name lacks it.
155 if (!rootNode.startsWith(
"/")) {
156 rootNode =
"/" + rootNode;
// Path of one category node: <root>/<category display name>. `node` is presumably the
// loop variable over the CategoryNode values - the loop header is not visible.
160 String nodePath = rootNode +
"/" + node.getDisplayName();
// Creates the node as PERSISTENT with an open (unrestricted) ACL, creating parents as needed.
162 curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(nodePath);
163 }
catch (KeeperException ex) {
// NODEEXISTS is excluded from the branch below, i.e. an already-existing node is tolerated;
// how other codes are handled is on lines missing from this view.
164 if (ex.code() != KeeperException.Code.NODEEXISTS) {
167 }
catch (Exception ex) {
// Timed exclusive-lock attempt (fragment; the method header, the computation of
// fullNodePath and the timed-out branch are not visible here).
// A fresh read/write lock recipe is built on the node's full path; only its write side is used.
197 InterProcessReadWriteLock lock =
new InterProcessReadWriteLock(
curator, fullNodePath);
// acquire(timeOut, timeUnit) reports whether the lock was obtained within the timeout;
// on success the held write lock is wrapped, labelled with the caller-supplied nodePath.
198 if (lock.writeLock().acquire(timeOut, timeUnit)) {
199 return new Lock(nodePath, lock.writeLock());
203 }
catch (Exception ex) {
// Interruption is passed through as-is; every other failure is wrapped with the full node path.
204 if (ex instanceof InterruptedException) {
205 throw (InterruptedException) ex;
207 throw new CoordinationServiceException(String.format(
"Failed to get exclusive lock for %s", fullNodePath), ex);
// Non-blocking exclusive-lock attempt (fragment; the method header and the body of the
// failure branch are not visible here).
// Builds the read/write lock recipe on the node's full path; only its write side is used.
231 InterProcessReadWriteLock lock =
new InterProcessReadWriteLock(
curator, fullNodePath);
// A zero timeout makes this a single immediate try; the branch taken when the lock is
// not obtained is on lines missing from this view.
232 if (!lock.writeLock().acquire(0, TimeUnit.SECONDS)) {
235 return new Lock(nodePath, lock.writeLock());
236 }
// NOTE(review): unlike the timed variant, no InterruptedException pass-through is visible
// here, so an interruption would be wrapped like any other failure - confirm intended.
catch (Exception ex) {
237 throw new CoordinationServiceException(String.format(
"Failed to get exclusive lock for %s", fullNodePath), ex);
// Timed shared-lock attempt (fragment; the method header, the computation of
// fullNodePath and the timed-out branch are not visible here).
// Same recipe as the exclusive variant, but the read side of the lock is used.
264 InterProcessReadWriteLock lock =
new InterProcessReadWriteLock(
curator, fullNodePath);
// On a successful acquire within the timeout the held read lock is wrapped for the caller.
265 if (lock.readLock().acquire(timeOut, timeUnit)) {
266 return new Lock(nodePath, lock.readLock());
270 }
catch (Exception ex) {
// Interruption is passed through as-is; every other failure is wrapped with the full node path.
271 if (ex instanceof InterruptedException) {
272 throw (InterruptedException) ex;
274 throw new CoordinationServiceException(String.format(
"Failed to get shared lock for %s", fullNodePath), ex);
// Non-blocking shared-lock attempt (fragment; the method header and the body of the
// failure branch are not visible here).
// Builds the read/write lock recipe on the node's full path; only its read side is used.
298 InterProcessReadWriteLock lock =
new InterProcessReadWriteLock(
curator, fullNodePath);
// A zero timeout makes this a single immediate try; the branch taken when the lock is
// not obtained is on lines missing from this view.
299 if (!lock.readLock().acquire(0, TimeUnit.SECONDS)) {
302 return new Lock(nodePath, lock.readLock());
303 }
// NOTE(review): no InterruptedException pass-through is visible here, unlike the timed
// variant - confirm intended.
catch (Exception ex) {
304 throw new CoordinationServiceException(String.format(
"Failed to get shared lock for %s", fullNodePath), ex);
// Node data read (fragment of getNodeData; the header and the computation of
// fullNodePath are not visible here).
// Returns the raw bytes stored at the node's full path.
325 return curator.getData().forPath(fullNodePath);
326 }
// A missing node is handled separately from other failures; what this handler does
// is on a line missing from this view.
catch (NoNodeException ex) {
328 }
catch (Exception ex) {
// Interruption is passed through as-is; every other failure is wrapped with the full node path.
329 if (ex instanceof InterruptedException) {
330 throw (InterruptedException) ex;
332 throw new CoordinationServiceException(String.format(
"Failed to get data for %s", fullNodePath), ex);
// Stores `data` at the node identified by `category` and `nodePath` (fragment; the line
// that derives fullNodePath and the closing braces are not visible here).
// Throws CoordinationServiceException when the write fails, and InterruptedException when
// the underlying call was interrupted.
349 public void setNodeData(
CategoryNode category, String nodePath, byte[] data)
throws CoordinationServiceException, InterruptedException {
// Overwrites the node's data with the supplied bytes.
352 curator.setData().forPath(fullNodePath, data);
353 }
catch (Exception ex) {
// Interruption is passed through as-is; every other failure is wrapped with the full node path.
354 if (ex instanceof InterruptedException) {
355 throw (InterruptedException) ex;
357 throw new CoordinationServiceException(String.format(
"Failed to set data for %s", fullNodePath), ex);
// Tail of the node-listing method (presumably getNodeList; the try body is not visible here).
// Any failure is wrapped in a CoordinationServiceException whose message names the
// category by its display name; the original exception is kept as the cause.
376 }
catch (Exception ex) {
377 throw new CoordinationServiceException(String.format(
"Failed to get node list for %s", category.getDisplayName()), ex);
// Checked exception type thrown by the coordination service operations in this file
// (fragment; constructor headers and closing braces are not visible here).
396 public final static class CoordinationServiceException
extends Exception {
// Forwards the message and the underlying cause to java.lang.Exception.
405 super(message, cause);
// Handle for a lock obtained from the coordination service. It implements AutoCloseable,
// so it can be used in try-with-resources (fragment; field declarations, the body of
// close() and closing braces are not visible here).
414 public static class Lock implements AutoCloseable {
// Private constructor: callers obtain Lock objects from the tryGet*Lock methods of the
// enclosing class, which pass in the node path and the already-acquired Curator mutex.
423 private Lock(String nodePath, InterProcessMutex lock) {
425 this.interProcessLock = lock;
// Releases the underlying Curator mutex; any failure is wrapped in a
// CoordinationServiceException that names the node path.
432 public void release() throws CoordinationServiceException {
434 this.interProcessLock.release();
435 }
catch (Exception ex) {
436 throw new CoordinationServiceException(String.format(
"Failed to release the lock on %s", nodePath), ex);
// AutoCloseable entry point; presumably delegates to release() - the body is not visible.
441 public void close() throws CoordinationServiceException {
// Stores the display name supplied by the caller (presumably the body of the
// CategoryNode(String displayName) constructor; its header is not visible here).
459 this.displayName = displayName;
final InterProcessMutex interProcessLock
Lock tryGetExclusiveLock(CategoryNode category, String nodePath)
static final String DEFAULT_NAMESPACE_ROOT
static String getIndexingServerPort()
static final int CONNECTION_TIMEOUT_MILLISECONDS
CategoryNode(String displayName)
static final long serialVersionUID
byte[] getNodeData(CategoryNode category, String nodePath)
static CoordinationService instance
final Map< String, String > categoryNodeToPath
static final int ZOOKEEPER_CONNECTION_TIMEOUT_MILLIS
static final int SESSION_TIMEOUT_MILLISECONDS
String getFullyQualifiedNodePath(CategoryNode category, String nodePath)
Lock(String nodePath, InterProcessMutex lock)
Lock tryGetSharedLock(CategoryNode category, String nodePath)
Lock tryGetExclusiveLock(CategoryNode category, String nodePath, int timeOut, TimeUnit timeUnit)
static boolean isZooKeeperAccessible()
void setNodeData(CategoryNode category, String nodePath, byte[] data)
CoordinationService(String rootNodeName)
CoordinationServiceException(String message)
static final int PORT_OFFSET
Lock tryGetSharedLock(CategoryNode category, String nodePath, int timeOut, TimeUnit timeUnit)
static synchronized CoordinationService getInstance()
List< String > getNodeList(CategoryNode category)
static final int ZOOKEEPER_SESSION_TIMEOUT_MILLIS
CoordinationServiceException(String message, Throwable cause)
static String getIndexingServerHost()
final CuratorFramework curator