19 package org.sleuthkit.autopsy.casemodule;
 
   21 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
   22 import java.beans.PropertyChangeEvent;
 
   23 import java.beans.PropertyChangeListener;
 
   24 import java.io.Serializable;
 
   25 import java.time.Duration;
 
   26 import java.time.Instant;
 
   27 import java.util.Collections;
 
   28 import java.util.EnumSet;
 
   29 import java.util.HashMap;
 
   30 import java.util.Iterator;
 
   33 import java.util.UUID;
 
   34 import java.util.concurrent.ScheduledThreadPoolExecutor;
 
   35 import java.util.concurrent.TimeUnit;
 
   36 import java.util.logging.Level;
 
   37 import org.netbeans.api.progress.ProgressHandle;
 
   38 import org.openide.util.NbBundle;
 
   57 final class CollaborationMonitor {
 
   59     private static final String EVENT_CHANNEL_NAME = 
"%s-Collaboration-Monitor-Events"; 
 
   60     private static final String COLLABORATION_MONITOR_EVENT = 
"COLLABORATION_MONITOR_EVENT"; 
 
   61     private static final Set<Case.Events> CASE_EVENTS_OF_INTEREST = EnumSet.of(Case.Events.ADDING_DATA_SOURCE,
 
   62             Case.Events.DATA_SOURCE_ADDED, Case.Events.ADDING_DATA_SOURCE_FAILED);
 
   63     private static final Set<IngestManager.IngestJobEvent> INGEST_JOB_EVENTS_OF_INTEREST = EnumSet.of(IngestManager.IngestJobEvent.DATA_SOURCE_ANALYSIS_STARTED, IngestManager.IngestJobEvent.DATA_SOURCE_ANALYSIS_COMPLETED);
 
   64     private static final int NUMBER_OF_PERIODIC_TASK_THREADS = 2;
 
   65     private static final String PERIODIC_TASK_THREAD_NAME = 
"collab-monitor-periodic-tasks-%d"; 
 
   66     private static final long HEARTBEAT_INTERVAL_MINUTES = 1;
 
   67     private static final long MAX_MISSED_HEARTBEATS = 5;
 
   68     private static final long STALE_TASKS_DETECT_INTERVAL_MINS = 2;
 
   69     private static final long EXECUTOR_TERMINATION_WAIT_SECS = 30;
 
   70     private static final Logger logger = Logger.getLogger(CollaborationMonitor.class.getName());
 
   71     private final String hostName;
 
   72     private final LocalTasksManager localTasksManager;
 
   73     private final RemoteTasksManager remoteTasksManager;
 
   74     private final AutopsyEventPublisher eventPublisher;
 
   75     private final ScheduledThreadPoolExecutor periodicTasksExecutor;
 
   89     CollaborationMonitor(String eventChannelPrefix) 
throws CollaborationMonitorException {
 
   94         hostName = NetworkUtils.getLocalHostName();
 
  100         eventPublisher = 
new AutopsyEventPublisher();
 
  102             eventPublisher.openRemoteEventChannel(String.format(EVENT_CHANNEL_NAME, eventChannelPrefix));
 
  103         } 
catch (AutopsyEventException ex) {
 
  104             throw new CollaborationMonitorException(
"Failed to initialize", ex);
 
  111         remoteTasksManager = 
new RemoteTasksManager();
 
  112         eventPublisher.addSubscriber(COLLABORATION_MONITOR_EVENT, remoteTasksManager);
 
  117         localTasksManager = 
new LocalTasksManager();
 
  118         IngestManager.getInstance().addIngestJobEventListener(INGEST_JOB_EVENTS_OF_INTEREST, localTasksManager);
 
  119         Case.addEventTypeSubscriber(CASE_EVENTS_OF_INTEREST, localTasksManager);
 
  127         periodicTasksExecutor = 
new ScheduledThreadPoolExecutor(NUMBER_OF_PERIODIC_TASK_THREADS, 
new ThreadFactoryBuilder().setNameFormat(PERIODIC_TASK_THREAD_NAME).build());
 
  128         periodicTasksExecutor.scheduleWithFixedDelay(
new HeartbeatTask(), HEARTBEAT_INTERVAL_MINUTES, HEARTBEAT_INTERVAL_MINUTES, TimeUnit.MINUTES);
 
  129         periodicTasksExecutor.scheduleWithFixedDelay(
new StaleTaskDetectionTask(), STALE_TASKS_DETECT_INTERVAL_MINS, STALE_TASKS_DETECT_INTERVAL_MINS, TimeUnit.MINUTES);
 
  136         if (null != periodicTasksExecutor) {
 
  137             periodicTasksExecutor.shutdownNow();
 
  139                 while (!periodicTasksExecutor.awaitTermination(EXECUTOR_TERMINATION_WAIT_SECS, TimeUnit.SECONDS)) {
 
  140                     logger.log(Level.WARNING, 
"Waited at least {0} seconds for periodic tasks executor to shut down, continuing to wait", EXECUTOR_TERMINATION_WAIT_SECS); 
 
  142             } 
catch (InterruptedException ex) {
 
  143                 logger.log(Level.SEVERE, 
"Unexpected interrupt while stopping periodic tasks executor", ex); 
 
  147         Case.removeEventTypeSubscriber(CASE_EVENTS_OF_INTEREST, localTasksManager);
 
  148         IngestManager.getInstance().removeIngestJobEventListener(localTasksManager);
 
  150         if (null != eventPublisher) {
 
  151             eventPublisher.removeSubscriber(COLLABORATION_MONITOR_EVENT, remoteTasksManager);
 
  152             eventPublisher.closeRemoteEventChannel();
 
  155         remoteTasksManager.shutdown();
 
  178             eventIdsToAddDataSourceTasks = 
new HashMap<>();
 
  179             jobIdsTodataSourceAnalysisTasks = 
new HashMap<>();
 
  191                 String eventName = 
event.getPropertyName();
 
  213             String status = NbBundle.getMessage(CollaborationMonitor.class, 
"CollaborationMonitor.addingDataSourceStatus.msg", hostName);
 
  214             eventIdsToAddDataSourceTasks.put(event.
getEventId().hashCode(), 
new Task(++nextTaskId, status));
 
  225         synchronized void removeDataSourceAddTask(UUID eventId) {
 
  226             eventIdsToAddDataSourceTasks.remove(eventId.hashCode());
 
  227             eventPublisher.publishRemotely(
new CollaborationEvent(hostName, getCurrentTasks()));
 
  236         synchronized void addDataSourceAnalysisTask(DataSourceAnalysisStartedEvent event) {
 
  237             Content dataSource = 
event.getDataSource();
 
  238             if (dataSource != null) {
 
  239                 String status = NbBundle.getMessage(CollaborationMonitor.class, 
"CollaborationMonitor.analyzingDataSourceStatus.msg", hostName, dataSource.getName());
 
  240                 jobIdsTodataSourceAnalysisTasks.put(event.getIngestJobId(), 
new Task(++nextTaskId, status));
 
  241                 eventPublisher.publishRemotely(
new CollaborationEvent(hostName, getCurrentTasks()));
 
  252         synchronized void removeDataSourceAnalysisTask(DataSourceAnalysisCompletedEvent event) {
 
  253             jobIdsTodataSourceAnalysisTasks.remove(event.getIngestJobId());
 
  254             eventPublisher.publishRemotely(
new CollaborationEvent(hostName, getCurrentTasks()));
 
  262         synchronized Map<Long, Task> getCurrentTasks() {
 
  263             Map<Long, Task> currentTasks = 
new HashMap<>();
 
  264             eventIdsToAddDataSourceTasks.values().stream().forEach((task) -> {
 
  265                 currentTasks.put(task.getId(), task);
 
  267             jobIdsTodataSourceAnalysisTasks.values().stream().forEach((task) -> {
 
  268                 currentTasks.put(task.getId(), task);
 
  292             hostsToTasks = 
new HashMap<>();
 
  303             if (event.getPropertyName().equals(COLLABORATION_MONITOR_EVENT)) {
 
  311         synchronized void shutdown() {
 
  321         synchronized void updateTasks(CollaborationEvent event) {
 
  322             RemoteTasks tasksForHost = hostsToTasks.get(event.getHostName());
 
  323             if (null != tasksForHost) {
 
  324                 tasksForHost.update(event);
 
  326                 hostsToTasks.put(event.getHostName(), 
new RemoteTasks(event));
 
  335         synchronized void finishStaleTasks() {
 
  336             for (Iterator<Map.Entry<String, RemoteTasks>> it = hostsToTasks.entrySet().iterator(); it.hasNext();) {
 
  337                 Map.Entry<String, RemoteTasks> entry = it.next();
 
  338                 RemoteTasks tasksForHost = entry.getValue();
 
  339                 if (tasksForHost.isStale()) {
 
  340                     tasksForHost.finishAllTasks();
 
  349         synchronized void finishAllTasks() {
 
  350             for (Iterator<Map.Entry<String, RemoteTasks>> it = hostsToTasks.entrySet().iterator(); it.hasNext();) {
 
  351                 Map.Entry<String, RemoteTasks> entry = it.next();
 
  352                 RemoteTasks tasksForHost = entry.getValue();
 
  353                 tasksForHost.finishAllTasks();
 
  377                 lastUpdateTime = Instant.now();
 
  379                 taskIdsToProgressBars = 
new HashMap<>();
 
  380                 event.getCurrentTasks().values().stream().forEach((task) -> {
 
  381                     ProgressHandle progress = ProgressHandle.createHandle(event.getHostName());
 
  383                     progress.progress(task.getStatus());
 
  384                     taskIdsToProgressBars.put(task.getId(), progress);
 
  398                 lastUpdateTime = Instant.now();
 
  404                 Map<Long, Task> remoteTasks = 
event.getCurrentTasks();
 
  405                 remoteTasks.values().stream().forEach((task) -> {
 
  406                     ProgressHandle progress = taskIdsToProgressBars.get(task.getId());
 
  407                     if (null != progress) {
 
  411                         progress.progress(task.getStatus());
 
  416                         progress = ProgressHandle.createHandle(event.getHostName());
 
  418                         progress.progress(task.getStatus());
 
  419                         taskIdsToProgressBars.put(task.getId(), progress);
 
  427                 for (Iterator<Map.Entry<Long, ProgressHandle>> iterator = taskIdsToProgressBars.entrySet().iterator(); iterator.hasNext();) {
 
  428                     Map.Entry<Long, ProgressHandle> entry = iterator.next();
 
  429                     if (!remoteTasks.containsKey(entry.getKey())) {
 
  430                         ProgressHandle progress = entry.getValue();
 
  441             void finishAllTasks() {
 
  442                 taskIdsToProgressBars.values().stream().forEach((progress) -> {
 
  445                 taskIdsToProgressBars.clear();
 
  477                 eventPublisher.publishRemotely(
new CollaborationEvent(hostName, localTasksManager.getCurrentTasks()));
 
  478             } 
catch (Exception ex) {
 
  479                 logger.log(Level.SEVERE, 
"Unexpected exception in HeartbeatTask", ex); 
 
  497                 remoteTasksManager.finishStaleTasks();
 
  498             } 
catch (Exception ex) {
 
  499                 logger.log(Level.SEVERE, 
"Unexpected exception in StaleTaskDetectionTask", ex); 
 
  522             super(COLLABORATION_MONITOR_EVENT, null, null);
 
  532         String getHostName() {
 
  542         Map<Long, Task> getCurrentTasks() {
 
  543             return Collections.unmodifiableMap(currentTasks);
 
  551     private final static class Task implements Serializable {
 
  553         private static final long serialVersionUID = 1L;
 
  554         private final long id;
 
  564         Task(
long id, String status) {
 
  592     final static class CollaborationMonitorException 
extends Exception {
 
  600         CollaborationMonitorException(String message) {
 
  611         CollaborationMonitorException(String message, Throwable throwable) {
 
  612             super(message, throwable);
 
final Map< String, RemoteTasks > hostsToTasks
 
ADDING_DATA_SOURCE_FAILED
 
final Map< Integer, Task > eventIdsToAddDataSourceTasks
 
DATA_SOURCE_ANALYSIS_COMPLETED
 
void propertyChange(PropertyChangeEvent event)
 
Map< Long, ProgressHandle > taskIdsToProgressBars
 
final Map< Long, Task > jobIdsTodataSourceAnalysisTasks
 
void propertyChange(PropertyChangeEvent event)
 
static final long serialVersionUID
 
final long MAX_MINUTES_WITHOUT_UPDATE
 
DATA_SOURCE_ANALYSIS_STARTED
 
final Map< Long, Task > currentTasks