Autopsy  4.10.0
Graphical digital forensics platform for The Sleuth Kit and other tools.
CollaborationMonitor.java
Go to the documentation of this file.
1 /*
2  * Autopsy Forensic Browser
3  *
4  * Copyright 2011-2017 Basis Technology Corp.
5  * Contact: carrier <at> sleuthkit <dot> org
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  * http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19 package org.sleuthkit.autopsy.casemodule;
20 
21 import com.google.common.util.concurrent.ThreadFactoryBuilder;
22 import java.beans.PropertyChangeEvent;
23 import java.beans.PropertyChangeListener;
24 import java.io.Serializable;
25 import java.time.Duration;
26 import java.time.Instant;
27 import java.util.EnumSet;
28 import java.util.HashMap;
29 import java.util.Iterator;
30 import java.util.Map;
31 import java.util.Set;
32 import java.util.UUID;
33 import java.util.concurrent.ScheduledThreadPoolExecutor;
34 import java.util.concurrent.TimeUnit;
35 import java.util.logging.Level;
36 import org.netbeans.api.progress.ProgressHandle;
37 import org.openide.util.NbBundle;
49 import org.sleuthkit.datamodel.Content;
50 
56 final class CollaborationMonitor {
57 
58  private static final String EVENT_CHANNEL_NAME = "%s-Collaboration-Monitor-Events"; //NON-NLS
59  private static final String COLLABORATION_MONITOR_EVENT = "COLLABORATION_MONITOR_EVENT"; //NON-NLS
60  private static final Set<Case.Events> CASE_EVENTS_OF_INTEREST = EnumSet.of(Case.Events.ADDING_DATA_SOURCE,
61  Case.Events.DATA_SOURCE_ADDED, Case.Events.ADDING_DATA_SOURCE_FAILED);
62  private static final int NUMBER_OF_PERIODIC_TASK_THREADS = 2;
63  private static final String PERIODIC_TASK_THREAD_NAME = "collab-monitor-periodic-tasks-%d"; //NON-NLS
64  private static final long HEARTBEAT_INTERVAL_MINUTES = 1;
65  private static final long MAX_MISSED_HEARTBEATS = 5;
66  private static final long STALE_TASKS_DETECT_INTERVAL_MINS = 2;
67  private static final long EXECUTOR_TERMINATION_WAIT_SECS = 30;
68  private static final Logger logger = Logger.getLogger(CollaborationMonitor.class.getName());
69  private final String hostName;
70  private final LocalTasksManager localTasksManager;
71  private final RemoteTasksManager remoteTasksManager;
72  private final AutopsyEventPublisher eventPublisher;
73  private final ScheduledThreadPoolExecutor periodicTasksExecutor;
74 
87  CollaborationMonitor(String eventChannelPrefix) throws CollaborationMonitorException {
92  hostName = NetworkUtils.getLocalHostName();
93 
98  eventPublisher = new AutopsyEventPublisher();
99  try {
100  eventPublisher.openRemoteEventChannel(String.format(EVENT_CHANNEL_NAME, eventChannelPrefix));
101  } catch (AutopsyEventException ex) {
102  throw new CollaborationMonitorException("Failed to initialize", ex);
103  }
104 
109  remoteTasksManager = new RemoteTasksManager();
110  eventPublisher.addSubscriber(COLLABORATION_MONITOR_EVENT, remoteTasksManager);
111 
115  localTasksManager = new LocalTasksManager();
116  IngestManager.getInstance().addIngestJobEventListener(localTasksManager);
117  Case.addEventTypeSubscriber(CASE_EVENTS_OF_INTEREST, localTasksManager);
118 
125  periodicTasksExecutor = new ScheduledThreadPoolExecutor(NUMBER_OF_PERIODIC_TASK_THREADS, new ThreadFactoryBuilder().setNameFormat(PERIODIC_TASK_THREAD_NAME).build());
126  periodicTasksExecutor.scheduleWithFixedDelay(new HeartbeatTask(), HEARTBEAT_INTERVAL_MINUTES, HEARTBEAT_INTERVAL_MINUTES, TimeUnit.MINUTES);
127  periodicTasksExecutor.scheduleWithFixedDelay(new StaleTaskDetectionTask(), STALE_TASKS_DETECT_INTERVAL_MINS, STALE_TASKS_DETECT_INTERVAL_MINS, TimeUnit.MINUTES);
128  }
129 
133  void shutdown() {
134  if (null != periodicTasksExecutor) {
135  periodicTasksExecutor.shutdownNow();
136  try {
137  while (!periodicTasksExecutor.awaitTermination(EXECUTOR_TERMINATION_WAIT_SECS, TimeUnit.SECONDS)) {
138  logger.log(Level.WARNING, "Waited at least {0} seconds for periodic tasks executor to shut down, continuing to wait", EXECUTOR_TERMINATION_WAIT_SECS); //NON-NLS
139  }
140  } catch (InterruptedException ex) {
141  logger.log(Level.SEVERE, "Unexpected interrupt while stopping periodic tasks executor", ex); //NON-NLS
142  }
143  }
144 
145  Case.removeEventTypeSubscriber(CASE_EVENTS_OF_INTEREST, localTasksManager);
146  IngestManager.getInstance().removeIngestJobEventListener(localTasksManager);
147 
148  if (null != eventPublisher) {
149  eventPublisher.removeSubscriber(COLLABORATION_MONITOR_EVENT, remoteTasksManager);
150  eventPublisher.closeRemoteEventChannel();
151  }
152 
153  remoteTasksManager.shutdown();
154  }
155 
163  private final class LocalTasksManager implements PropertyChangeListener {
164 
165  private long nextTaskId;
166  private final Map<Integer, Task> eventIdsToAddDataSourceTasks;
167  private final Map<Long, Task> jobIdsTodataSourceAnalysisTasks;
168 
175  nextTaskId = 0;
176  eventIdsToAddDataSourceTasks = new HashMap<>();
177  jobIdsTodataSourceAnalysisTasks = new HashMap<>();
178  }
179 
186  @Override
187  public void propertyChange(PropertyChangeEvent event) {
188  if (AutopsyEvent.SourceType.LOCAL == ((AutopsyEvent) event).getSourceType()) {
189  String eventName = event.getPropertyName();
190  if (eventName.equals(Case.Events.ADDING_DATA_SOURCE.toString())) {
191  addDataSourceAddTask((AddingDataSourceEvent) event);
192  } else if (eventName.equals(Case.Events.ADDING_DATA_SOURCE_FAILED.toString())) {
193  removeDataSourceAddTask(((AddingDataSourceFailedEvent) event).getAddingDataSourceEventId());
194  } else if (eventName.equals(Case.Events.DATA_SOURCE_ADDED.toString())) {
195  removeDataSourceAddTask(((DataSourceAddedEvent) event).getAddingDataSourceEventId());
196  } else if (eventName.equals(IngestManager.IngestJobEvent.DATA_SOURCE_ANALYSIS_STARTED.toString())) {
197  addDataSourceAnalysisTask((DataSourceAnalysisStartedEvent) event);
198  } else if (eventName.equals(IngestManager.IngestJobEvent.DATA_SOURCE_ANALYSIS_COMPLETED.toString())) {
199  removeDataSourceAnalysisTask((DataSourceAnalysisCompletedEvent) event);
200  }
201  }
202  }
203 
210  synchronized void addDataSourceAddTask(AddingDataSourceEvent event) {
211  String status = NbBundle.getMessage(CollaborationMonitor.class, "CollaborationMonitor.addingDataSourceStatus.msg", hostName);
212  eventIdsToAddDataSourceTasks.put(event.getEventId().hashCode(), new Task(++nextTaskId, status));
213  eventPublisher.publishRemotely(new CollaborationEvent(hostName, getCurrentTasks()));
214  }
215 
223  synchronized void removeDataSourceAddTask(UUID eventId) {
224  eventIdsToAddDataSourceTasks.remove(eventId.hashCode());
225  eventPublisher.publishRemotely(new CollaborationEvent(hostName, getCurrentTasks()));
226  }
227 
234  synchronized void addDataSourceAnalysisTask(DataSourceAnalysisStartedEvent event) {
235  Content dataSource = event.getDataSource();
236  if (dataSource != null) {
237  String status = NbBundle.getMessage(CollaborationMonitor.class, "CollaborationMonitor.analyzingDataSourceStatus.msg", hostName, dataSource.getName());
238  jobIdsTodataSourceAnalysisTasks.put(event.getDataSourceIngestJobId(), new Task(++nextTaskId, status));
239  eventPublisher.publishRemotely(new CollaborationEvent(hostName, getCurrentTasks()));
240  }
241  }
242 
250  synchronized void removeDataSourceAnalysisTask(DataSourceAnalysisCompletedEvent event) {
251  jobIdsTodataSourceAnalysisTasks.remove(event.getDataSourceIngestJobId());
252  eventPublisher.publishRemotely(new CollaborationEvent(hostName, getCurrentTasks()));
253  }
254 
260  synchronized Map<Long, Task> getCurrentTasks() {
261  Map<Long, Task> currentTasks = new HashMap<>();
262  eventIdsToAddDataSourceTasks.values().stream().forEach((task) -> {
263  currentTasks.put(task.getId(), task);
264  });
265  jobIdsTodataSourceAnalysisTasks.values().stream().forEach((task) -> {
266  currentTasks.put(task.getId(), task);
267  });
268  return currentTasks;
269  }
270  }
271 
280  private final class RemoteTasksManager implements PropertyChangeListener {
281 
282  private final Map<String, RemoteTasks> hostsToTasks;
283 
290  hostsToTasks = new HashMap<>();
291  }
292 
299  @Override
300  public void propertyChange(PropertyChangeEvent event) {
301  if (event.getPropertyName().equals(COLLABORATION_MONITOR_EVENT)) {
302  updateTasks((CollaborationEvent) event);
303  }
304  }
305 
309  synchronized void shutdown() {
310  finishAllTasks();
311  }
312 
319  synchronized void updateTasks(CollaborationEvent event) {
320  RemoteTasks tasksForHost = hostsToTasks.get(event.getHostName());
321  if (null != tasksForHost) {
322  tasksForHost.update(event);
323  } else {
324  hostsToTasks.put(event.getHostName(), new RemoteTasks(event));
325  }
326  }
327 
333  synchronized void finishStaleTasks() {
334  for (Iterator<Map.Entry<String, RemoteTasks>> it = hostsToTasks.entrySet().iterator(); it.hasNext();) {
335  Map.Entry<String, RemoteTasks> entry = it.next();
336  RemoteTasks tasksForHost = entry.getValue();
337  if (tasksForHost.isStale()) {
338  tasksForHost.finishAllTasks();
339  it.remove();
340  }
341  }
342  }
343 
347  synchronized void finishAllTasks() {
348  for (Iterator<Map.Entry<String, RemoteTasks>> it = hostsToTasks.entrySet().iterator(); it.hasNext();) {
349  Map.Entry<String, RemoteTasks> entry = it.next();
350  RemoteTasks tasksForHost = entry.getValue();
351  tasksForHost.finishAllTasks();
352  it.remove();
353  }
354  }
355 
359  private final class RemoteTasks {
360 
361  private final long MAX_MINUTES_WITHOUT_UPDATE = HEARTBEAT_INTERVAL_MINUTES * MAX_MISSED_HEARTBEATS;
362  private Instant lastUpdateTime;
363  private Map<Long, ProgressHandle> taskIdsToProgressBars;
364 
375  lastUpdateTime = Instant.now();
376 
377  taskIdsToProgressBars = new HashMap<>();
378  event.getCurrentTasks().values().stream().forEach((task) -> {
379  ProgressHandle progress = ProgressHandle.createHandle(event.getHostName());
380  progress.start();
381  progress.progress(task.getStatus());
382  taskIdsToProgressBars.put(task.getId(), progress);
383  });
384  }
385 
392  void update(CollaborationEvent event) {
396  lastUpdateTime = Instant.now();
397 
402  Map<Long, Task> remoteTasks = event.getCurrentTasks();
403  remoteTasks.values().stream().forEach((task) -> {
404  ProgressHandle progress = taskIdsToProgressBars.get(task.getId());
405  if (null != progress) {
409  progress.progress(task.getStatus());
410  } else {
414  progress = ProgressHandle.createHandle(event.getHostName());
415  progress.start();
416  progress.progress(task.getStatus());
417  taskIdsToProgressBars.put(task.getId(), progress);
418  }
419  });
420 
425  for (Iterator<Map.Entry<Long, ProgressHandle>> iterator = taskIdsToProgressBars.entrySet().iterator(); iterator.hasNext();) {
426  Map.Entry<Long, ProgressHandle> entry = iterator.next();
427  if (!remoteTasks.containsKey(entry.getKey())) {
428  ProgressHandle progress = entry.getValue();
429  progress.finish();
430  iterator.remove();
431  }
432  }
433  }
434 
439  void finishAllTasks() {
440  taskIdsToProgressBars.values().stream().forEach((progress) -> {
441  progress.finish();
442  });
443  taskIdsToProgressBars.clear();
444  }
445 
453  boolean isStale() {
454  return Duration.between(lastUpdateTime, Instant.now()).toMinutes() >= MAX_MINUTES_WITHOUT_UPDATE;
455  }
456  }
457 
458  }
459 
467  private final class HeartbeatTask implements Runnable {
468 
472  @Override
473  public void run() {
474  try {
475  eventPublisher.publishRemotely(new CollaborationEvent(hostName, localTasksManager.getCurrentTasks()));
476  } catch (Exception ex) {
477  logger.log(Level.SEVERE, "Unexpected exception in HeartbeatTask", ex); //NON-NLS
478  }
479  }
480  }
481 
487  private final class StaleTaskDetectionTask implements Runnable {
488 
492  @Override
493  public void run() {
494  try {
495  remoteTasksManager.finishStaleTasks();
496  } catch (Exception ex) {
497  logger.log(Level.SEVERE, "Unexpected exception in StaleTaskDetectionTask", ex); //NON-NLS
498  }
499  }
500  }
501 
506  private final static class CollaborationEvent extends AutopsyEvent implements Serializable {
507 
508  private static final long serialVersionUID = 1L;
509  private final String hostName;
510  private final Map<Long, Task> currentTasks;
511 
519  CollaborationEvent(String hostName, Map<Long, Task> currentTasks) {
520  super(COLLABORATION_MONITOR_EVENT, null, null);
521  this.hostName = hostName;
522  this.currentTasks = currentTasks;
523  }
524 
530  String getHostName() {
531  return hostName;
532  }
533 
540  Map<Long, Task> getCurrentTasks() {
541  return currentTasks;
542  }
543 
544  }
545 
549  private final static class Task implements Serializable {
550 
551  private static final long serialVersionUID = 1L;
552  private final long id;
553  private final String status;
554 
562  Task(long id, String status) {
563  this.id = id;
564  this.status = status;
565  }
566 
573  long getId() {
574  return id;
575  }
576 
582  String getStatus() {
583  return status;
584  }
585  }
586 
590  final static class CollaborationMonitorException extends Exception {
591 
598  CollaborationMonitorException(String message) {
599  super(message);
600  }
601 
609  CollaborationMonitorException(String message, Throwable throwable) {
610  super(message, throwable);
611  }
612  }
613 
614 }

Copyright © 2012-2018 Basis Technology. Generated on: Fri Mar 22 2019
This work is licensed under a Creative Commons Attribution-Share Alike 3.0 United States License.