19 package org.sleuthkit.autopsy.coordinationservice;
21 import java.io.IOException;
22 import java.util.Collection;
23 import java.util.Iterator;
24 import java.util.List;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.TimeUnit;
28 import javax.annotation.concurrent.GuardedBy;
29 import javax.annotation.concurrent.ThreadSafe;
30 import org.apache.curator.RetryPolicy;
31 import org.apache.curator.framework.CuratorFramework;
32 import org.apache.curator.framework.CuratorFrameworkFactory;
33 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
34 import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
35 import org.apache.curator.retry.ExponentialBackoffRetry;
36 import org.apache.zookeeper.CreateMode;
37 import org.apache.zookeeper.KeeperException;
38 import org.apache.zookeeper.KeeperException.NoNodeException;
39 import org.apache.zookeeper.WatchedEvent;
40 import org.apache.zookeeper.ZooDefs;
41 import org.apache.zookeeper.ZooKeeper;
42 import org.openide.util.Lookup;
59 @GuardedBy(
"CoordinationService.class")
75 boolean result =
false;
76 Object workerThreadWaitNotifyLock =
new Object();
79 ZooKeeper zooKeeper =
new ZooKeeper(connectString, ZOOKEEPER_SESSION_TIMEOUT_MILLIS,
80 (WatchedEvent event) -> {
81 synchronized (workerThreadWaitNotifyLock) {
82 workerThreadWaitNotifyLock.notify();
85 synchronized (workerThreadWaitNotifyLock) {
86 workerThreadWaitNotifyLock.wait(ZOOKEEPER_CONNECTION_TIMEOUT_MILLIS);
88 ZooKeeper.States state = zooKeeper.getState();
89 if (state == ZooKeeper.States.CONNECTED || state == ZooKeeper.States.CONNECTEDREADONLY) {
109 Iterator<? extends CoordinationServiceNamespace> it = providers.iterator();
111 rootNode = it.next().getNamespaceRoot();
117 }
catch (IOException | InterruptedException | KeeperException | CoordinationServiceException ex) {
118 throw new CoordinationServiceException(
"Failed to create coordination service", ex);
142 RetryPolicy retryPolicy =
new ExponentialBackoffRetry(1000, 3);
145 curator = CuratorFrameworkFactory.newClient(connectString, SESSION_TIMEOUT_MILLISECONDS, CONNECTION_TIMEOUT_MILLISECONDS, retryPolicy);
151 String rootNode = rootNodeName;
153 if (!rootNode.startsWith(
"/")) {
154 rootNode =
"/" + rootNode;
158 String nodePath = rootNode +
"/" + node.getDisplayName();
160 curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(nodePath);
161 }
catch (KeeperException ex) {
162 if (ex.code() != KeeperException.Code.NODEEXISTS) {
165 }
catch (Exception ex) {
195 InterProcessReadWriteLock lock =
new InterProcessReadWriteLock(
curator, fullNodePath);
196 if (lock.writeLock().acquire(timeOut, timeUnit)) {
197 return new Lock(nodePath, lock.writeLock());
201 }
catch (Exception ex) {
202 if (ex instanceof InterruptedException) {
203 throw (InterruptedException) ex;
205 throw new CoordinationServiceException(String.format(
"Failed to get exclusive lock for %s", fullNodePath), ex);
229 InterProcessReadWriteLock lock =
new InterProcessReadWriteLock(
curator, fullNodePath);
230 if (!lock.writeLock().acquire(0, TimeUnit.SECONDS)) {
233 return new Lock(nodePath, lock.writeLock());
234 }
catch (Exception ex) {
235 throw new CoordinationServiceException(String.format(
"Failed to get exclusive lock for %s", fullNodePath), ex);
262 InterProcessReadWriteLock lock =
new InterProcessReadWriteLock(
curator, fullNodePath);
263 if (lock.readLock().acquire(timeOut, timeUnit)) {
264 return new Lock(nodePath, lock.readLock());
268 }
catch (Exception ex) {
269 if (ex instanceof InterruptedException) {
270 throw (InterruptedException) ex;
272 throw new CoordinationServiceException(String.format(
"Failed to get shared lock for %s", fullNodePath), ex);
296 InterProcessReadWriteLock lock =
new InterProcessReadWriteLock(
curator, fullNodePath);
297 if (!lock.readLock().acquire(0, TimeUnit.SECONDS)) {
300 return new Lock(nodePath, lock.readLock());
301 }
catch (Exception ex) {
302 throw new CoordinationServiceException(String.format(
"Failed to get shared lock for %s", fullNodePath), ex);
323 return curator.getData().forPath(fullNodePath);
324 }
catch (NoNodeException ex) {
326 }
catch (Exception ex) {
327 if (ex instanceof InterruptedException) {
328 throw (InterruptedException) ex;
330 throw new CoordinationServiceException(String.format(
"Failed to get data for %s", fullNodePath), ex);
347 public void setNodeData(
CategoryNode category, String nodePath, byte[] data)
throws CoordinationServiceException, InterruptedException {
350 curator.setData().forPath(fullNodePath, data);
351 }
catch (Exception ex) {
352 if (ex instanceof InterruptedException) {
353 throw (InterruptedException) ex;
355 throw new CoordinationServiceException(String.format(
"Failed to set data for %s", fullNodePath), ex);
374 }
catch (Exception ex) {
375 throw new CoordinationServiceException(String.format(
"Failed to get node list for %s", category.getDisplayName()), ex);
389 if(nodePath.startsWith(
"/")){
399 public final static class CoordinationServiceException
extends Exception {
408 super(message, cause);
417 public static class Lock implements AutoCloseable {
426 private Lock(String nodePath, InterProcessMutex lock) {
428 this.interProcessLock = lock;
435 public void release() throws CoordinationServiceException {
437 this.interProcessLock.release();
438 }
catch (Exception ex) {
439 throw new CoordinationServiceException(String.format(
"Failed to release the lock on %s", nodePath), ex);
444 public void close() throws CoordinationServiceException {
464 this.displayName = displayName;
final InterProcessMutex interProcessLock
Lock tryGetExclusiveLock(CategoryNode category, String nodePath)
static final String DEFAULT_NAMESPACE_ROOT
static String getIndexingServerPort()
static final int CONNECTION_TIMEOUT_MILLISECONDS
CategoryNode(String displayName)
static final long serialVersionUID
byte[] getNodeData(CategoryNode category, String nodePath)
static CoordinationService instance
final Map< String, String > categoryNodeToPath
static final int ZOOKEEPER_CONNECTION_TIMEOUT_MILLIS
static final int SESSION_TIMEOUT_MILLISECONDS
String getFullyQualifiedNodePath(CategoryNode category, String nodePath)
Lock(String nodePath, InterProcessMutex lock)
Lock tryGetSharedLock(CategoryNode category, String nodePath)
Lock tryGetExclusiveLock(CategoryNode category, String nodePath, int timeOut, TimeUnit timeUnit)
static boolean isZooKeeperAccessible()
void setNodeData(CategoryNode category, String nodePath, byte[] data)
CoordinationService(String rootNodeName)
CoordinationServiceException(String message)
static final int PORT_OFFSET
Lock tryGetSharedLock(CategoryNode category, String nodePath, int timeOut, TimeUnit timeUnit)
static synchronized CoordinationService getInstance()
List< String > getNodeList(CategoryNode category)
static final int ZOOKEEPER_SESSION_TIMEOUT_MILLIS
CoordinationServiceException(String message, Throwable cause)
static String getIndexingServerHost()
final CuratorFramework curator