19 package org.sleuthkit.autopsy.casemodule;
21 import com.google.common.util.concurrent.ThreadFactoryBuilder;
22 import java.beans.PropertyChangeEvent;
23 import java.beans.PropertyChangeListener;
24 import java.io.Serializable;
25 import java.time.Duration;
26 import java.time.Instant;
27 import java.util.Arrays;
28 import java.util.HashMap;
29 import java.util.HashSet;
30 import java.util.Iterator;
33 import java.util.UUID;
34 import java.util.concurrent.ScheduledThreadPoolExecutor;
35 import java.util.concurrent.TimeUnit;
36 import java.util.logging.Level;
37 import org.netbeans.api.progress.ProgressHandle;
38 import org.netbeans.api.progress.ProgressHandleFactory;
39 import org.openide.util.NbBundle;
57 final class CollaborationMonitor {
59 private static final String EVENT_CHANNEL_NAME =
"%s-Collaboration-Monitor-Events";
60 private static final String COLLABORATION_MONITOR_EVENT =
"COLLABORATION_MONITOR_EVENT";
61 private static final Set<String> CASE_EVENTS_OF_INTEREST =
new HashSet<>(Arrays.asList(
new String[]{Case.Events.ADDING_DATA_SOURCE.toString(), Case.Events.DATA_SOURCE_ADDED.toString(), Case.Events.ADDING_DATA_SOURCE_FAILED.toString()}));
62 private static final int NUMBER_OF_PERIODIC_TASK_THREADS = 2;
63 private static final String PERIODIC_TASK_THREAD_NAME =
"collab-monitor-periodic-tasks-%d";
64 private static final long HEARTBEAT_INTERVAL_MINUTES = 1;
65 private static final long MAX_MISSED_HEARTBEATS = 5;
66 private static final long STALE_TASKS_DETECTION_INTERVAL_MINUTES = 2;
67 private static final long EXECUTOR_TERMINATION_WAIT_SECS = 30;
68 private static final Logger logger = Logger.getLogger(CollaborationMonitor.class.getName());
69 private final String hostName;
70 private final LocalTasksManager localTasksManager;
71 private final RemoteTasksManager remoteTasksManager;
72 private final AutopsyEventPublisher eventPublisher;
73 private final ScheduledThreadPoolExecutor periodicTasksExecutor;
82 CollaborationMonitor() throws CollaborationMonitorException {
87 hostName = NetworkUtils.getLocalHostName();
93 eventPublisher =
new AutopsyEventPublisher();
95 Case openedCase = Case.getCurrentCase();
96 String channelPrefix = openedCase.getTextIndexName();
97 eventPublisher.openRemoteEventChannel(String.format(EVENT_CHANNEL_NAME, channelPrefix));
98 }
catch (AutopsyEventException ex) {
99 throw new CollaborationMonitorException(
"Failed to initialize", ex);
106 remoteTasksManager =
new RemoteTasksManager();
107 eventPublisher.addSubscriber(COLLABORATION_MONITOR_EVENT, remoteTasksManager);
112 localTasksManager =
new LocalTasksManager();
113 IngestManager.getInstance().addIngestJobEventListener(localTasksManager);
114 Case.addEventSubscriber(CASE_EVENTS_OF_INTEREST, localTasksManager);
122 periodicTasksExecutor =
new ScheduledThreadPoolExecutor(NUMBER_OF_PERIODIC_TASK_THREADS,
new ThreadFactoryBuilder().setNameFormat(PERIODIC_TASK_THREAD_NAME).build());
123 periodicTasksExecutor.scheduleAtFixedRate(
new HeartbeatTask(), HEARTBEAT_INTERVAL_MINUTES, HEARTBEAT_INTERVAL_MINUTES, TimeUnit.MINUTES);
124 periodicTasksExecutor.scheduleAtFixedRate(
new StaleTaskDetectionTask(), STALE_TASKS_DETECTION_INTERVAL_MINUTES, STALE_TASKS_DETECTION_INTERVAL_MINUTES, TimeUnit.MINUTES);
131 if (null != periodicTasksExecutor) {
132 periodicTasksExecutor.shutdownNow();
134 while (!periodicTasksExecutor.awaitTermination(EXECUTOR_TERMINATION_WAIT_SECS, TimeUnit.SECONDS)) {
135 logger.log(Level.WARNING,
"Waited at least {0} seconds for periodic tasks executor to shut down, continuing to wait", EXECUTOR_TERMINATION_WAIT_SECS);
137 }
catch (InterruptedException ex) {
138 logger.log(Level.SEVERE,
"Unexpected interrupt while stopping periodic tasks executor", ex);
142 Case.removeEventSubscriber(CASE_EVENTS_OF_INTEREST, localTasksManager);
143 IngestManager.getInstance().removeIngestJobEventListener(localTasksManager);
145 if (null != eventPublisher) {
146 eventPublisher.removeSubscriber(COLLABORATION_MONITOR_EVENT, remoteTasksManager);
147 eventPublisher.closeRemoteEventChannel();
150 remoteTasksManager.shutdown();
173 uuidsToAddDataSourceTasks =
new HashMap<>();
174 jobIdsTodataSourceAnalysisTasks =
new HashMap<>();
186 String eventName =
event.getPropertyName();
208 String status = NbBundle.getMessage(CollaborationMonitor.class,
"CollaborationMonitor.addingDataSourceStatus.msg", hostName);
209 uuidsToAddDataSourceTasks.put(event.
getDataSourceId().hashCode(),
new Task(++nextTaskId, status));
221 synchronized void removeDataSourceAddTask(UUID dataSourceId) {
222 uuidsToAddDataSourceTasks.remove(dataSourceId.hashCode());
223 eventPublisher.publishRemotely(
new CollaborationEvent(hostName, getCurrentTasks()));
232 synchronized void addDataSourceAnalysisTask(DataSourceAnalysisStartedEvent event) {
233 String status = NbBundle.getMessage(CollaborationMonitor.class,
"CollaborationMonitor.analyzingDataSourceStatus.msg", hostName, event.getDataSource().getName());
234 jobIdsTodataSourceAnalysisTasks.put(event.getDataSourceIngestJobId(),
new Task(++nextTaskId, status));
235 eventPublisher.publishRemotely(
new CollaborationEvent(hostName, getCurrentTasks()));
245 synchronized void removeDataSourceAnalysisTask(DataSourceAnalysisCompletedEvent event) {
246 jobIdsTodataSourceAnalysisTasks.remove(event.getDataSourceIngestJobId());
247 eventPublisher.publishRemotely(
new CollaborationEvent(hostName, getCurrentTasks()));
255 synchronized Map<Long, Task> getCurrentTasks() {
256 Map<Long, Task> currentTasks =
new HashMap<>();
257 uuidsToAddDataSourceTasks.values().stream().forEach((task) -> {
258 currentTasks.put(task.getId(), task);
260 jobIdsTodataSourceAnalysisTasks.values().stream().forEach((task) -> {
261 currentTasks.put(task.getId(), task);
285 hostsToTasks =
new HashMap<>();
296 if (event.getPropertyName().equals(COLLABORATION_MONITOR_EVENT)) {
304 synchronized void shutdown() {
314 synchronized void updateTasks(CollaborationEvent event) {
315 RemoteTasks tasksForHost = hostsToTasks.get(event.getHostName());
316 if (null != tasksForHost) {
317 tasksForHost.update(event);
319 hostsToTasks.put(event.getHostName(),
new RemoteTasks(event));
328 synchronized void finishStaleTasks() {
329 for (Iterator<Map.Entry<String, RemoteTasks>> it = hostsToTasks.entrySet().iterator(); it.hasNext();) {
330 Map.Entry<String, RemoteTasks> entry = it.next();
331 RemoteTasks tasksForHost = entry.getValue();
332 if (tasksForHost.isStale()) {
333 tasksForHost.finishAllTasks();
342 synchronized void finishAllTasks() {
343 for (Iterator<Map.Entry<String, RemoteTasks>> it = hostsToTasks.entrySet().iterator(); it.hasNext();) {
344 Map.Entry<String, RemoteTasks> entry = it.next();
345 RemoteTasks tasksForHost = entry.getValue();
346 tasksForHost.finishAllTasks();
370 lastUpdateTime = Instant.now();
372 taskIdsToProgressBars =
new HashMap<>();
373 event.getCurrentTasks().values().stream().forEach((task) -> {
374 ProgressHandle progress = ProgressHandleFactory.createHandle(event.getHostName());
376 progress.progress(task.getStatus());
377 taskIdsToProgressBars.put(task.getId(), progress);
391 lastUpdateTime = Instant.now();
397 Map<Long, Task> remoteTasks =
event.getCurrentTasks();
398 remoteTasks.values().stream().forEach((task) -> {
399 ProgressHandle progress = taskIdsToProgressBars.get(task.getId());
400 if (null != progress) {
404 progress.progress(task.getStatus());
409 progress = ProgressHandleFactory.createHandle(event.getHostName());
411 progress.progress(task.getStatus());
412 taskIdsToProgressBars.put(task.getId(), progress);
420 for (Iterator<Map.Entry<Long, ProgressHandle>> iterator = taskIdsToProgressBars.entrySet().iterator(); iterator.hasNext();) {
421 Map.Entry<Long, ProgressHandle> entry = iterator.next();
422 if (!remoteTasks.containsKey(entry.getKey())) {
423 ProgressHandle progress = entry.getValue();
434 void finishAllTasks() {
435 taskIdsToProgressBars.values().stream().forEach((progress) -> {
438 taskIdsToProgressBars.clear();
470 eventPublisher.publishRemotely(
new CollaborationEvent(hostName, localTasksManager.getCurrentTasks()));
471 }
catch (Exception ex) {
472 logger.log(Level.SEVERE,
"Unexpected exception in HeartbeatTask", ex);
490 remoteTasksManager.finishStaleTasks();
491 }
catch (Exception ex) {
492 logger.log(Level.SEVERE,
"Unexpected exception in StaleTaskDetectionTask", ex);
515 super(COLLABORATION_MONITOR_EVENT, null, null);
525 String getHostName() {
535 Map<Long, Task> getCurrentTasks() {
544 private final static class Task implements Serializable {
547 private final long id;
557 Task(
long id, String status) {
585 final static class CollaborationMonitorException
extends Exception {
593 CollaborationMonitorException(String message) {
604 CollaborationMonitorException(String message, Throwable throwable) {
605 super(message, throwable);
final Map< String, RemoteTasks > hostsToTasks
ADDING_DATA_SOURCE_FAILED
static final long serialVersionUID
DATA_SOURCE_ANALYSIS_COMPLETED
final Map< Integer, Task > uuidsToAddDataSourceTasks
void propertyChange(PropertyChangeEvent event)
Map< Long, ProgressHandle > taskIdsToProgressBars
final Map< Long, Task > jobIdsTodataSourceAnalysisTasks
void propertyChange(PropertyChangeEvent event)
static final long serialVersionUID
final long MAX_MINUTES_WITHOUT_UPDATE
DATA_SOURCE_ANALYSIS_STARTED
final Map< Long, Task > currentTasks