19 package org.sleuthkit.autopsy.coordinationservice;
 
   21 import java.io.IOException;
 
   22 import java.util.Collection;
 
   23 import java.util.Iterator;
 
   24 import java.util.List;
 
   26 import java.util.concurrent.ConcurrentHashMap;
 
   27 import java.util.concurrent.TimeUnit;
 
   28 import javax.annotation.concurrent.GuardedBy;
 
   29 import javax.annotation.concurrent.ThreadSafe;
 
   30 import org.apache.curator.RetryPolicy;
 
   31 import org.apache.curator.framework.CuratorFramework;
 
   32 import org.apache.curator.framework.CuratorFrameworkFactory;
 
   33 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
 
   34 import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
 
   35 import org.apache.curator.retry.ExponentialBackoffRetry;
 
   36 import org.apache.zookeeper.CreateMode;
 
   37 import org.apache.zookeeper.KeeperException;
 
   38 import org.apache.zookeeper.KeeperException.NoNodeException;
 
   39 import org.apache.zookeeper.ZooDefs;
 
   40 import org.openide.util.Lookup;
 
   56     @GuardedBy(
"CoordinationService.class")
 
   72         if (null == instance) {
 
   75             Iterator<? extends CoordinationServiceNamespace> it = providers.iterator();
 
   77                 rootNode = it.next().getNamespaceRoot();
 
   83             } 
catch (IOException | KeeperException | CoordinationServiceException ex) {
 
   84                 throw new CoordinationServiceException(
"Failed to create coordination service", ex);
 
   85             } 
catch (InterruptedException ex) {
 
   92                 Thread.currentThread().interrupt();
 
   93                 throw new CoordinationServiceException(
"Failed to create coordination service", ex);
 
  113         if (hostName.isEmpty() || port.isEmpty()) {
 
  117             port = Integer.toString(portInt);
 
  128         RetryPolicy retryPolicy = 
new ExponentialBackoffRetry(1000, 3);
 
  129         String connectString = hostName + 
":" + port;
 
  130         curator = CuratorFrameworkFactory.newClient(connectString, SESSION_TIMEOUT_MILLISECONDS, CONNECTION_TIMEOUT_MILLISECONDS, retryPolicy);
 
  136         String rootNode = rootNodeName;
 
  138         if (!rootNode.startsWith(
"/")) {
 
  139             rootNode = 
"/" + rootNode;
 
  143             String nodePath = rootNode + 
"/" + node.getDisplayName();
 
  145                 curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(nodePath);
 
  146             } 
catch (KeeperException ex) {
 
  147                 if (ex.code() != KeeperException.Code.NODEEXISTS) {
 
  150             } 
catch (Exception ex) {
 
  180             InterProcessReadWriteLock lock = 
new InterProcessReadWriteLock(
curator, fullNodePath);
 
  181             if (lock.writeLock().acquire(timeOut, timeUnit)) {
 
  182                 return new Lock(nodePath, lock.writeLock());
 
  186         } 
catch (Exception ex) {
 
  187             if (ex instanceof InterruptedException) {
 
  188                 throw (InterruptedException) ex;
 
  190                 throw new CoordinationServiceException(String.format(
"Failed to get exclusive lock for %s", fullNodePath), ex);
 
  214             InterProcessReadWriteLock lock = 
new InterProcessReadWriteLock(
curator, fullNodePath);
 
  215             if (!lock.writeLock().acquire(0, TimeUnit.SECONDS)) {
 
  218             return new Lock(nodePath, lock.writeLock());
 
  219         } 
catch (Exception ex) {
 
  220             throw new CoordinationServiceException(String.format(
"Failed to get exclusive lock for %s", fullNodePath), ex);
 
  247             InterProcessReadWriteLock lock = 
new InterProcessReadWriteLock(
curator, fullNodePath);
 
  248             if (lock.readLock().acquire(timeOut, timeUnit)) {
 
  249                 return new Lock(nodePath, lock.readLock());
 
  253         } 
catch (Exception ex) {
 
  254             if (ex instanceof InterruptedException) {
 
  255                 throw (InterruptedException) ex;
 
  257                 throw new CoordinationServiceException(String.format(
"Failed to get shared lock for %s", fullNodePath), ex);
 
  281             InterProcessReadWriteLock lock = 
new InterProcessReadWriteLock(
curator, fullNodePath);
 
  282             if (!lock.readLock().acquire(0, TimeUnit.SECONDS)) {
 
  285             return new Lock(nodePath, lock.readLock());
 
  286         } 
catch (Exception ex) {
 
  287             throw new CoordinationServiceException(String.format(
"Failed to get shared lock for %s", fullNodePath), ex);
 
  308             return curator.getData().forPath(fullNodePath);
 
  309         } 
catch (NoNodeException ex) {
 
  311         } 
catch (Exception ex) {
 
  312             if (ex instanceof InterruptedException) {
 
  313                 throw (InterruptedException) ex;
 
  315                 throw new CoordinationServiceException(String.format(
"Failed to get data for %s", fullNodePath), ex);
 
  332     public void setNodeData(
CategoryNode category, String nodePath, byte[] data) 
throws CoordinationServiceException, InterruptedException {
 
  335             curator.setData().forPath(fullNodePath, data);
 
  336         } 
catch (Exception ex) {
 
  337             if (ex instanceof InterruptedException) {
 
  338                 throw (InterruptedException) ex;
 
  340                 throw new CoordinationServiceException(String.format(
"Failed to set data for %s", fullNodePath), ex);
 
  357     public void deleteNode(
CategoryNode category, String nodePath) 
throws CoordinationServiceException, InterruptedException {
 
  360             curator.delete().forPath(fullNodePath);
 
  361         } 
catch (Exception ex) {
 
  362             if (ex instanceof InterruptedException) {
 
  363                 throw (InterruptedException) ex;
 
  365                 throw new CoordinationServiceException(String.format(
"Failed to delete node %s", fullNodePath), ex);
 
  387         } 
catch (Exception ex) {
 
  388             if (ex instanceof InterruptedException) {
 
  389                 throw (InterruptedException) ex;
 
  391                 throw new CoordinationServiceException(String.format(
"Failed to get node list for %s", category.getDisplayName()), ex);
 
  406         if (nodePath.startsWith(
"/")) {
 
  416     public final static class CoordinationServiceException 
extends Exception {
 
  425             super(message, cause);
 
  434     public static class Lock implements AutoCloseable {
 
  443         private Lock(String nodePath, InterProcessMutex lock) {
 
  445             this.interProcessLock = lock;
 
  452         public void release() throws CoordinationServiceException {
 
  454                 this.interProcessLock.release();
 
  455             } 
catch (Exception ex) {
 
  456                 throw new CoordinationServiceException(String.format(
"Failed to release the lock on %s", nodePath), ex);
 
  461         public void close() throws CoordinationServiceException {
 
  481             this.displayName = displayName;
 
final InterProcessMutex interProcessLock
 
Lock tryGetExclusiveLock(CategoryNode category, String nodePath)
 
void deleteNode(CategoryNode category, String nodePath)
 
static final String DEFAULT_NAMESPACE_ROOT
 
static String getIndexingServerPort()
 
static String getZkServerPort()
 
static final int CONNECTION_TIMEOUT_MILLISECONDS
 
CategoryNode(String displayName)
 
static final long serialVersionUID
 
static String getZkServerHost()
 
byte[] getNodeData(CategoryNode category, String nodePath)
 
static CoordinationService instance
 
final Map< String, String > categoryNodeToPath
 
static final int SESSION_TIMEOUT_MILLISECONDS
 
String getFullyQualifiedNodePath(CategoryNode category, String nodePath)
 
Lock(String nodePath, InterProcessMutex lock)
 
Lock tryGetSharedLock(CategoryNode category, String nodePath)
 
Lock tryGetExclusiveLock(CategoryNode category, String nodePath, int timeOut, TimeUnit timeUnit)
 
void setNodeData(CategoryNode category, String nodePath, byte[] data)
 
CoordinationService(String rootNodeName)
 
CoordinationServiceException(String message)
 
static final int PORT_OFFSET
 
static boolean isZooKeeperAccessible(String hostName, String port)
 
Lock tryGetSharedLock(CategoryNode category, String nodePath, int timeOut, TimeUnit timeUnit)
 
static synchronized CoordinationService getInstance()
 
List< String > getNodeList(CategoryNode category)
 
CoordinationServiceException(String message, Throwable cause)
 
static String getIndexingServerHost()
 
final CuratorFramework curator