19 package org.sleuthkit.autopsy.coordinationservice;
21 import java.io.IOException;
22 import java.util.Collection;
23 import java.util.Iterator;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.TimeUnit;
27 import javax.annotation.concurrent.GuardedBy;
28 import javax.annotation.concurrent.ThreadSafe;
29 import org.apache.curator.RetryPolicy;
30 import org.apache.curator.framework.CuratorFramework;
31 import org.apache.curator.framework.CuratorFrameworkFactory;
32 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
33 import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
34 import org.apache.curator.retry.ExponentialBackoffRetry;
35 import org.apache.zookeeper.CreateMode;
36 import org.apache.zookeeper.KeeperException;
37 import org.apache.zookeeper.KeeperException.NoNodeException;
38 import org.apache.zookeeper.WatchedEvent;
39 import org.apache.zookeeper.ZooDefs;
40 import org.apache.zookeeper.ZooKeeper;
41 import org.openide.util.Lookup;
58 @GuardedBy(
74 boolean result =
75 Object workerThreadWaitNotifyLock =
new Object();
78 ZooKeeper zooKeeper =
79 (WatchedEvent event) -> {
80 synchronized (workerThreadWaitNotifyLock) {
81 workerThreadWaitNotifyLock.notify();
84 synchronized (workerThreadWaitNotifyLock) {
85 workerThreadWaitNotifyLock.wait(ZOOKEEPER_CONNECTION_TIMEOUT_MILLIS);
87 ZooKeeper.States state = zooKeeper.getState();
88 if (state == ZooKeeper.States.CONNECTED || state == ZooKeeper.States.CONNECTEDREADONLY) {
108 Iterator<? extends CoordinationServiceNamespace> it = providers.iterator();
110 rootNode = it.next().getNamespaceRoot();
116 }
catch (IOException | InterruptedException | KeeperException | CoordinationServiceException ex) {
117 throw new CoordinationServiceException(
"Failed to create coordination service", ex);
141 RetryPolicy retryPolicy =
new ExponentialBackoffRetry(1000, 3);
144 curator = CuratorFrameworkFactory.newClient(connectString, SESSION_TIMEOUT_MILLISECONDS, CONNECTION_TIMEOUT_MILLISECONDS, retryPolicy);
150 String rootNode = rootNodeName;
151 if (!rootNode.startsWith(
"/")) {
152 rootNode =
"/" + rootNode;
156 String nodePath = rootNode +
"/" + node.getDisplayName();
158 curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(nodePath);
159 }
catch (KeeperException ex) {
160 if (ex.code() != KeeperException.Code.NODEEXISTS) {
163 }
catch (Exception ex) {
193 InterProcessReadWriteLock lock =
new InterProcessReadWriteLock(
curator, fullNodePath);
194 if (lock.writeLock().acquire(timeOut, timeUnit)) {
195 return new Lock(nodePath, lock.writeLock());
199 }
catch (Exception ex) {
200 if (ex instanceof InterruptedException) {
201 throw (InterruptedException) ex;
203 throw new CoordinationServiceException(String.format(
"Failed to get exclusive lock for %s", fullNodePath), ex);
227 InterProcessReadWriteLock lock =
new InterProcessReadWriteLock(
curator, fullNodePath);
228 if (!lock.writeLock().acquire(0, TimeUnit.SECONDS)) {
231 return new Lock(nodePath, lock.writeLock());
232 }
catch (Exception ex) {
233 throw new CoordinationServiceException(String.format(
"Failed to get exclusive lock for %s", fullNodePath), ex);
260 InterProcessReadWriteLock lock =
new InterProcessReadWriteLock(
curator, fullNodePath);
261 if (lock.readLock().acquire(timeOut, timeUnit)) {
262 return new Lock(nodePath, lock.readLock());
266 }
catch (Exception ex) {
267 if (ex instanceof InterruptedException) {
268 throw (InterruptedException) ex;
270 throw new CoordinationServiceException(String.format(
"Failed to get shared lock for %s", fullNodePath), ex);
294 InterProcessReadWriteLock lock =
new InterProcessReadWriteLock(
curator, fullNodePath);
295 if (!lock.readLock().acquire(0, TimeUnit.SECONDS)) {
298 return new Lock(nodePath, lock.readLock());
299 }
catch (Exception ex) {
300 throw new CoordinationServiceException(String.format(
"Failed to get shared lock for %s", fullNodePath), ex);
321 return curator.getData().forPath(fullNodePath);
322 }
catch (NoNodeException ex) {
324 }
catch (Exception ex) {
325 if (ex instanceof InterruptedException) {
326 throw (InterruptedException) ex;
328 throw new CoordinationServiceException(String.format(
"Failed to get data for %s", fullNodePath), ex);
345 public void setNodeData(
CategoryNode category, String nodePath, byte[] data)
throws CoordinationServiceException, InterruptedException {
348 curator.setData().forPath(fullNodePath, data);
349 }
catch (Exception ex) {
350 if (ex instanceof InterruptedException) {
351 throw (InterruptedException) ex;
353 throw new CoordinationServiceException(String.format(
"Failed to set data for %s", fullNodePath), ex);
373 public final static class CoordinationServiceException
extends Exception {
382 super(message, cause);
391 public static class Lock implements AutoCloseable {
400 private Lock(String nodePath, InterProcessMutex lock) {
402 this.interProcessLock = lock;
409 public void release() throws CoordinationServiceException {
411 this.interProcessLock.release();
412 }
catch (Exception ex) {
413 throw new CoordinationServiceException(String.format(
"Failed to release the lock on %s", nodePath), ex);
418 public void close() throws CoordinationServiceException {
436 this.displayName = displayName;
final InterProcessMutex interProcessLock
Lock tryGetExclusiveLock(CategoryNode category, String nodePath)
static final String DEFAULT_NAMESPACE_ROOT
static String getIndexingServerPort()
CategoryNode(String displayName)
static final long serialVersionUID
byte[] getNodeData(CategoryNode category, String nodePath)
static CoordinationService instance
final Map< String, String > categoryNodeToPath
String getFullyQualifiedNodePath(CategoryNode category, String nodePath)
Lock(String nodePath, InterProcessMutex lock)
Lock tryGetSharedLock(CategoryNode category, String nodePath)
Lock tryGetExclusiveLock(CategoryNode category, String nodePath, int timeOut, TimeUnit timeUnit)
static boolean isZooKeeperAccessible()
void setNodeData(CategoryNode category, String nodePath, byte[] data)
CoordinationService(String rootNodeName)
CoordinationServiceException(String message)
static final int PORT_OFFSET
Lock tryGetSharedLock(CategoryNode category, String nodePath, int timeOut, TimeUnit timeUnit)
static synchronized CoordinationService getInstance()
CoordinationServiceException(String message, Throwable cause)
static String getIndexingServerHost()
final CuratorFramework curator