19 package org.sleuthkit.autopsy.coordinationservice;
21 import java.io.IOException;
22 import java.util.Collection;
23 import java.util.Iterator;
24 import java.util.List;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.TimeUnit;
28 import javax.annotation.concurrent.GuardedBy;
29 import javax.annotation.concurrent.ThreadSafe;
30 import org.apache.curator.RetryPolicy;
31 import org.apache.curator.framework.CuratorFramework;
32 import org.apache.curator.framework.CuratorFrameworkFactory;
33 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
34 import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
35 import org.apache.curator.retry.ExponentialBackoffRetry;
36 import org.apache.zookeeper.CreateMode;
37 import org.apache.zookeeper.KeeperException;
38 import org.apache.zookeeper.KeeperException.NoNodeException;
39 import org.apache.zookeeper.ZooDefs;
40 import org.openide.util.Lookup;
56 @GuardedBy(
"CoordinationService.class")
72 if (null == instance) {
75 Iterator<? extends CoordinationServiceNamespace> it = providers.iterator();
77 rootNode = it.next().getNamespaceRoot();
83 }
catch (IOException | KeeperException | CoordinationServiceException ex) {
84 throw new CoordinationServiceException(
"Failed to create coordination service", ex);
85 }
catch (InterruptedException ex) {
92 Thread.currentThread().interrupt();
93 throw new CoordinationServiceException(
"Failed to create coordination service", ex);
113 if (hostName.isEmpty() || port.isEmpty()) {
117 port = Integer.toString(portInt);
128 RetryPolicy retryPolicy =
new ExponentialBackoffRetry(1000, 3);
129 String connectString = hostName +
":" + port;
130 curator = CuratorFrameworkFactory.newClient(connectString, SESSION_TIMEOUT_MILLISECONDS, CONNECTION_TIMEOUT_MILLISECONDS, retryPolicy);
136 String rootNode = rootNodeName;
138 if (!rootNode.startsWith(
"/")) {
139 rootNode =
"/" + rootNode;
143 String nodePath = rootNode +
"/" + node.getDisplayName();
145 curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(nodePath);
146 }
catch (KeeperException ex) {
147 if (ex.code() != KeeperException.Code.NODEEXISTS) {
150 }
catch (Exception ex) {
180 InterProcessReadWriteLock lock =
new InterProcessReadWriteLock(
curator, fullNodePath);
181 if (lock.writeLock().acquire(timeOut, timeUnit)) {
182 return new Lock(nodePath, lock.writeLock());
186 }
catch (Exception ex) {
187 if (ex instanceof InterruptedException) {
188 throw (InterruptedException) ex;
190 throw new CoordinationServiceException(String.format(
"Failed to get exclusive lock for %s", fullNodePath), ex);
214 InterProcessReadWriteLock lock =
new InterProcessReadWriteLock(
curator, fullNodePath);
215 if (!lock.writeLock().acquire(0, TimeUnit.SECONDS)) {
218 return new Lock(nodePath, lock.writeLock());
219 }
catch (Exception ex) {
220 throw new CoordinationServiceException(String.format(
"Failed to get exclusive lock for %s", fullNodePath), ex);
247 InterProcessReadWriteLock lock =
new InterProcessReadWriteLock(
curator, fullNodePath);
248 if (lock.readLock().acquire(timeOut, timeUnit)) {
249 return new Lock(nodePath, lock.readLock());
253 }
catch (Exception ex) {
254 if (ex instanceof InterruptedException) {
255 throw (InterruptedException) ex;
257 throw new CoordinationServiceException(String.format(
"Failed to get shared lock for %s", fullNodePath), ex);
281 InterProcessReadWriteLock lock =
new InterProcessReadWriteLock(
curator, fullNodePath);
282 if (!lock.readLock().acquire(0, TimeUnit.SECONDS)) {
285 return new Lock(nodePath, lock.readLock());
286 }
catch (Exception ex) {
287 throw new CoordinationServiceException(String.format(
"Failed to get shared lock for %s", fullNodePath), ex);
308 return curator.getData().forPath(fullNodePath);
309 }
catch (NoNodeException ex) {
311 }
catch (Exception ex) {
312 if (ex instanceof InterruptedException) {
313 throw (InterruptedException) ex;
315 throw new CoordinationServiceException(String.format(
"Failed to get data for %s", fullNodePath), ex);
332 public void setNodeData(
CategoryNode category, String nodePath, byte[] data)
throws CoordinationServiceException, InterruptedException {
335 curator.setData().forPath(fullNodePath, data);
336 }
catch (Exception ex) {
337 if (ex instanceof InterruptedException) {
338 throw (InterruptedException) ex;
340 throw new CoordinationServiceException(String.format(
"Failed to set data for %s", fullNodePath), ex);
357 public void deleteNode(
CategoryNode category, String nodePath)
throws CoordinationServiceException, InterruptedException {
360 curator.delete().forPath(fullNodePath);
361 }
catch (Exception ex) {
362 if (ex instanceof InterruptedException) {
363 throw (InterruptedException) ex;
365 throw new CoordinationServiceException(String.format(
"Failed to delete node %s", fullNodePath), ex);
387 }
catch (Exception ex) {
388 if (ex instanceof InterruptedException) {
389 throw (InterruptedException) ex;
391 throw new CoordinationServiceException(String.format(
"Failed to get node list for %s", category.getDisplayName()), ex);
406 if (nodePath.startsWith(
"/")) {
416 public final static class CoordinationServiceException
extends Exception {
425 super(message, cause);
434 public static class Lock implements AutoCloseable {
443 private Lock(String nodePath, InterProcessMutex lock) {
445 this.interProcessLock = lock;
452 public void release() throws CoordinationServiceException {
454 this.interProcessLock.release();
455 }
catch (Exception ex) {
456 throw new CoordinationServiceException(String.format(
"Failed to release the lock on %s", nodePath), ex);
461 public void close() throws CoordinationServiceException {
481 this.displayName = displayName;
final InterProcessMutex interProcessLock
Lock tryGetExclusiveLock(CategoryNode category, String nodePath)
void deleteNode(CategoryNode category, String nodePath)
static final String DEFAULT_NAMESPACE_ROOT
static String getIndexingServerPort()
static String getZkServerPort()
static final int CONNECTION_TIMEOUT_MILLISECONDS
CategoryNode(String displayName)
static final long serialVersionUID
static String getZkServerHost()
byte[] getNodeData(CategoryNode category, String nodePath)
static CoordinationService instance
final Map< String, String > categoryNodeToPath
static final int SESSION_TIMEOUT_MILLISECONDS
String getFullyQualifiedNodePath(CategoryNode category, String nodePath)
Lock(String nodePath, InterProcessMutex lock)
Lock tryGetSharedLock(CategoryNode category, String nodePath)
Lock tryGetExclusiveLock(CategoryNode category, String nodePath, int timeOut, TimeUnit timeUnit)
void setNodeData(CategoryNode category, String nodePath, byte[] data)
CoordinationService(String rootNodeName)
CoordinationServiceException(String message)
static final int PORT_OFFSET
static boolean isZooKeeperAccessible(String hostName, String port)
Lock tryGetSharedLock(CategoryNode category, String nodePath, int timeOut, TimeUnit timeUnit)
static synchronized CoordinationService getInstance()
List< String > getNodeList(CategoryNode category)
CoordinationServiceException(String message, Throwable cause)
static String getIndexingServerHost()
final CuratorFramework curator