Autopsy  4.1
Graphical digital forensics platform for The Sleuth Kit and other tools.
IngestManager.java
Go to the documentation of this file.
1 /*
2  * Autopsy Forensic Browser
3  *
4  * Copyright 2011-2017 Basis Technology Corp.
5  * Contact: carrier <at> sleuthkit <dot> org
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  * http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19 package org.sleuthkit.autopsy.ingest;
20 
21 import com.google.common.util.concurrent.ThreadFactoryBuilder;
22 import java.awt.EventQueue;
23 import java.beans.PropertyChangeEvent;
24 import java.beans.PropertyChangeListener;
25 import java.util.ArrayList;
26 import java.util.Collection;
27 import java.util.Collections;
28 import java.util.Date;
29 import java.util.HashMap;
30 import java.util.HashSet;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.Set;
34 import java.util.concurrent.Callable;
35 import java.util.concurrent.ConcurrentHashMap;
36 import java.util.concurrent.ExecutorService;
37 import java.util.concurrent.Executors;
38 import java.util.concurrent.Future;
39 import java.util.concurrent.atomic.AtomicLong;
40 import java.util.logging.Level;
41 import java.util.stream.Collectors;
42 import java.util.stream.Stream;
43 import javax.swing.JOptionPane;
44 import org.netbeans.api.progress.ProgressHandle;
45 import org.openide.util.Cancellable;
46 import org.openide.util.NbBundle;
63 
68 public class IngestManager {
69 
70  private static final Logger logger = Logger.getLogger(IngestManager.class.getName());
71  private static IngestManager instance;
72  private final Object ingestMessageBoxLock = new Object();
73 
74  /*
75  * The ingest manager maintains a mapping of ingest job ids to running
76  * ingest jobs.
77  */
78  private final Map<Long, IngestJob> jobsById;
79 
80  /*
81  * Each runnable/callable task the ingest manager submits to its thread
82  * pools is given a unique thread/task ID.
83  */
84  private final AtomicLong nextThreadId;
85 
86  /*
87  * Ingest jobs may be queued to be started on a pool thread by start ingest
88  * job tasks. A mapping of task ids to the Future objects for each task is
89  * maintained to allow for task cancellation.
90  */
91  private final Map<Long, Future<Void>> startIngestJobTasks;
92  private final ExecutorService startIngestJobsThreadPool;
93 
94  /*
95  * Ingest jobs use an ingest task scheduler to break themselves down into
96  * data source level and file level tasks. The ingest scheduler puts these
97  * ingest tasks into queues for execution on ingest manager pool threads by
98  * ingest task executers. There is a single data source level ingest thread
99  * and a user configurable number of file level ingest threads.
100  */
101  private final ExecutorService dataSourceIngestThreadPool;
102  private static final int MIN_NUMBER_OF_FILE_INGEST_THREADS = 1;
103  private static final int MAX_NUMBER_OF_FILE_INGEST_THREADS = 16;
104  private static final int DEFAULT_NUMBER_OF_FILE_INGEST_THREADS = 2;
106  private final ExecutorService fileIngestThreadPool;
107 
108  private static final String JOB_EVENT_CHANNEL_NAME = "%s-Ingest-Job-Events"; //NON-NLS
109  private static final String MODULE_EVENT_CHANNEL_NAME = "%s-Ingest-Module-Events"; //NON-NLS
110  private static final Set<String> jobEventNames = Stream.of(IngestJobEvent.values())
111  .map(IngestJobEvent::toString)
112  .collect(Collectors.toSet());
113  private static final Set<String> moduleEventNames = Stream.of(IngestModuleEvent.values())
114  .map(IngestModuleEvent::toString)
115  .collect(Collectors.toSet());
118  private final ExecutorService eventPublishingExecutor;
119 
120  /*
121  * The ingest manager uses an ingest monitor to determine when system
122  * resources are under pressure. If the monitor detects such a situation, it
123  * calls back to the ingest manager to cancel all ingest jobs in progress.
124  */
126 
127  /*
128  * The ingest manager provides access to a top component that is used by
129  * ingest modules to post messages for the user. A count of the posts is used
130  * as a cap to avoid bogging down the application.
131  */
132  private static final int MAX_ERROR_MESSAGE_POSTS = 200;
133  private volatile IngestMessageTopComponent ingestMessageBox;
134  private final AtomicLong ingestErrorMessagePosts;
135 
136  /*
137  * The ingest manager supports reporting of ingest processing progress by
138  * collecting snapshots of the activities of the ingest threads, ingest job
139  * progress, and ingest module run times.
140  */
141  private final ConcurrentHashMap<Long, IngestThreadActivitySnapshot> ingestThreadActivitySnapshots;
142  private final ConcurrentHashMap<String, Long> ingestModuleRunTimes;
143 
144  /*
145  * The ingest job creation capability of the ingest manager can be turned on
146  * and off to support an orderly shut down of the application.
147  */
148  private volatile boolean jobCreationIsEnabled;
149 
150  /*
151  * Ingest manager subscribes to service outage notifications. If key
152  * services are down, ingest manager cancels all ingest jobs in progress.
153  */
155 
159  public enum IngestJobEvent {
160 
197  };
198 
202  public enum IngestModuleEvent {
203 
226  };
227 
234  public synchronized static IngestManager getInstance() {
235  if (instance == null) {
241  instance = new IngestManager();
242  instance.subscribeToCaseEvents();
243  }
244  return instance;
245  }
246 
255  private IngestManager() {
256  this.ingestModuleRunTimes = new ConcurrentHashMap<>();
257  this.ingestThreadActivitySnapshots = new ConcurrentHashMap<>();
258  this.ingestErrorMessagePosts = new AtomicLong(0L);
259  this.ingestMonitor = new IngestMonitor();
260  this.eventPublishingExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-ingest-events-%d").build()); //NON-NLS
261  this.jobEventPublisher = new AutopsyEventPublisher();
262  this.moduleEventPublisher = new AutopsyEventPublisher();
263  this.dataSourceIngestThreadPool = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-data-source-ingest-%d").build()); //NON-NLS
264  this.startIngestJobsThreadPool = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-start-ingest-jobs-%d").build()); //NON-NLS
265  this.nextThreadId = new AtomicLong(0L);
266  this.jobsById = new HashMap<>();
267  this.startIngestJobTasks = new ConcurrentHashMap<>();
268 
269  this.servicesMonitor = ServicesMonitor.getInstance();
271 
273 
274  numberOfFileIngestThreads = UserPreferences.numberOfFileIngestThreads();
275  if ((numberOfFileIngestThreads < MIN_NUMBER_OF_FILE_INGEST_THREADS) || (numberOfFileIngestThreads > MAX_NUMBER_OF_FILE_INGEST_THREADS)) {
276  numberOfFileIngestThreads = DEFAULT_NUMBER_OF_FILE_INGEST_THREADS;
277  UserPreferences.setNumberOfFileIngestThreads(numberOfFileIngestThreads);
278  }
279  fileIngestThreadPool = Executors.newFixedThreadPool(numberOfFileIngestThreads, new ThreadFactoryBuilder().setNameFormat("IM-file-ingest-%d").build()); //NON-NLS
280  for (int i = 0; i < numberOfFileIngestThreads; ++i) {
282  }
283  }
284 
290  long threadId = nextThreadId.incrementAndGet();
291  dataSourceIngestThreadPool.submit(new ExecuteIngestJobsTask(threadId, IngestTasksScheduler.getInstance().getDataSourceIngestTaskQueue()));
292  ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId));
293  }
294 
299  private void startFileIngestThread() {
300  long threadId = nextThreadId.incrementAndGet();
301  fileIngestThreadPool.submit(new ExecuteIngestJobsTask(threadId, IngestTasksScheduler.getInstance().getFileIngestTaskQueue()));
302  ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId));
303  }
304 
308  private void subscribeToCaseEvents() {
309  Case.addEventSubscriber(Case.Events.CURRENT_CASE.toString(), new PropertyChangeListener() {
310  @Override
311  public void propertyChange(PropertyChangeEvent event) {
312  if (event.getNewValue() != null) {
313  handleCaseOpened();
314  } else {
315  handleCaseClosed();
316  }
317  }
318  });
319  }
320 
326  PropertyChangeListener propChangeListener = new PropertyChangeListener() {
327  @Override
328  public void propertyChange(PropertyChangeEvent evt) {
329  if (evt.getNewValue().equals(ServicesMonitor.ServiceStatus.DOWN.toString())) {
330 
331  // check whether a multi-user case is currently being processed
332  try {
334  return;
335  }
336  } catch (IllegalStateException ignore) {
337  // Thrown by Case.getCurrentCase() when no case is open
338  return;
339  }
340 
341  // one of the services we subscribed to went down
342  String serviceDisplayName = ServicesMonitor.Service.valueOf(evt.getPropertyName()).getDisplayName();
343  logger.log(Level.SEVERE, "Service {0} is down! Cancelling all running ingest jobs", serviceDisplayName); //NON-NLS
344 
345  // display notification if running interactively
347  EventQueue.invokeLater(new Runnable() {
348  @Override
349  public void run() {
350  JOptionPane.showMessageDialog(null,
351  NbBundle.getMessage(this.getClass(), "IngestManager.cancellingIngest.msgDlg.text"),
352  NbBundle.getMessage(this.getClass(), "IngestManager.serviceIsDown.msgDlg.text", serviceDisplayName),
353  JOptionPane.ERROR_MESSAGE);
354  }
355  });
356  }
357 
358  // cancel ingest if running
360  }
361  }
362  };
363 
364  // subscribe to services of interest
365  Set<String> servicesList = new HashSet<>();
366  servicesList.add(ServicesMonitor.Service.REMOTE_CASE_DATABASE.toString());
367  servicesList.add(ServicesMonitor.Service.REMOTE_KEYWORD_SEARCH.toString());
368  this.servicesMonitor.addSubscriber(servicesList, propChangeListener);
369  }
370 
371  synchronized void handleCaseOpened() {
372  this.jobCreationIsEnabled = true;
374  try {
381  Case openedCase = Case.getCurrentCase();
382  String channelPrefix = openedCase.getName();
383  if (Case.CaseType.MULTI_USER_CASE == openedCase.getCaseType()) {
384  jobEventPublisher.openRemoteEventChannel(String.format(JOB_EVENT_CHANNEL_NAME, channelPrefix));
385  moduleEventPublisher.openRemoteEventChannel(String.format(MODULE_EVENT_CHANNEL_NAME, channelPrefix));
386  }
387  } catch (IllegalStateException | AutopsyEventException ex) {
388  logger.log(Level.SEVERE, "Failed to open remote events channel", ex); //NON-NLS
389  MessageNotifyUtil.Notify.error(NbBundle.getMessage(IngestManager.class, "IngestManager.OpenEventChannel.Fail.Title"),
390  NbBundle.getMessage(IngestManager.class, "IngestManager.OpenEventChannel.Fail.ErrMsg"));
391  }
392  }
393 
394  synchronized void handleCaseClosed() {
395  /*
396  * TODO (JIRA-2227): IngestManager should wait for cancelled ingest jobs
397  * to complete when a case is closed.
398  */
399  this.cancelAllIngestJobs(IngestJob.CancellationReason.CASE_CLOSED);
400  jobEventPublisher.closeRemoteEventChannel();
401  moduleEventPublisher.closeRemoteEventChannel();
402  this.jobCreationIsEnabled = false;
404  }
405 
411  void initIngestMessageInbox() {
412  synchronized (this.ingestMessageBoxLock) {
413  ingestMessageBox = IngestMessageTopComponent.findInstance();
414  }
415  }
416 
422  void postIngestMessage(IngestMessage message) {
423  synchronized (this.ingestMessageBoxLock) {
424  if (ingestMessageBox != null && RuntimeProperties.runningWithGUI()) {
425  if (message.getMessageType() != IngestMessage.MessageType.ERROR && message.getMessageType() != IngestMessage.MessageType.WARNING) {
426  ingestMessageBox.displayMessage(message);
427  } else {
428  long errorPosts = ingestErrorMessagePosts.incrementAndGet();
429  if (errorPosts <= MAX_ERROR_MESSAGE_POSTS) {
430  ingestMessageBox.displayMessage(message);
431  } else if (errorPosts == MAX_ERROR_MESSAGE_POSTS + 1) {
432  IngestMessage errorMessageLimitReachedMessage = IngestMessage.createErrorMessage(
433  NbBundle.getMessage(this.getClass(), "IngestManager.IngestMessage.ErrorMessageLimitReached.title"),
434  NbBundle.getMessage(this.getClass(), "IngestManager.IngestMessage.ErrorMessageLimitReached.subject"),
435  NbBundle.getMessage(this.getClass(), "IngestManager.IngestMessage.ErrorMessageLimitReached.msg", MAX_ERROR_MESSAGE_POSTS));
436  ingestMessageBox.displayMessage(errorMessageLimitReachedMessage);
437  }
438  }
439  }
440  }
441  }
442 
443  private void clearIngestMessageBox() {
444  synchronized (this.ingestMessageBoxLock) {
445  if (ingestMessageBox != null) {
446  ingestMessageBox.clearMessages();
447  }
448  ingestErrorMessagePosts.set(0);
449  }
450  }
451 
460  }
461 
469  public void queueIngestJob(Collection<Content> dataSources, IngestJobSettings settings) {
470  if (jobCreationIsEnabled) {
471  IngestJob job = new IngestJob(dataSources, settings, RuntimeProperties.runningWithGUI());
472  if (job.hasIngestPipeline()) {
473  long taskId = nextThreadId.incrementAndGet();
474  Future<Void> task = startIngestJobsThreadPool.submit(new StartIngestJobTask(taskId, job));
475  startIngestJobTasks.put(taskId, task);
476  }
477  }
478  }
479 
491  public synchronized IngestJobStartResult beginIngestJob(Collection<Content> dataSources, IngestJobSettings settings) {
492  if (this.jobCreationIsEnabled) {
493  IngestJob job = new IngestJob(dataSources, settings, RuntimeProperties.runningWithGUI());
494  if (job.hasIngestPipeline()) {
495  return this.startIngestJob(job); // Start job
496  }
497  return new IngestJobStartResult(null, new IngestManagerException("No ingest pipeline created, likely due to no ingest modules being enabled."), null);
498  }
499  return new IngestJobStartResult(null, new IngestManagerException("No case open"), null);
500  }
501 
512  @Deprecated
513  public synchronized IngestJob startIngestJob(Collection<Content> dataSources, IngestJobSettings settings) {
514  return beginIngestJob(dataSources, settings).getJob();
515  }
516 
525  @NbBundle.Messages({
526  "IngestManager.startupErr.dlgTitle=Ingest Module Startup Failure",
527  "IngestManager.startupErr.dlgMsg=Unable to start up one or more ingest modules, ingest cancelled.",
528  "IngestManager.startupErr.dlgSolution=Please disable the failed modules or fix the errors before restarting ingest.",
529  "IngestManager.startupErr.dlgErrorList=Errors:"
530  })
532  List<IngestModuleError> errors = null;
533  if (this.jobCreationIsEnabled) {
534  // multi-user cases must have multi-user database service running
536  try {
537  if (!servicesMonitor.getServiceStatus(ServicesMonitor.Service.REMOTE_CASE_DATABASE.toString()).equals(ServicesMonitor.ServiceStatus.UP.toString())) {
538  // display notification if running interactively
540  EventQueue.invokeLater(new Runnable() {
541  @Override
542  public void run() {
543  String serviceDisplayName = ServicesMonitor.Service.REMOTE_CASE_DATABASE.getDisplayName();
544  JOptionPane.showMessageDialog(null,
545  NbBundle.getMessage(this.getClass(), "IngestManager.cancellingIngest.msgDlg.text"),
546  NbBundle.getMessage(this.getClass(), "IngestManager.serviceIsDown.msgDlg.text", serviceDisplayName),
547  JOptionPane.ERROR_MESSAGE);
548  }
549  });
550  }
551  // abort ingest
552  return new IngestJobStartResult(null, new IngestManagerException("Ingest aborted. Remote database is down"), Collections.<IngestModuleError>emptyList());
553  }
555  return new IngestJobStartResult(null, new IngestManagerException("Database server is down.", ex), Collections.<IngestModuleError>emptyList());
556  }
557  }
558 
559  if (!ingestMonitor.isRunning()) {
560  ingestMonitor.start();
561  }
562 
563  synchronized (jobsById) {
564  jobsById.put(job.getId(), job);
565  }
566  errors = job.start();
567  if (errors.isEmpty()) {
568  this.fireIngestJobStarted(job.getId());
569  IngestManager.logger.log(Level.INFO, "Ingest job {0} started", job.getId()); //NON-NLS
570  } else {
571  synchronized (jobsById) {
572  this.jobsById.remove(job.getId());
573  }
574  for (IngestModuleError error : errors) {
575  logger.log(Level.SEVERE, String.format("Error starting %s ingest module for job %d", error.getModuleDisplayName(), job.getId()), error.getThrowable()); //NON-NLS
576  }
577  IngestManager.logger.log(Level.SEVERE, "Ingest job {0} could not be started", job.getId()); //NON-NLS
579  final StringBuilder message = new StringBuilder();
580  message.append(Bundle.IngestManager_startupErr_dlgMsg()).append("\n");
581  message.append(Bundle.IngestManager_startupErr_dlgSolution()).append("\n\n");
582  message.append(Bundle.IngestManager_startupErr_dlgErrorList()).append("\n");
583  for (IngestModuleError error : errors) {
584  String moduleName = error.getModuleDisplayName();
585  String errorMessage = error.getThrowable().getLocalizedMessage();
586  message.append(moduleName).append(": ").append(errorMessage).append("\n");
587  }
588  message.append("\n\n");
589  EventQueue.invokeLater(() -> {
590  JOptionPane.showMessageDialog(null, message, Bundle.IngestManager_startupErr_dlgTitle(), JOptionPane.ERROR_MESSAGE);
591  });
592  }
593  // abort ingest
594  return new IngestJobStartResult(null, new IngestManagerException("Errors occurred while starting ingest"), errors);
595  }
596  }
597  return new IngestJobStartResult(job, null, errors);
598  }
599 
600  synchronized void finishIngestJob(IngestJob job) {
601  long jobId = job.getId();
602  synchronized (jobsById) {
603  jobsById.remove(jobId);
604  }
605  if (!job.isCancelled()) {
606  IngestManager.logger.log(Level.INFO, "Ingest job {0} completed", jobId); //NON-NLS
607  fireIngestJobCompleted(jobId);
608  } else {
609  IngestManager.logger.log(Level.INFO, "Ingest job {0} cancelled", jobId); //NON-NLS
610  fireIngestJobCancelled(jobId);
611  }
612  }
613 
619  public boolean isIngestRunning() {
620  synchronized (jobsById) {
621  return !jobsById.isEmpty();
622  }
623  }
624 
631  @Deprecated
632  public void cancelAllIngestJobs() {
634  }
635 
642  /*
643  * Cancel the start job tasks.
644  */
645  for (Future<Void> handle : startIngestJobTasks.values()) {
646  handle.cancel(true);
647  }
648 
649  /*
650  * Cancel the jobs in progress.
651  */
652  synchronized (jobsById) {
653  for (IngestJob job : this.jobsById.values()) {
654  job.cancel(reason);
655  }
656  }
657  }
658 
664  public void addIngestJobEventListener(final PropertyChangeListener listener) {
665  jobEventPublisher.addSubscriber(jobEventNames, listener);
666  }
667 
673  public void removeIngestJobEventListener(final PropertyChangeListener listener) {
674  jobEventPublisher.removeSubscriber(jobEventNames, listener);
675  }
676 
682  public void addIngestModuleEventListener(final PropertyChangeListener listener) {
683  moduleEventPublisher.addSubscriber(moduleEventNames, listener);
684  }
685 
691  public void removeIngestModuleEventListener(final PropertyChangeListener listener) {
692  moduleEventPublisher.removeSubscriber(moduleEventNames, listener);
693  }
694 
703  @Deprecated
704  public static void addPropertyChangeListener(final PropertyChangeListener listener) {
705  instance.jobEventPublisher.addSubscriber(jobEventNames, listener);
706  instance.moduleEventPublisher.addSubscriber(moduleEventNames, listener);
707  }
708 
717  @Deprecated
718  public static void removePropertyChangeListener(final PropertyChangeListener listener) {
719  instance.jobEventPublisher.removeSubscriber(jobEventNames, listener);
720  instance.moduleEventPublisher.removeSubscriber(moduleEventNames, listener);
721  }
722 
728  void fireIngestJobStarted(long ingestJobId) {
729  AutopsyEvent event = new AutopsyEvent(IngestJobEvent.STARTED.toString(), ingestJobId, null);
730  eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
731  }
732 
738  void fireIngestJobCompleted(long ingestJobId) {
739  AutopsyEvent event = new AutopsyEvent(IngestJobEvent.COMPLETED.toString(), ingestJobId, null);
740  eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
741  }
742 
748  void fireIngestJobCancelled(long ingestJobId) {
749  AutopsyEvent event = new AutopsyEvent(IngestJobEvent.CANCELLED.toString(), ingestJobId, null);
750  eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
751  }
752 
760  void fireDataSourceAnalysisStarted(long ingestJobId, long dataSourceIngestJobId, Content dataSource) {
761  AutopsyEvent event = new DataSourceAnalysisStartedEvent(ingestJobId, dataSourceIngestJobId, dataSource);
762  eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
763  }
764 
772  void fireDataSourceAnalysisCompleted(long ingestJobId, long dataSourceIngestJobId, Content dataSource) {
773  AutopsyEvent event = new DataSourceAnalysisCompletedEvent(ingestJobId, dataSourceIngestJobId, dataSource, DataSourceAnalysisCompletedEvent.Reason.ANALYSIS_COMPLETED);
774  eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
775  }
776 
784  void fireDataSourceAnalysisCancelled(long ingestJobId, long dataSourceIngestJobId, Content dataSource) {
785  AutopsyEvent event = new DataSourceAnalysisCompletedEvent(ingestJobId, dataSourceIngestJobId, dataSource, DataSourceAnalysisCompletedEvent.Reason.ANALYSIS_CANCELLED);
786  eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
787  }
788 
794  void fireFileIngestDone(AbstractFile file) {
795  AutopsyEvent event = new FileAnalyzedEvent(file);
796  eventPublishingExecutor.submit(new PublishEventTask(event, moduleEventPublisher));
797  }
798 
804  void fireIngestModuleDataEvent(ModuleDataEvent moduleDataEvent) {
805  AutopsyEvent event = new BlackboardPostEvent(moduleDataEvent);
806  eventPublishingExecutor.submit(new PublishEventTask(event, moduleEventPublisher));
807  }
808 
816  void fireIngestModuleContentEvent(ModuleContentEvent moduleContentEvent) {
817  AutopsyEvent event = new ContentChangedEvent(moduleContentEvent);
818  eventPublishingExecutor.submit(new PublishEventTask(event, moduleEventPublisher));
819  }
820 
827  void setIngestTaskProgress(DataSourceIngestTask task, String ingestModuleDisplayName) {
828  ingestThreadActivitySnapshots.put(task.getThreadId(), new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJob().getId(), ingestModuleDisplayName, task.getDataSource()));
829  }
830 
837  void setIngestTaskProgress(FileIngestTask task, String ingestModuleDisplayName) {
838  IngestThreadActivitySnapshot prevSnap = ingestThreadActivitySnapshots.get(task.getThreadId());
839  IngestThreadActivitySnapshot newSnap = new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJob().getId(), ingestModuleDisplayName, task.getDataSource(), task.getFile());
840  ingestThreadActivitySnapshots.put(task.getThreadId(), newSnap);
841 
842  incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
843  }
844 
850  void setIngestTaskProgressCompleted(DataSourceIngestTask task) {
851  ingestThreadActivitySnapshots.put(task.getThreadId(), new IngestThreadActivitySnapshot(task.getThreadId()));
852  }
853 
859  void setIngestTaskProgressCompleted(FileIngestTask task) {
860  IngestThreadActivitySnapshot prevSnap = ingestThreadActivitySnapshots.get(task.getThreadId());
861  IngestThreadActivitySnapshot newSnap = new IngestThreadActivitySnapshot(task.getThreadId());
862  ingestThreadActivitySnapshots.put(task.getThreadId(), newSnap);
863  incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
864  }
865 
872  private void incrementModuleRunTime(String moduleName, Long duration) {
873  if (moduleName.equals("IDLE")) { //NON-NLS
874  return;
875  }
876 
877  synchronized (ingestModuleRunTimes) {
878  Long prevTimeL = ingestModuleRunTimes.get(moduleName);
879  long prevTime = 0;
880  if (prevTimeL != null) {
881  prevTime = prevTimeL;
882  }
883  prevTime += duration;
884  ingestModuleRunTimes.put(moduleName, prevTime);
885  }
886  }
887 
893  Map<String, Long> getModuleRunTimes() {
894  synchronized (ingestModuleRunTimes) {
895  Map<String, Long> times = new HashMap<>(ingestModuleRunTimes);
896  return times;
897  }
898  }
899 
905  List<IngestThreadActivitySnapshot> getIngestThreadActivitySnapshots() {
906  return new ArrayList<>(ingestThreadActivitySnapshots.values());
907  }
908 
914  List<DataSourceIngestJob.Snapshot> getIngestJobSnapshots() {
915  List<DataSourceIngestJob.Snapshot> snapShots = new ArrayList<>();
916  synchronized (jobsById) {
917  for (IngestJob job : jobsById.values()) {
918  snapShots.addAll(job.getDataSourceIngestJobSnapshots());
919  }
920  }
921  return snapShots;
922  }
923 
930  long getFreeDiskSpace() {
931  if (ingestMonitor != null) {
932  return ingestMonitor.getFreeSpace();
933  } else {
934  return -1;
935  }
936  }
937 
941  private final class StartIngestJobTask implements Callable<Void> {
942 
943  private final long threadId;
944  private final IngestJob job;
945  private ProgressHandle progress;
946 
947  StartIngestJobTask(long threadId, IngestJob job) {
948  this.threadId = threadId;
949  this.job = job;
950  }
951 
952  @Override
953  public Void call() {
954  try {
955  if (Thread.currentThread().isInterrupted()) {
956  synchronized (jobsById) {
957  jobsById.remove(job.getId());
958  }
959  return null;
960  }
961 
963  final String displayName = NbBundle.getMessage(this.getClass(), "IngestManager.StartIngestJobsTask.run.displayName");
964  this.progress = ProgressHandle.createHandle(displayName, new Cancellable() {
965  @Override
966  public boolean cancel() {
967  if (progress != null) {
968  progress.setDisplayName(NbBundle.getMessage(this.getClass(), "IngestManager.StartIngestJobsTask.run.cancelling", displayName));
969  }
970  Future<?> handle = startIngestJobTasks.remove(threadId);
971  handle.cancel(true);
972  return true;
973  }
974  });
975  progress.start();
976  }
977 
978  startIngestJob(job);
979  return null;
980 
981  } finally {
982  if (null != progress) {
983  progress.finish();
984  }
985  startIngestJobTasks.remove(threadId);
986  }
987  }
988 
989  }
990 
994  private final class ExecuteIngestJobsTask implements Runnable {
995 
996  private final long threadId;
997  private final IngestTaskQueue tasks;
998 
999  ExecuteIngestJobsTask(long threadId, IngestTaskQueue tasks) {
1000  this.threadId = threadId;
1001  this.tasks = tasks;
1002  }
1003 
1004  @Override
1005  public void run() {
1006  while (true) {
1007  try {
1008  IngestTask task = tasks.getNextTask(); // Blocks.
1009  task.execute(threadId);
1010  } catch (InterruptedException ex) {
1011  break;
1012  }
1013  if (Thread.currentThread().isInterrupted()) {
1014  break;
1015  }
1016  }
1017  }
1018  }
1019 
1023  private static final class PublishEventTask implements Runnable {
1024 
1025  private final AutopsyEvent event;
1027 
1036  this.event = event;
1037  this.publisher = publisher;
1038  }
1039 
1040  @Override
1041  public void run() {
1042  publisher.publish(event);
1043  }
1044 
1045  }
1046 
1047  static final class IngestThreadActivitySnapshot {
1048 
1049  private final long threadId;
1050  private final Date startTime;
1051  private final String activity;
1052  private final String dataSourceName;
1053  private final String fileName;
1054  private final long jobId;
1055 
1056  // nothing is running on the thread
1057  IngestThreadActivitySnapshot(long threadId) {
1058  this.threadId = threadId;
1059  startTime = new Date();
1060  this.activity = NbBundle.getMessage(this.getClass(), "IngestManager.IngestThreadActivitySnapshot.idleThread");
1061  this.dataSourceName = "";
1062  this.fileName = "";
1063  this.jobId = 0;
1064  }
1065 
1066  // data souce thread
1067  IngestThreadActivitySnapshot(long threadId, long jobId, String activity, Content dataSource) {
1068  this.threadId = threadId;
1069  this.jobId = jobId;
1070  startTime = new Date();
1071  this.activity = activity;
1072  this.dataSourceName = dataSource.getName();
1073  this.fileName = "";
1074  }
1075 
1076  // file ingest thread
1077  IngestThreadActivitySnapshot(long threadId, long jobId, String activity, Content dataSource, AbstractFile file) {
1078  this.threadId = threadId;
1079  this.jobId = jobId;
1080  startTime = new Date();
1081  this.activity = activity;
1082  this.dataSourceName = dataSource.getName();
1083  this.fileName = file.getName();
1084  }
1085 
1086  long getJobId() {
1087  return jobId;
1088  }
1089 
1090  long getThreadId() {
1091  return threadId;
1092  }
1093 
1094  Date getStartTime() {
1095  return startTime;
1096  }
1097 
1098  String getActivity() {
1099  return activity;
1100  }
1101 
1102  String getDataSourceName() {
1103  return dataSourceName;
1104  }
1105 
1106  String getFileName() {
1107  return fileName;
1108  }
1109 
1110  }
1111 
1115  public final static class IngestManagerException extends Exception {
1116 
1117  private static final long serialVersionUID = 1L;
1118 
1124  private IngestManagerException(String message) {
1125  super(message);
1126  }
1127 
1134  private IngestManagerException(String message, Throwable cause) {
1135  super(message, cause);
1136  }
1137  }
1138 
1139 }
final ConcurrentHashMap< String, Long > ingestModuleRunTimes
void removeIngestModuleEventListener(final PropertyChangeListener listener)
static synchronized IngestManager getInstance()
IngestJobStartResult startIngestJob(IngestJob job)
final Map< Long, Future< Void > > startIngestJobTasks
static void addPropertyChangeListener(final PropertyChangeListener listener)
void addSubscriber(Set< String > eventNames, PropertyChangeListener subscriber)
static void removePropertyChangeListener(final PropertyChangeListener listener)
volatile IngestMessageTopComponent ingestMessageBox
void addSubscriber(PropertyChangeListener subscriber)
static synchronized ServicesMonitor getInstance()
void removeIngestJobEventListener(final PropertyChangeListener listener)
void incrementModuleRunTime(String moduleName, Long duration)
void addIngestJobEventListener(final PropertyChangeListener listener)
final Map< Long, IngestJob > jobsById
void queueIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
void removeSubscriber(Set< String > eventNames, PropertyChangeListener subscriber)
static final Set< String > moduleEventNames
void addIngestModuleEventListener(final PropertyChangeListener listener)
synchronized static Logger getLogger(String name)
Definition: Logger.java:161
synchronized IngestJob startIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
final ConcurrentHashMap< Long, IngestThreadActivitySnapshot > ingestThreadActivitySnapshots
void cancelAllIngestJobs(IngestJob.CancellationReason reason)
synchronized IngestJobStartResult beginIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
static void addEventSubscriber(Set< String > eventNames, PropertyChangeListener subscriber)
Definition: Case.java:396

Copyright © 2012-2016 Basis Technology. Generated on: Mon Apr 24 2017
This work is licensed under a Creative Commons Attribution-Share Alike 3.0 United States License.