19 package org.sleuthkit.autopsy.coordinationservice;
21 import java.io.IOException;
22 import java.util.Collection;
23 import java.util.Iterator;
24 import java.util.List;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.TimeUnit;
28 import javax.annotation.concurrent.GuardedBy;
29 import javax.annotation.concurrent.ThreadSafe;
30 import org.apache.curator.RetryPolicy;
31 import org.apache.curator.framework.CuratorFramework;
32 import org.apache.curator.framework.CuratorFrameworkFactory;
33 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
34 import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
35 import org.apache.curator.retry.ExponentialBackoffRetry;
36 import org.apache.curator.utils.ZKPaths;
37 import org.apache.zookeeper.CreateMode;
38 import org.apache.zookeeper.KeeperException;
39 import org.apache.zookeeper.KeeperException.NoNodeException;
40 import org.apache.zookeeper.ZooDefs;
41 import org.openide.util.Lookup;
57 @GuardedBy(
"CoordinationService.class")
// Fragment of the singleton accessor (presumably getInstance(); interior lines are missing from this extract).
// Work is only done when no instance has been stored yet.
73 if (null == instance) {
// The namespace root is read from the first provider the iterator yields.
76 Iterator<? extends CoordinationServiceNamespace> it = providers.iterator();
78 rootNode = it.next().getNamespaceRoot();
84 }
// Creation failures of these types are wrapped, with the cause preserved, in a CoordinationServiceException.
catch (IOException | KeeperException | CoordinationServiceException ex) {
85 throw new CoordinationServiceException(
"Failed to create coordination service", ex);
86 }
catch (InterruptedException ex) {
// Re-assert the interrupt flag before converting the interruption into a CoordinationServiceException.
93 Thread.currentThread().interrupt();
94 throw new CoordinationServiceException(
"Failed to create coordination service", ex);
// Fragment of the ZooKeeper connection setup (presumably the constructor, since it assigns curator and reads rootNodeName; interior lines are missing).
// NOTE(review): what happens when the host or port is empty is not visible in this extract.
114 if (hostName.isEmpty() || port.isEmpty()) {
118 port = Integer.toString(portInt);
// NOTE(review): presumably a 1000 ms base sleep with at most 3 retries — confirm against the Curator ExponentialBackoffRetry docs.
129 RetryPolicy retryPolicy =
new ExponentialBackoffRetry(1000, 3);
// The connect string has the form "host:port".
130 String connectString = hostName +
":" + port;
131 curator = CuratorFrameworkFactory.newClient(connectString, SESSION_TIMEOUT_MILLISECONDS, CONNECTION_TIMEOUT_MILLISECONDS, retryPolicy);
// Make sure the namespace root starts with "/".
137 String rootNode = rootNodeName;
139 if (!rootNode.startsWith(
"/")) {
140 rootNode =
"/" + rootNode;
// Each node's path is the root plus "/" plus the node's display name (presumably one per CategoryNode; the loop header is not visible).
144 String nodePath = rootNode +
"/" + node.getDisplayName();
// Created as a PERSISTENT node, creating missing parents as needed.
// NOTE(review): OPEN_ACL_UNSAFE is used for the ACL — confirm that unrestricted access is intended.
146 curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(nodePath);
147 }
catch (KeeperException ex) {
// NODEEXISTS is treated differently from other KeeperException codes; the body of this branch is not visible here.
148 if (ex.code() != KeeperException.Code.NODEEXISTS) {
151 }
catch (Exception ex) {
// Fragment (presumably from upsertNodePath; the signature is not visible in this extract).
// Drop every trailing "/" from the path.
// NOTE(review): a path consisting only of "/" characters becomes the empty string here — confirm callers never pass one.
170 while(fullNodePath.endsWith(
"/")) {
171 fullNodePath = fullNodePath.substring(0, fullNodePath.length() - 1);
// Hand the path to ZKPaths.mkdirs together with the ZooKeeper handle obtained from the Curator client.
176 ZKPaths.mkdirs(
curator.getZookeeperClient().getZooKeeper(), fullNodePath);
178 }
catch (Exception ex) {
// Fragment of the timed exclusive-lock acquisition (presumably tryGetExclusiveLock with timeOut/timeUnit; signature not visible).
// Starts empty so the catch block can still format a message; the assignment of the real path is not visible in this extract.
204 String fullNodePath =
"";
// The exclusive lock is the write lock of an InterProcessReadWriteLock built on the node path.
208 InterProcessReadWriteLock lock =
new InterProcessReadWriteLock(
curator, fullNodePath);
// When acquire(timeOut, timeUnit) returns true, the write lock is wrapped in a Lock and returned.
209 if (lock.writeLock().acquire(timeOut, timeUnit)) {
210 return new Lock(nodePath, lock.writeLock());
214 }
catch (Exception ex) {
// An InterruptedException is rethrown as-is; any other exception is wrapped with the path in the message.
215 if (ex instanceof InterruptedException) {
216 throw (InterruptedException) ex;
218 throw new CoordinationServiceException(String.format(
"Failed to get exclusive lock for %s", fullNodePath), ex);
// Fragment of the exclusive-lock acquisition that passes a zero timeout (presumably the two-argument tryGetExclusiveLock; signature not visible).
// Starts empty so the catch block can still format a message; the assignment of the real path is not visible in this extract.
240 String fullNodePath =
"";
244 InterProcessReadWriteLock lock =
new InterProcessReadWriteLock(
curator, fullNodePath);
// acquire is called with a timeout of 0 seconds; the handling of a false result is not visible here.
245 if (!lock.writeLock().acquire(0, TimeUnit.SECONDS)) {
248 return new Lock(nodePath, lock.writeLock());
249 }
// NOTE(review): unlike the timed variant, this catch-all has no InterruptedException check, so an
// interruption raised in the try block would be wrapped without restoring the interrupt flag — confirm intended.
catch (Exception ex) {
250 throw new CoordinationServiceException(String.format(
"Failed to get exclusive lock for %s", fullNodePath), ex);
// Fragment of the timed shared-lock acquisition (presumably tryGetSharedLock with timeOut/timeUnit; signature not visible).
// Starts empty so the catch block can still format a message; the assignment of the real path is not visible in this extract.
275 String fullNodePath =
"";
// The shared lock is the read lock of an InterProcessReadWriteLock built on the node path.
279 InterProcessReadWriteLock lock =
new InterProcessReadWriteLock(
curator, fullNodePath);
// When acquire(timeOut, timeUnit) returns true, the read lock is wrapped in a Lock and returned.
280 if (lock.readLock().acquire(timeOut, timeUnit)) {
281 return new Lock(nodePath, lock.readLock());
285 }
catch (Exception ex) {
// An InterruptedException is rethrown as-is; any other exception is wrapped with the path in the message.
286 if (ex instanceof InterruptedException) {
287 throw (InterruptedException) ex;
289 throw new CoordinationServiceException(String.format(
"Failed to get shared lock for %s", fullNodePath), ex);
// Fragment of the shared-lock acquisition that passes a zero timeout (presumably the two-argument tryGetSharedLock; signature not visible).
// Starts empty so the catch block can still format a message; the assignment of the real path is not visible in this extract.
311 String fullNodePath =
"";
315 InterProcessReadWriteLock lock =
new InterProcessReadWriteLock(
curator, fullNodePath);
// acquire is called with a timeout of 0 seconds; the handling of a false result is not visible here.
316 if (!lock.readLock().acquire(0, TimeUnit.SECONDS)) {
319 return new Lock(nodePath, lock.readLock());
320 }
// NOTE(review): unlike the timed variant, this catch-all has no InterruptedException check, so an
// interruption raised in the try block would be wrapped without restoring the interrupt flag — confirm intended.
catch (Exception ex) {
321 throw new CoordinationServiceException(String.format(
"Failed to get shared lock for %s", fullNodePath), ex);
// Fragment of the node-data read (presumably getNodeData; signature not visible in this extract).
// Starts empty so the catch block can still format a message; the assignment of the real path is not visible here.
340 String fullNodePath =
"";
// Returns the bytes stored at the node path.
346 return curator.getData().forPath(fullNodePath);
347 }
// A missing node is handled separately from other failures; the body of this handler is not visible here.
catch (NoNodeException ex) {
349 }
catch (Exception ex) {
// An InterruptedException is rethrown as-is; any other exception is wrapped with the path in the message.
350 if (ex instanceof InterruptedException) {
351 throw (InterruptedException) ex;
353 throw new CoordinationServiceException(String.format(
"Failed to get data for %s", fullNodePath), ex);
// Stores the given bytes at a node (interior lines, including the computation of fullNodePath, are missing from this extract).
// Throws CoordinationServiceException when the write fails, and InterruptedException when the thread is interrupted.
370 public void setNodeData(
CategoryNode category, String nodePath, byte[] data)
throws CoordinationServiceException, InterruptedException {
// Writes data at fullNodePath through the Curator client.
373 curator.setData().forPath(fullNodePath, data);
374 }
catch (Exception ex) {
// An InterruptedException is rethrown as-is; any other exception is wrapped with the path in the message.
375 if (ex instanceof InterruptedException) {
376 throw (InterruptedException) ex;
378 throw new CoordinationServiceException(String.format(
"Failed to set data for %s", fullNodePath), ex);
// Deletes a node (interior lines, including the computation of fullNodePath, are missing from this extract).
// Throws CoordinationServiceException when the delete fails, and InterruptedException when the thread is interrupted.
395 public void deleteNode(
CategoryNode category, String nodePath)
throws CoordinationServiceException, InterruptedException {
// Issues the delete for fullNodePath through the Curator client.
// NOTE(review): no separate handling of a missing node is visible here, so presumably that case is also wrapped below — confirm.
398 curator.delete().forPath(fullNodePath);
399 }
catch (Exception ex) {
// An InterruptedException is rethrown as-is; any other exception is wrapped with the path in the message.
400 if (ex instanceof InterruptedException) {
401 throw (InterruptedException) ex;
403 throw new CoordinationServiceException(String.format(
"Failed to delete node %s", fullNodePath), ex);
// Error-handling tail of the node-list lookup (presumably getNodeList; the try body is not visible in this extract).
425 }
catch (Exception ex) {
// An InterruptedException is rethrown as-is; any other exception is wrapped, naming the category by its display name.
426 if (ex instanceof InterruptedException) {
427 throw (InterruptedException) ex;
429 throw new CoordinationServiceException(String.format(
"Failed to get node list for %s", category.getDisplayName()), ex);
// Fragment (presumably from getFullyQualifiedNodePath): branches on whether nodePath already begins with "/".
// The bodies of both outcomes are not visible in this extract.
444 if (nodePath.startsWith(
"/")) {
// Checked exception type thrown by the coordination service operations in this file (constructor signatures are missing from this extract).
454 public final static class CoordinationServiceException
extends Exception {
// Passes both the message and the underlying cause to Exception.
463 super(message, cause);
// Handle for an acquired lock; implements AutoCloseable (interior lines are missing from this extract).
472 public static class Lock implements AutoCloseable {
// Private constructor taking the node path and the Curator mutex.
481 private Lock(String nodePath, InterProcessMutex lock) {
483 this.interProcessLock = lock;
// Releases the underlying mutex; throws CoordinationServiceException when the release fails.
490 public void release() throws CoordinationServiceException {
492 this.interProcessLock.release();
493 }
catch (Exception ex) {
// Any failure from the mutex is wrapped, with the node path in the message.
494 throw new CoordinationServiceException(String.format(
"Failed to release the lock on %s", nodePath), ex);
// NOTE(review): the body of close() is not visible here; presumably it delegates to release() — confirm.
499 public void close() throws CoordinationServiceException {
// Fragment (presumably the CategoryNode constructor): stores the display name.
519 this.displayName = displayName;
final InterProcessMutex interProcessLock
String upsertNodePath(CategoryNode category, String nodePath)
Lock tryGetExclusiveLock(CategoryNode category, String nodePath)
void deleteNode(CategoryNode category, String nodePath)
static final String DEFAULT_NAMESPACE_ROOT
static String getIndexingServerPort()
static String getZkServerPort()
static final int CONNECTION_TIMEOUT_MILLISECONDS
CategoryNode(String displayName)
static final long serialVersionUID
static String getZkServerHost()
byte[] getNodeData(CategoryNode category, String nodePath)
static CoordinationService instance
final Map< String, String > categoryNodeToPath
static final int SESSION_TIMEOUT_MILLISECONDS
String getFullyQualifiedNodePath(CategoryNode category, String nodePath)
Lock(String nodePath, InterProcessMutex lock)
Lock tryGetSharedLock(CategoryNode category, String nodePath)
Lock tryGetExclusiveLock(CategoryNode category, String nodePath, int timeOut, TimeUnit timeUnit)
void setNodeData(CategoryNode category, String nodePath, byte[] data)
CoordinationService(String rootNodeName)
CoordinationServiceException(String message)
static final int PORT_OFFSET
static boolean isZooKeeperAccessible(String hostName, String port)
Lock tryGetSharedLock(CategoryNode category, String nodePath, int timeOut, TimeUnit timeUnit)
static synchronized CoordinationService getInstance()
List< String > getNodeList(CategoryNode category)
CoordinationServiceException(String message, Throwable cause)
static String getIndexingServerHost()
final CuratorFramework curator