19 package org.sleuthkit.autopsy.casemodule;
 
   21 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
   22 import java.beans.PropertyChangeEvent;
 
   23 import java.beans.PropertyChangeListener;
 
   24 import java.io.Serializable;
 
   25 import java.time.Duration;
 
   26 import java.time.Instant;
 
   27 import java.util.Arrays;
 
   28 import java.util.HashMap;
 
   29 import java.util.HashSet;
 
   30 import java.util.Iterator;
 
   33 import java.util.UUID;
 
   34 import java.util.concurrent.ScheduledThreadPoolExecutor;
 
   35 import java.util.concurrent.TimeUnit;
 
   36 import java.util.logging.Level;
 
   37 import org.netbeans.api.progress.ProgressHandle;
 
   38 import org.openide.util.NbBundle;
 
   56 final class CollaborationMonitor {
 
   // Format string for the remote event channel name; the constructor fills
   // the placeholder with the text index name of the current case.
   58     private static final String EVENT_CHANNEL_NAME = 
"%s-Collaboration-Monitor-Events"; 
 
   // Event name under which the remote tasks manager is subscribed on the
   // event publisher; also passed to super(...) by the collaboration event.
   59     private static final String COLLABORATION_MONITOR_EVENT = 
"COLLABORATION_MONITOR_EVENT"; 
 
   // Names of the Case events the local tasks manager is subscribed to
   // (adding data source, data source added, adding data source failed).
   60     private static final Set<String> CASE_EVENTS_OF_INTEREST = 
new HashSet<>(Arrays.asList(
new String[]{Case.Events.ADDING_DATA_SOURCE.toString(), Case.Events.DATA_SOURCE_ADDED.toString(), Case.Events.ADDING_DATA_SOURCE_FAILED.toString()}));
 
   // Core pool size given to the ScheduledThreadPoolExecutor for periodic tasks.
   61     private static final int NUMBER_OF_PERIODIC_TASK_THREADS = 2;
 
   // Name format handed to ThreadFactoryBuilder.setNameFormat for executor threads.
   62     private static final String PERIODIC_TASK_THREAD_NAME = 
"collab-monitor-periodic-tasks-%d"; 
 
   // Initial delay and period, in minutes, of the HeartbeatTask schedule.
   63     private static final long HEARTBEAT_INTERVAL_MINUTES = 1;
 
   // NOTE(review): no use of this constant is visible in this part of the file;
   // presumably it bounds how long a remote host may stay silent before its
   // tasks count as stale - TODO confirm.
   64     private static final long MAX_MISSED_HEARTBEATS = 5;
 
   // Initial delay and period, in minutes, of the StaleTaskDetectionTask schedule.
   65     private static final long STALE_TASKS_DETECT_INTERVAL_MINS = 2;
 
   // Timeout, in seconds, of each awaitTermination call while the executor is stopped.
   66     private static final long EXECUTOR_TERMINATION_WAIT_SECS = 30;
 
   // Logger named after this class.
   67     private static final Logger logger = Logger.getLogger(CollaborationMonitor.class.getName());
 
   // Local host name, obtained from NetworkUtils in the constructor and sent
   // with every published CollaborationEvent.
   68     private final String hostName;
 
   // Tracks tasks of this node; registered as ingest job and Case event listener.
   69     private final LocalTasksManager localTasksManager;
 
   // Tracks tasks reported by other hosts; subscribed to COLLABORATION_MONITOR_EVENT.
   70     private final RemoteTasksManager remoteTasksManager;
 
   // Publisher used to open the remote event channel and publish events remotely.
   71     private final AutopsyEventPublisher eventPublisher;
 
   // Executor that runs the heartbeat and stale task detection tasks at fixed rates.
   72     private final ScheduledThreadPoolExecutor periodicTasksExecutor;
 
   /**
    * Constructs the collaboration monitor: records the local host name, opens
    * the remote event channel for the current case, wires up the remote and
    * local tasks managers, and schedules the two periodic tasks.
    *
    * @throws CollaborationMonitorException wrapping the AutopsyEventException
    *         caught while setting up the remote event channel.
    */
   81     CollaborationMonitor() throws CollaborationMonitorException {
 
   // Host name that identifies this node in published collaboration events.
   86         hostName = NetworkUtils.getLocalHostName();
 
   92         eventPublisher = 
new AutopsyEventPublisher();
 
   // The channel name is EVENT_CHANNEL_NAME formatted with the text index
   // name of the current case.
   94             Case openedCase = Case.getCurrentCase();
 
   95             String channelPrefix = openedCase.getTextIndexName();
 
   96             eventPublisher.openRemoteEventChannel(String.format(EVENT_CHANNEL_NAME, channelPrefix));
 
   97         } 
catch (AutopsyEventException ex) {
 
   // The original exception is kept as the cause.
   98             throw new CollaborationMonitorException(
"Failed to initialize", ex);
 
  // Remote tasks manager receives COLLABORATION_MONITOR_EVENT events from the publisher.
  105         remoteTasksManager = 
new RemoteTasksManager();
 
  106         eventPublisher.addSubscriber(COLLABORATION_MONITOR_EVENT, remoteTasksManager);
 
  // Local tasks manager listens to ingest job events and to the Case events
  // named in CASE_EVENTS_OF_INTEREST.
  111         localTasksManager = 
new LocalTasksManager();
 
  112         IngestManager.getInstance().addIngestJobEventListener(localTasksManager);
 
  113         Case.addEventSubscriber(CASE_EVENTS_OF_INTEREST, localTasksManager);
 
  // Executor threads are named with PERIODIC_TASK_THREAD_NAME.
  121         periodicTasksExecutor = 
new ScheduledThreadPoolExecutor(NUMBER_OF_PERIODIC_TASK_THREADS, 
new ThreadFactoryBuilder().setNameFormat(PERIODIC_TASK_THREAD_NAME).build());
 
  // Both tasks use the same value for initial delay and period.
  122         periodicTasksExecutor.scheduleAtFixedRate(
new HeartbeatTask(), HEARTBEAT_INTERVAL_MINUTES, HEARTBEAT_INTERVAL_MINUTES, TimeUnit.MINUTES);
 
  123         periodicTasksExecutor.scheduleAtFixedRate(
new StaleTaskDetectionTask(), STALE_TASKS_DETECT_INTERVAL_MINS, STALE_TASKS_DETECT_INTERVAL_MINS, TimeUnit.MINUTES);
 
  // Stop the periodic tasks first: shutdownNow() is followed by repeated
  // awaitTermination calls, each waiting EXECUTOR_TERMINATION_WAIT_SECS seconds
  // and logging a warning whenever the wait times out.
  130         if (null != periodicTasksExecutor) {
 
  131             periodicTasksExecutor.shutdownNow();
 
  133                 while (!periodicTasksExecutor.awaitTermination(EXECUTOR_TERMINATION_WAIT_SECS, TimeUnit.SECONDS)) {
 
  134                     logger.log(Level.WARNING, 
"Waited at least {0} seconds for periodic tasks executor to shut down, continuing to wait", EXECUTOR_TERMINATION_WAIT_SECS); 
 
  136             } 
catch (InterruptedException ex) {
 
  // NOTE(review): the visible handler only logs the interrupt; no call to
  // Thread.currentThread().interrupt() is visible here, so the interrupt
  // status may be lost for callers - confirm and consider restoring it.
  137                 logger.log(Level.SEVERE, 
"Unexpected interrupt while stopping periodic tasks executor", ex); 
 
  // Undo the listener registrations made for the local tasks manager.
  141         Case.removeEventSubscriber(CASE_EVENTS_OF_INTEREST, localTasksManager);
 
  142         IngestManager.getInstance().removeIngestJobEventListener(localTasksManager);
 
  // Unsubscribe the remote tasks manager, then close the remote event channel.
  144         if (null != eventPublisher) {
 
  145             eventPublisher.removeSubscriber(COLLABORATION_MONITOR_EVENT, remoteTasksManager);
 
  146             eventPublisher.closeRemoteEventChannel();
 
  // Finally let the remote tasks manager shut itself down.
  149         remoteTasksManager.shutdown();
 
  172             eventIdsToAddDataSourceTasks = 
new HashMap<>();
 
  173             jobIdsTodataSourceAnalysisTasks = 
new HashMap<>();
 
  185                 String eventName = 
event.getPropertyName();
 
  207             String status = NbBundle.getMessage(CollaborationMonitor.class, 
"CollaborationMonitor.addingDataSourceStatus.msg", hostName);
 
  208             eventIdsToAddDataSourceTasks.put(event.
getEventId().hashCode(), 
new Task(++nextTaskId, status));
 
  /**
   * Removes the add data source task stored for the given event id and then
   * publishes the updated set of local tasks remotely.
   *
   * NOTE(review): the map key is the int hash code of the UUID, matching the
   * put made when the task is added. Distinct UUIDs can share a hash code, so
   * two concurrent add data source tasks could overwrite or remove each other;
   * keying the map by the UUID itself would avoid this - confirm and consider.
   *
   * @param eventId id of the add data source event whose task is removed.
   */
  219         synchronized void removeDataSourceAddTask(UUID eventId) {
 
  220             eventIdsToAddDataSourceTasks.remove(eventId.hashCode());
 
  // Broadcast the host name together with the tasks that remain.
  221             eventPublisher.publishRemotely(
new CollaborationEvent(hostName, getCurrentTasks()));
 
  /**
   * Records a data source analysis task for the ingest job named by the event
   * and then publishes the updated set of local tasks remotely.
   *
   * @param event data source analysis started event; its data source name and
   *              data source ingest job id are read.
   */
  230         synchronized void addDataSourceAnalysisTask(DataSourceAnalysisStartedEvent event) {
 
  // Status text comes from the bundle message
  // CollaborationMonitor.analyzingDataSourceStatus.msg, with the host name
  // and the data source name as arguments.
  231             String status = NbBundle.getMessage(CollaborationMonitor.class, 
"CollaborationMonitor.analyzingDataSourceStatus.msg", hostName, event.getDataSource().getName());
 
  // The task gets the next task id (pre-incremented) and is keyed by ingest job id.
  232             jobIdsTodataSourceAnalysisTasks.put(event.getDataSourceIngestJobId(), 
new Task(++nextTaskId, status));
 
  233             eventPublisher.publishRemotely(
new CollaborationEvent(hostName, getCurrentTasks()));
 
  /**
   * Removes the data source analysis task stored under the ingest job id of
   * the event and then publishes the updated set of local tasks remotely.
   *
   * @param event data source analysis completed event; only its data source
   *              ingest job id is read here.
   */
  243         synchronized void removeDataSourceAnalysisTask(DataSourceAnalysisCompletedEvent event) {
 
  244             jobIdsTodataSourceAnalysisTasks.remove(event.getDataSourceIngestJobId());
 
  // Broadcast the host name together with the tasks that remain.
  245             eventPublisher.publishRemotely(
new CollaborationEvent(hostName, getCurrentTasks()));
 
  /**
   * Collects the add data source tasks and the data source analysis tasks
   * into one newly created map keyed by task id.
   *
   * @return a map from task id to task; presumably the map built here, though
   *         the return statement is not visible in this listing - TODO confirm.
   */
  253         synchronized Map<Long, Task> getCurrentTasks() {
 
  // A fresh map is created on every call.
  254             Map<Long, Task> currentTasks = 
new HashMap<>();
 
  // Copy the add data source tasks, re-keyed by task id.
  255             eventIdsToAddDataSourceTasks.values().stream().forEach((task) -> {
 
  256                 currentTasks.put(task.getId(), task);
 
  // Copy the data source analysis tasks, re-keyed by task id.
  258             jobIdsTodataSourceAnalysisTasks.values().stream().forEach((task) -> {
 
  259                 currentTasks.put(task.getId(), task);
 
  283             hostsToTasks = 
new HashMap<>();
 
  294             if (event.getPropertyName().equals(COLLABORATION_MONITOR_EVENT)) {
 
  302         synchronized void shutdown() {
 
  /**
   * Applies a collaboration event to the tasks recorded for the host that
   * sent it.
   *
   * @param event collaboration event; its host name selects the entry in
   *              hostsToTasks.
   */
  312         synchronized void updateTasks(CollaborationEvent event) {
 
  313             RemoteTasks tasksForHost = hostsToTasks.get(event.getHostName());
 
  // Known host: let its existing RemoteTasks object process the event.
  314             if (null != tasksForHost) {
 
  315                 tasksForHost.update(event);
 
  // Presumably the else branch (the else line is not visible in this listing):
  // a RemoteTasks object is created from the event and stored for the host.
  317                 hostsToTasks.put(event.getHostName(), 
new RemoteTasks(event));
 
  /**
   * Walks over all hosts in hostsToTasks and finishes every task of each host
   * whose RemoteTasks object reports itself as stale.
   */
  326         synchronized void finishStaleTasks() {
 
  // An explicit iterator is used; presumably so the stale entry can be removed
  // during iteration, but no removal is visible in this listing - TODO confirm.
  327             for (Iterator<Map.Entry<String, RemoteTasks>> it = hostsToTasks.entrySet().iterator(); it.hasNext();) {
 
  328                 Map.Entry<String, RemoteTasks> entry = it.next();
 
  329                 RemoteTasks tasksForHost = entry.getValue();
 
  330                 if (tasksForHost.isStale()) {
 
  331                     tasksForHost.finishAllTasks();
 
  /**
   * Walks over all hosts in hostsToTasks and calls finishAllTasks on the
   * RemoteTasks object of each one, stale or not.
   */
  340         synchronized void finishAllTasks() {
 
  // NOTE(review): whether entries are also removed from the map is not
  // visible in this listing.
  341             for (Iterator<Map.Entry<String, RemoteTasks>> it = hostsToTasks.entrySet().iterator(); it.hasNext();) {
 
  342                 Map.Entry<String, RemoteTasks> entry = it.next();
 
  343                 RemoteTasks tasksForHost = entry.getValue();
 
  344                 tasksForHost.finishAllTasks();
 
  368                 lastUpdateTime = Instant.now();
 
  370                 taskIdsToProgressBars = 
new HashMap<>();
 
  371                 event.getCurrentTasks().values().stream().forEach((task) -> {
 
  372                     ProgressHandle progress = ProgressHandle.createHandle(event.getHostName());
 
  374                     progress.progress(task.getStatus());
 
  375                     taskIdsToProgressBars.put(task.getId(), progress);
 
  389                 lastUpdateTime = Instant.now();
 
  395                 Map<Long, Task> remoteTasks = 
event.getCurrentTasks();
 
  396                 remoteTasks.values().stream().forEach((task) -> {
 
  397                     ProgressHandle progress = taskIdsToProgressBars.get(task.getId());
 
  398                     if (null != progress) {
 
  402                         progress.progress(task.getStatus());
 
  407                         progress = ProgressHandle.createHandle(event.getHostName());
 
  409                         progress.progress(task.getStatus());
 
  410                         taskIdsToProgressBars.put(task.getId(), progress);
 
  418                 for (Iterator<Map.Entry<Long, ProgressHandle>> iterator = taskIdsToProgressBars.entrySet().iterator(); iterator.hasNext();) {
 
  419                     Map.Entry<Long, ProgressHandle> entry = iterator.next();
 
  420                     if (!remoteTasks.containsKey(entry.getKey())) {
 
  421                         ProgressHandle progress = entry.getValue();
 
  /**
   * Visits every progress handle held in taskIdsToProgressBars and then
   * empties the map.
   */
  432             void finishAllTasks() {
 
  // NOTE(review): the lambda body is not visible in this listing; presumably
  // each progress handle is finished there - TODO confirm.
  433                 taskIdsToProgressBars.values().stream().forEach((progress) -> {
 
  // Drop all task id to progress handle mappings.
  436                 taskIdsToProgressBars.clear();
 
  468                 eventPublisher.publishRemotely(
new CollaborationEvent(hostName, localTasksManager.getCurrentTasks()));
 
  469             } 
catch (Exception ex) {
 
  470                 logger.log(Level.SEVERE, 
"Unexpected exception in HeartbeatTask", ex); 
 
  488                 remoteTasksManager.finishStaleTasks();
 
  489             } 
catch (Exception ex) {
 
  490                 logger.log(Level.SEVERE, 
"Unexpected exception in StaleTaskDetectionTask", ex); 
 
  513             super(COLLABORATION_MONITOR_EVENT, null, null);
 
  523         String getHostName() {
 
  533         Map<Long, Task> getCurrentTasks() {
 
  542     private final static class Task implements Serializable {
 
  545         private final long id;
 
  555         Task(
long id, String status) {
 
  583     final static class CollaborationMonitorException 
extends Exception {
 
  591         CollaborationMonitorException(String message) {
 
  602         CollaborationMonitorException(String message, Throwable throwable) {
 
  603             super(message, throwable);
 
final Map< String, RemoteTasks > hostsToTasks
ADDING_DATA_SOURCE_FAILED
final Map< Integer, Task > eventIdsToAddDataSourceTasks
static final long serialVersionUID
DATA_SOURCE_ANALYSIS_COMPLETED
void propertyChange(PropertyChangeEvent event)
Map< Long, ProgressHandle > taskIdsToProgressBars
final Map< Long, Task > jobIdsTodataSourceAnalysisTasks
void propertyChange(PropertyChangeEvent event)
static final long serialVersionUID
final long MAX_MINUTES_WITHOUT_UPDATE
DATA_SOURCE_ANALYSIS_STARTED
final Map< Long, Task > currentTasks