Autopsy  4.4.1
Graphical digital forensics platform for The Sleuth Kit and other tools.
CollaborationMonitor.java
Go to the documentation of this file.
1 /*
2  * Autopsy Forensic Browser
3  *
4  * Copyright 2011-2017 Basis Technology Corp.
5  * Contact: carrier <at> sleuthkit <dot> org
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  * http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19 package org.sleuthkit.autopsy.casemodule;
20 
21 import com.google.common.util.concurrent.ThreadFactoryBuilder;
22 import java.beans.PropertyChangeEvent;
23 import java.beans.PropertyChangeListener;
24 import java.io.Serializable;
25 import java.time.Duration;
26 import java.time.Instant;
27 import java.util.EnumSet;
28 import java.util.HashMap;
29 import java.util.Iterator;
30 import java.util.Map;
31 import java.util.Set;
32 import java.util.UUID;
33 import java.util.concurrent.ScheduledThreadPoolExecutor;
34 import java.util.concurrent.TimeUnit;
35 import java.util.logging.Level;
36 import org.netbeans.api.progress.ProgressHandle;
37 import org.openide.util.NbBundle;
49 
55 final class CollaborationMonitor {
56 
57  private static final String EVENT_CHANNEL_NAME = "%s-Collaboration-Monitor-Events"; //NON-NLS
58  private static final String COLLABORATION_MONITOR_EVENT = "COLLABORATION_MONITOR_EVENT"; //NON-NLS
59  private static final Set<Case.Events> CASE_EVENTS_OF_INTEREST = EnumSet.of(Case.Events.ADDING_DATA_SOURCE,
60  Case.Events.DATA_SOURCE_ADDED, Case.Events.ADDING_DATA_SOURCE_FAILED);
61  private static final int NUMBER_OF_PERIODIC_TASK_THREADS = 2;
62  private static final String PERIODIC_TASK_THREAD_NAME = "collab-monitor-periodic-tasks-%d"; //NON-NLS
63  private static final long HEARTBEAT_INTERVAL_MINUTES = 1;
64  private static final long MAX_MISSED_HEARTBEATS = 5;
65  private static final long STALE_TASKS_DETECT_INTERVAL_MINS = 2;
66  private static final long EXECUTOR_TERMINATION_WAIT_SECS = 30;
67  private static final Logger logger = Logger.getLogger(CollaborationMonitor.class.getName());
68  private final String hostName;
69  private final LocalTasksManager localTasksManager;
70  private final RemoteTasksManager remoteTasksManager;
71  private final AutopsyEventPublisher eventPublisher;
72  private final ScheduledThreadPoolExecutor periodicTasksExecutor;
73 
86  CollaborationMonitor(String eventChannelPrefix) throws CollaborationMonitorException {
91  hostName = NetworkUtils.getLocalHostName();
92 
97  eventPublisher = new AutopsyEventPublisher();
98  try {
99  eventPublisher.openRemoteEventChannel(String.format(EVENT_CHANNEL_NAME, eventChannelPrefix));
100  } catch (AutopsyEventException ex) {
101  throw new CollaborationMonitorException("Failed to initialize", ex);
102  }
103 
108  remoteTasksManager = new RemoteTasksManager();
109  eventPublisher.addSubscriber(COLLABORATION_MONITOR_EVENT, remoteTasksManager);
110 
114  localTasksManager = new LocalTasksManager();
115  IngestManager.getInstance().addIngestJobEventListener(localTasksManager);
116  Case.addEventTypeSubscriber(CASE_EVENTS_OF_INTEREST, localTasksManager);
117 
124  periodicTasksExecutor = new ScheduledThreadPoolExecutor(NUMBER_OF_PERIODIC_TASK_THREADS, new ThreadFactoryBuilder().setNameFormat(PERIODIC_TASK_THREAD_NAME).build());
125  periodicTasksExecutor.scheduleAtFixedRate(new HeartbeatTask(), HEARTBEAT_INTERVAL_MINUTES, HEARTBEAT_INTERVAL_MINUTES, TimeUnit.MINUTES);
126  periodicTasksExecutor.scheduleAtFixedRate(new StaleTaskDetectionTask(), STALE_TASKS_DETECT_INTERVAL_MINS, STALE_TASKS_DETECT_INTERVAL_MINS, TimeUnit.MINUTES);
127  }
128 
132  void shutdown() {
133  if (null != periodicTasksExecutor) {
134  periodicTasksExecutor.shutdownNow();
135  try {
136  while (!periodicTasksExecutor.awaitTermination(EXECUTOR_TERMINATION_WAIT_SECS, TimeUnit.SECONDS)) {
137  logger.log(Level.WARNING, "Waited at least {0} seconds for periodic tasks executor to shut down, continuing to wait", EXECUTOR_TERMINATION_WAIT_SECS); //NON-NLS
138  }
139  } catch (InterruptedException ex) {
140  logger.log(Level.SEVERE, "Unexpected interrupt while stopping periodic tasks executor", ex); //NON-NLS
141  }
142  }
143 
144  Case.removeEventTypeSubscriber(CASE_EVENTS_OF_INTEREST, localTasksManager);
145  IngestManager.getInstance().removeIngestJobEventListener(localTasksManager);
146 
147  if (null != eventPublisher) {
148  eventPublisher.removeSubscriber(COLLABORATION_MONITOR_EVENT, remoteTasksManager);
149  eventPublisher.closeRemoteEventChannel();
150  }
151 
152  remoteTasksManager.shutdown();
153  }
154 
162  private final class LocalTasksManager implements PropertyChangeListener {
163 
164  private long nextTaskId;
165  private final Map<Integer, Task> eventIdsToAddDataSourceTasks;
166  private final Map<Long, Task> jobIdsTodataSourceAnalysisTasks;
167 
174  nextTaskId = 0;
175  eventIdsToAddDataSourceTasks = new HashMap<>();
176  jobIdsTodataSourceAnalysisTasks = new HashMap<>();
177  }
178 
185  @Override
186  public void propertyChange(PropertyChangeEvent event) {
187  if (AutopsyEvent.SourceType.LOCAL == ((AutopsyEvent) event).getSourceType()) {
188  String eventName = event.getPropertyName();
189  if (eventName.equals(Case.Events.ADDING_DATA_SOURCE.toString())) {
190  addDataSourceAddTask((AddingDataSourceEvent) event);
191  } else if (eventName.equals(Case.Events.ADDING_DATA_SOURCE_FAILED.toString())) {
192  removeDataSourceAddTask(((AddingDataSourceFailedEvent) event).getAddingDataSourceEventId());
193  } else if (eventName.equals(Case.Events.DATA_SOURCE_ADDED.toString())) {
194  removeDataSourceAddTask(((DataSourceAddedEvent) event).getAddingDataSourceEventId());
195  } else if (eventName.equals(IngestManager.IngestJobEvent.DATA_SOURCE_ANALYSIS_STARTED.toString())) {
196  addDataSourceAnalysisTask((DataSourceAnalysisStartedEvent) event);
197  } else if (eventName.equals(IngestManager.IngestJobEvent.DATA_SOURCE_ANALYSIS_COMPLETED.toString())) {
198  removeDataSourceAnalysisTask((DataSourceAnalysisCompletedEvent) event);
199  }
200  }
201  }
202 
209  synchronized void addDataSourceAddTask(AddingDataSourceEvent event) {
210  String status = NbBundle.getMessage(CollaborationMonitor.class, "CollaborationMonitor.addingDataSourceStatus.msg", hostName);
211  eventIdsToAddDataSourceTasks.put(event.getEventId().hashCode(), new Task(++nextTaskId, status));
212  eventPublisher.publishRemotely(new CollaborationEvent(hostName, getCurrentTasks()));
213  }
214 
222  synchronized void removeDataSourceAddTask(UUID eventId) {
223  eventIdsToAddDataSourceTasks.remove(eventId.hashCode());
224  eventPublisher.publishRemotely(new CollaborationEvent(hostName, getCurrentTasks()));
225  }
226 
233  synchronized void addDataSourceAnalysisTask(DataSourceAnalysisStartedEvent event) {
234  String status = NbBundle.getMessage(CollaborationMonitor.class, "CollaborationMonitor.analyzingDataSourceStatus.msg", hostName, event.getDataSource().getName());
235  jobIdsTodataSourceAnalysisTasks.put(event.getDataSourceIngestJobId(), new Task(++nextTaskId, status));
236  eventPublisher.publishRemotely(new CollaborationEvent(hostName, getCurrentTasks()));
237  }
238 
246  synchronized void removeDataSourceAnalysisTask(DataSourceAnalysisCompletedEvent event) {
247  jobIdsTodataSourceAnalysisTasks.remove(event.getDataSourceIngestJobId());
248  eventPublisher.publishRemotely(new CollaborationEvent(hostName, getCurrentTasks()));
249  }
250 
256  synchronized Map<Long, Task> getCurrentTasks() {
257  Map<Long, Task> currentTasks = new HashMap<>();
258  eventIdsToAddDataSourceTasks.values().stream().forEach((task) -> {
259  currentTasks.put(task.getId(), task);
260  });
261  jobIdsTodataSourceAnalysisTasks.values().stream().forEach((task) -> {
262  currentTasks.put(task.getId(), task);
263  });
264  return currentTasks;
265  }
266  }
267 
276  private final class RemoteTasksManager implements PropertyChangeListener {
277 
278  private final Map<String, RemoteTasks> hostsToTasks;
279 
286  hostsToTasks = new HashMap<>();
287  }
288 
295  @Override
296  public void propertyChange(PropertyChangeEvent event) {
297  if (event.getPropertyName().equals(COLLABORATION_MONITOR_EVENT)) {
298  updateTasks((CollaborationEvent) event);
299  }
300  }
301 
305  synchronized void shutdown() {
306  finishAllTasks();
307  }
308 
315  synchronized void updateTasks(CollaborationEvent event) {
316  RemoteTasks tasksForHost = hostsToTasks.get(event.getHostName());
317  if (null != tasksForHost) {
318  tasksForHost.update(event);
319  } else {
320  hostsToTasks.put(event.getHostName(), new RemoteTasks(event));
321  }
322  }
323 
329  synchronized void finishStaleTasks() {
330  for (Iterator<Map.Entry<String, RemoteTasks>> it = hostsToTasks.entrySet().iterator(); it.hasNext();) {
331  Map.Entry<String, RemoteTasks> entry = it.next();
332  RemoteTasks tasksForHost = entry.getValue();
333  if (tasksForHost.isStale()) {
334  tasksForHost.finishAllTasks();
335  it.remove();
336  }
337  }
338  }
339 
343  synchronized void finishAllTasks() {
344  for (Iterator<Map.Entry<String, RemoteTasks>> it = hostsToTasks.entrySet().iterator(); it.hasNext();) {
345  Map.Entry<String, RemoteTasks> entry = it.next();
346  RemoteTasks tasksForHost = entry.getValue();
347  tasksForHost.finishAllTasks();
348  it.remove();
349  }
350  }
351 
355  private final class RemoteTasks {
356 
357  private final long MAX_MINUTES_WITHOUT_UPDATE = HEARTBEAT_INTERVAL_MINUTES * MAX_MISSED_HEARTBEATS;
358  private Instant lastUpdateTime;
359  private Map<Long, ProgressHandle> taskIdsToProgressBars;
360 
371  lastUpdateTime = Instant.now();
372 
373  taskIdsToProgressBars = new HashMap<>();
374  event.getCurrentTasks().values().stream().forEach((task) -> {
375  ProgressHandle progress = ProgressHandle.createHandle(event.getHostName());
376  progress.start();
377  progress.progress(task.getStatus());
378  taskIdsToProgressBars.put(task.getId(), progress);
379  });
380  }
381 
388  void update(CollaborationEvent event) {
392  lastUpdateTime = Instant.now();
393 
398  Map<Long, Task> remoteTasks = event.getCurrentTasks();
399  remoteTasks.values().stream().forEach((task) -> {
400  ProgressHandle progress = taskIdsToProgressBars.get(task.getId());
401  if (null != progress) {
405  progress.progress(task.getStatus());
406  } else {
410  progress = ProgressHandle.createHandle(event.getHostName());
411  progress.start();
412  progress.progress(task.getStatus());
413  taskIdsToProgressBars.put(task.getId(), progress);
414  }
415  });
416 
421  for (Iterator<Map.Entry<Long, ProgressHandle>> iterator = taskIdsToProgressBars.entrySet().iterator(); iterator.hasNext();) {
422  Map.Entry<Long, ProgressHandle> entry = iterator.next();
423  if (!remoteTasks.containsKey(entry.getKey())) {
424  ProgressHandle progress = entry.getValue();
425  progress.finish();
426  iterator.remove();
427  }
428  }
429  }
430 
435  void finishAllTasks() {
436  taskIdsToProgressBars.values().stream().forEach((progress) -> {
437  progress.finish();
438  });
439  taskIdsToProgressBars.clear();
440  }
441 
449  boolean isStale() {
450  return Duration.between(lastUpdateTime, Instant.now()).toMinutes() >= MAX_MINUTES_WITHOUT_UPDATE;
451  }
452  }
453 
454  }
455 
463  private final class HeartbeatTask implements Runnable {
464 
468  @Override
469  public void run() {
470  try {
471  eventPublisher.publishRemotely(new CollaborationEvent(hostName, localTasksManager.getCurrentTasks()));
472  } catch (Exception ex) {
473  logger.log(Level.SEVERE, "Unexpected exception in HeartbeatTask", ex); //NON-NLS
474  }
475  }
476  }
477 
483  private final class StaleTaskDetectionTask implements Runnable {
484 
488  @Override
489  public void run() {
490  try {
491  remoteTasksManager.finishStaleTasks();
492  } catch (Exception ex) {
493  logger.log(Level.SEVERE, "Unexpected exception in StaleTaskDetectionTask", ex); //NON-NLS
494  }
495  }
496  }
497 
502  private final static class CollaborationEvent extends AutopsyEvent implements Serializable {
503 
504  private static final long serialVersionUID = 1L;
505  private final String hostName;
506  private final Map<Long, Task> currentTasks;
507 
515  CollaborationEvent(String hostName, Map<Long, Task> currentTasks) {
516  super(COLLABORATION_MONITOR_EVENT, null, null);
517  this.hostName = hostName;
518  this.currentTasks = currentTasks;
519  }
520 
526  String getHostName() {
527  return hostName;
528  }
529 
536  Map<Long, Task> getCurrentTasks() {
537  return currentTasks;
538  }
539 
540  }
541 
545  private final static class Task implements Serializable {
546 
547  private static final long serialVersionUID = 1L;
548  private final long id;
549  private final String status;
550 
558  Task(long id, String status) {
559  this.id = id;
560  this.status = status;
561  }
562 
569  long getId() {
570  return id;
571  }
572 
578  String getStatus() {
579  return status;
580  }
581  }
582 
586  final static class CollaborationMonitorException extends Exception {
587 
594  CollaborationMonitorException(String message) {
595  super(message);
596  }
597 
605  CollaborationMonitorException(String message, Throwable throwable) {
606  super(message, throwable);
607  }
608  }
609 
610 }

Copyright © 2012-2016 Basis Technology. Generated on: Fri Sep 29 2017
This work is licensed under a Creative Commons Attribution-Share Alike 3.0 United States License.