Autopsy  4.19.0
Graphical digital forensics platform for The Sleuth Kit and other tools.
IngestJobPipeline.java
Go to the documentation of this file.
1 /*
2  * Autopsy Forensic Browser
3  *
4  * Copyright 2014-2021 Basis Technology Corp.
5  * Contact: carrier <at> sleuthkit <dot> org
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  * http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19 package org.sleuthkit.autopsy.ingest;
20 
21 import java.util.ArrayList;
22 import java.util.Collections;
23 import java.util.Date;
24 import java.util.HashSet;
25 import java.util.LinkedHashMap;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Set;
29 import java.util.concurrent.CopyOnWriteArrayList;
30 import java.util.concurrent.LinkedBlockingQueue;
31 import java.util.concurrent.atomic.AtomicLong;
32 import java.util.logging.Level;
33 import java.util.regex.Matcher;
34 import java.util.regex.Pattern;
35 import java.util.stream.Stream;
36 import javax.annotation.concurrent.GuardedBy;
37 import javax.swing.JOptionPane;
38 import org.netbeans.api.progress.ProgressHandle;
39 import org.openide.util.Cancellable;
40 import org.openide.util.NbBundle;
41 import org.openide.windows.WindowManager;
47 import org.sleuthkit.datamodel.AbstractFile;
48 import org.sleuthkit.datamodel.Content;
49 import org.sleuthkit.datamodel.IngestJobInfo;
50 import org.sleuthkit.datamodel.IngestJobInfo.IngestJobStatusType;
51 import org.sleuthkit.datamodel.IngestModuleInfo;
52 import org.sleuthkit.datamodel.IngestModuleInfo.IngestModuleType;
53 import org.sleuthkit.datamodel.SleuthkitCase;
54 import org.sleuthkit.datamodel.TskCoreException;
57 import org.sleuthkit.datamodel.DataArtifact;
58 import org.sleuthkit.datamodel.DataSource;
59 
65 final class IngestJobPipeline {
66 
67  private static final String AUTOPSY_MODULE_PREFIX = "org.sleuthkit.autopsy";
68 
69  private static final Logger logger = Logger.getLogger(IngestJobPipeline.class.getName());
70 
71  /*
72  * A regular expression for identifying the proxy classes Jython generates
73  * for ingest module factories written using Python. For example:
74  * org.python.proxies.GPX_Parser_Module$GPXParserFileIngestModuleFactory$14
75  */
76  private static final Pattern JYTHON_MODULE_REGEX = Pattern.compile("org\\.python\\.proxies\\.(.+?)\\$(.+?)(\\$[0-9]*)?$");
77 
78  /*
79  * These fields define an ingest pipeline: the parent ingest job, a pipeline
80  * ID, the user's ingest job settings, and the data source to be analyzed.
81  * Optionally, there is a set of files to be analyzed instead of analyzing
82  * all of the files in the data source.
83  *
84  * The pipeline ID is used to associate the pipeline with the ingest tasks
85  * that the ingest task scheduler creates for the ingest job. The ingest job
86  * ID cannot be used for this purpose because the parent ingest job may have
87  * more than one data source and each data source gets its own pipeline.
88  */
89  private final IngestJob parentJob;
90  private static final AtomicLong nextPipelineId = new AtomicLong(0L);
91  private final long pipelineId;
92  private final IngestJobSettings settings;
93  private DataSource dataSource;
94  private final List<AbstractFile> files;
95 
96  /*
97  * An ingest pipeline runs its ingest modules in stages.
98  */
99  private static enum Stages {
100  /*
101  * The pipeline is instantiating ingest modules and loading them into
102  * its child ingest module pipelines.
103  */
105  /*
106  * This stage is unique to a streaming mode ingest job. The pipeline is
107  * running file ingest modules on files streamed to it via
108  * addStreamedFiles(). If configured to have data artifact ingest
109  * modules, the pipeline is also running them on data artifacts
110  * generated by the analysis of the streamed files. This stage ends when
111  * the data source is streamed to the pipeline via
112  * addStreamedDataSource().
113  */
115  /*
116  * The pipeline is running the following three types of ingest modules:
117  * higher priority data source level ingest modules, file ingest
118  * modules, and data artifact ingest modules.
119  */
129  FINALIZATION
130  };
131  private volatile Stages stage = IngestJobPipeline.Stages.INITIALIZATION;
132 
133  /*
134  * The stage field is volatile to allow it to be read by multiple threads.
135  * This lock is used not to guard the stage field, but to make stage
136  * transitions atomic.
137  */
138  private final Object stageTransitionLock = new Object();
139 
140  /*
141  * An ingest pipeline has separate data source level ingest module pipelines
142  * for the first and second stages. Longer running, lower priority modules
143  * belong in the second stage pipeline.
144  */
145  private DataSourceIngestPipeline firstStageDataSourceIngestPipeline;
146  private DataSourceIngestPipeline secondStageDataSourceIngestPipeline;
147  private volatile DataSourceIngestPipeline currentDataSourceIngestPipeline;
148 
149  /*
150  * An ingest pipeline has a collection of identical file ingest module
151  * pipelines, one for each file ingest thread in the ingest manager. The
152  * file ingest threads take and return file ingest pipeline copies from a
153  * blocking queue as they work through the file ingest tasks for the ingest
154  * job. Additionally, a fixed list of all of the file ingest module
155  * pipelines is used to bypass the blocking queue when cycling through the
156  * pipelines to make ingest progress snapshots.
157  */
158  private final LinkedBlockingQueue<FileIngestPipeline> fileIngestPipelinesQueue = new LinkedBlockingQueue<>();
159  private final List<FileIngestPipeline> fileIngestPipelines = new ArrayList<>();
160 
161  /*
162  * An ingest pipeline has a single data artifact ingest module pipeline.
163  */
164  private DataArtifactIngestPipeline artifactIngestPipeline;
165 
166  /*
167  * An ingest pipeline supports cancellation of analysis by individual data
168  * source level ingest modules or cancellation of all remaining analysis by
169  * all of its ingest modules. Cancellation works by setting flags that are
170  * checked by the ingest module pipelines every time they transition from
171  * one module to another. Ingest modules are also expected to check these
172  * flags (via the ingest job context) and stop processing if they are set.
173  * This approach to cancellation means that there can be a variable length
174  * delay between a cancellation request and its fulfillment. Analysis
175  * already completed at the time that cancellation occurs is not discarded.
176  */
177  private volatile boolean currentDataSourceIngestModuleCancelled;
178  private final List<String> cancelledDataSourceIngestModules = new CopyOnWriteArrayList<>();
179  private volatile boolean cancelled;
180  private volatile IngestJob.CancellationReason cancellationReason = IngestJob.CancellationReason.NOT_CANCELLED;
181 
182  /*
183  * An ingest pipeline interacts with the ingest task scheduler to create
184  * ingest tasks for analyzing the data source, files and data artifacts that
185  * are the subject of the ingest job. The scheduler queues the tasks for the
186  * ingest manager's ingest threads. The ingest tasks are the units of work
187  * for the ingest pipeline's child ingest module pipelines.
188  */
189  private static final IngestTasksScheduler taskScheduler = IngestTasksScheduler.getInstance();
190 
191  /*
192  * If running with a GUI, an ingest pipeline reports analysis progress and
193  * allows a user to cancel all or part of the analysis using progress bars
194  * in the lower right hand corner of the main application window.
195  */
196  private final boolean doUI;
197  private final Object dataSourceIngestProgressLock = new Object();
198  private ProgressHandle dataSourceIngestProgressBar;
199  private final Object fileIngestProgressLock = new Object();
200  private final List<String> filesInProgress = new ArrayList<>();
201  private long estimatedFilesToProcess;
202  private long processedFiles;
203  private ProgressHandle fileIngestProgressBar;
204  private final Object artifactIngestProgressLock = new Object();
205  private ProgressHandle artifactIngestProgressBar;
206 
207  /*
208  * Ingest job details are tracked using this object and are recorded in the
209  * case database when the pipeline starts up and shuts down.
210  */
211  private volatile IngestJobInfo ingestJobInfo;
212 
216  private final long createTime;
217 
218  /*
219  * An ingest pipeline allows ingest module pipelines to register and
220  * unregister the ingest thread they are running in when a scheduled ingest
221  * pause occurs and the threads are made to sleep. This allows interruption
222  * of these threads if the ingest job is canceled.
223  */
224  private final Object threadRegistrationLock = new Object();
225  @GuardedBy("threadRegistrationLock")
226  private final Set<Thread> pausedIngestThreads = new HashSet<>();
227 
    /**
     * Constructs an ingest pipeline that analyzes all of the files in a data
     * source. Delegates to the main constructor with an empty file subset,
     * which means "process every file in the data source."
     *
     * @param parentJob  The ingest job that owns this pipeline.
     * @param dataSource The data source to analyze; must actually implement
     *                   the DataSource interface (enforced by the delegate).
     * @param settings   The user's settings for the ingest job.
     *
     * @throws InterruptedException If the thread is interrupted while the
     *                              delegate constructor fills the blocking
     *                              queue of file ingest pipeline copies.
     */
    IngestJobPipeline(IngestJob parentJob, Content dataSource, IngestJobSettings settings) throws InterruptedException {
        this(parentJob, dataSource, Collections.emptyList(), settings);
    }
243 
259  IngestJobPipeline(IngestJob parentJob, Content dataSource, List<AbstractFile> files, IngestJobSettings settings) throws InterruptedException {
260  if (!(dataSource instanceof DataSource)) {
261  throw new IllegalArgumentException("Passed dataSource that does not implement the DataSource interface"); //NON-NLS
262  }
263  this.parentJob = parentJob;
264  pipelineId = IngestJobPipeline.nextPipelineId.getAndIncrement();
265  this.dataSource = (DataSource) dataSource;
266  this.files = new ArrayList<>();
267  this.files.addAll(files);
268  this.settings = settings;
269  doUI = RuntimeProperties.runningWithGUI();
270  createTime = new Date().getTime();
271  stage = Stages.INITIALIZATION;
272  createIngestModulePipelines();
273  }
274 
286  private static void addToIngestPipelineTemplate(final List<IngestModuleTemplate> sortedModules, final Map<String, IngestModuleTemplate> javaModules, final Map<String, IngestModuleTemplate> jythonModules) {
287  final List<IngestModuleTemplate> autopsyModules = new ArrayList<>();
288  final List<IngestModuleTemplate> thirdPartyModules = new ArrayList<>();
289  Stream.concat(javaModules.entrySet().stream(), jythonModules.entrySet().stream()).forEach((templateEntry) -> {
290  if (templateEntry.getKey().startsWith(AUTOPSY_MODULE_PREFIX)) {
291  autopsyModules.add(templateEntry.getValue());
292  } else {
293  thirdPartyModules.add(templateEntry.getValue());
294  }
295  });
296  sortedModules.addAll(autopsyModules);
297  sortedModules.addAll(thirdPartyModules);
298  }
299 
311  private static String getModuleNameFromJythonClassName(String className) {
312  Matcher m = JYTHON_MODULE_REGEX.matcher(className);
313  if (m.find()) {
314  return String.format("%s.%s", m.group(1), m.group(2)); //NON-NLS
315  } else {
316  return null;
317  }
318  }
319 
330  private static void addModuleTemplateToSortingMap(Map<String, IngestModuleTemplate> mapping, Map<String, IngestModuleTemplate> jythonMapping, IngestModuleTemplate template) {
331  String className = template.getModuleFactory().getClass().getCanonicalName();
332  String jythonName = getModuleNameFromJythonClassName(className);
333  if (jythonName != null) {
334  jythonMapping.put(jythonName, template);
335  } else {
336  mapping.put(className, template);
337  }
338  }
339 
    /**
     * Creates this pipeline's child ingest module pipelines: two data source
     * level pipelines (first and second stage), one file ingest pipeline copy
     * per file ingest thread, and one data artifact pipeline. Module order is
     * determined first by the pipeline configuration file and then by origin
     * (Autopsy modules before third-party modules).
     *
     * @throws InterruptedException If the thread is interrupted while adding
     *                              file ingest pipeline copies to the
     *                              blocking queue.
     */
    private void createIngestModulePipelines() throws InterruptedException {
        /*
         * Get the enabled ingest module templates from the ingest job settings.
         */
        List<IngestModuleTemplate> enabledTemplates = settings.getEnabledIngestModuleTemplates();

        /*
         * Sort the templates into buckets by module type (a template may
         * appear in more than one bucket) and by implementation language,
         * since Jython modules are keyed differently than Java modules.
         * LinkedHashMaps preserve the settings ordering.
         */
        Map<String, IngestModuleTemplate> javaDataSourceModuleTemplates = new LinkedHashMap<>();
        Map<String, IngestModuleTemplate> jythonDataSourceModuleTemplates = new LinkedHashMap<>();
        Map<String, IngestModuleTemplate> javaFileModuleTemplates = new LinkedHashMap<>();
        Map<String, IngestModuleTemplate> jythonFileModuleTemplates = new LinkedHashMap<>();
        Map<String, IngestModuleTemplate> javaArtifactModuleTemplates = new LinkedHashMap<>();
        Map<String, IngestModuleTemplate> jythonArtifactModuleTemplates = new LinkedHashMap<>();
        for (IngestModuleTemplate template : enabledTemplates) {
            if (template.isDataSourceIngestModuleTemplate()) {
                addModuleTemplateToSortingMap(javaDataSourceModuleTemplates, jythonDataSourceModuleTemplates, template);
            }
            if (template.isFileIngestModuleTemplate()) {
                addModuleTemplateToSortingMap(javaFileModuleTemplates, jythonFileModuleTemplates, template);
            }
            if (template.isDataArtifactIngestModuleTemplate()) {
                addModuleTemplateToSortingMap(javaArtifactModuleTemplates, jythonArtifactModuleTemplates, template);
            }
        }

        /*
         * Build the pipeline templates in the order dictated by the pipeline
         * configuration file; createIngestPipelineTemplate removes each
         * matched template from its sorting map as it is consumed. There is
         * no configuration entry for the artifact pipeline, so it starts
         * empty here.
         */
        IngestPipelinesConfiguration pipelineConfig = IngestPipelinesConfiguration.getInstance();
        List<IngestModuleTemplate> firstStageDataSourcePipelineTemplate = createIngestPipelineTemplate(javaDataSourceModuleTemplates, jythonDataSourceModuleTemplates, pipelineConfig.getStageOneDataSourceIngestPipelineConfig());
        List<IngestModuleTemplate> secondStageDataSourcePipelineTemplate = createIngestPipelineTemplate(javaDataSourceModuleTemplates, jythonDataSourceModuleTemplates, pipelineConfig.getStageTwoDataSourceIngestPipelineConfig());
        List<IngestModuleTemplate> filePipelineTemplate = createIngestPipelineTemplate(javaFileModuleTemplates, jythonFileModuleTemplates, pipelineConfig.getFileIngestPipelineConfig());
        List<IngestModuleTemplate> artifactPipelineTemplate = new ArrayList<>();

        /*
         * Append any templates not claimed by the configuration file:
         * Autopsy modules first, then third-party modules. Note that
         * unclaimed data source templates all go to the FIRST stage pipeline.
         */
        addToIngestPipelineTemplate(firstStageDataSourcePipelineTemplate, javaDataSourceModuleTemplates, jythonDataSourceModuleTemplates);
        addToIngestPipelineTemplate(filePipelineTemplate, javaFileModuleTemplates, jythonFileModuleTemplates);
        addToIngestPipelineTemplate(artifactPipelineTemplate, javaArtifactModuleTemplates, jythonArtifactModuleTemplates);

        /*
         * Instantiate the child pipelines from the completed templates, with
         * one file ingest pipeline copy per file ingest thread.
         */
        firstStageDataSourceIngestPipeline = new DataSourceIngestPipeline(this, firstStageDataSourcePipelineTemplate);
        secondStageDataSourceIngestPipeline = new DataSourceIngestPipeline(this, secondStageDataSourcePipelineTemplate);
        int numberOfFileIngestThreads = IngestManager.getInstance().getNumberOfFileIngestThreads();
        for (int i = 0; i < numberOfFileIngestThreads; ++i) {
            FileIngestPipeline pipeline = new FileIngestPipeline(this, filePipelineTemplate);
            fileIngestPipelinesQueue.put(pipeline);
            fileIngestPipelines.add(pipeline);
        }
        artifactIngestPipeline = new DataArtifactIngestPipeline(this, artifactPipelineTemplate);
    }
413 
429  private static List<IngestModuleTemplate> createIngestPipelineTemplate(Map<String, IngestModuleTemplate> javaIngestModuleTemplates, Map<String, IngestModuleTemplate> jythonIngestModuleTemplates, List<String> pipelineConfig) {
430  List<IngestModuleTemplate> pipelineTemplate = new ArrayList<>();
431  for (String moduleClassName : pipelineConfig) {
432  if (javaIngestModuleTemplates.containsKey(moduleClassName)) {
433  pipelineTemplate.add(javaIngestModuleTemplates.remove(moduleClassName));
434  } else if (jythonIngestModuleTemplates.containsKey(moduleClassName)) {
435  pipelineTemplate.add(jythonIngestModuleTemplates.remove(moduleClassName));
436  }
437  }
438  return pipelineTemplate;
439  }
440 
    /**
     * Gets this pipeline's unique ID, used to associate the pipeline with the
     * ingest tasks the scheduler creates for it.
     *
     * @return The pipeline ID.
     */
    long getId() {
        return pipelineId;
    }
449 
    /**
     * Gets the execution context name from the ingest job settings.
     *
     * @return The execution context string.
     */
    String getExecutionContext() {
        return settings.getExecutionContext();
    }
458 
    /**
     * Gets the data source this pipeline analyzes.
     *
     * @return The data source.
     */
    DataSource getDataSource() {
        return dataSource;
    }
467 
    /**
     * Indicates whether the ingest job settings call for processing of
     * unallocated space.
     *
     * @return True if unallocated space should be processed, false otherwise.
     */
    boolean shouldProcessUnallocatedSpace() {
        return settings.getProcessUnallocatedSpace();
    }
477 
    /**
     * Gets the file ingest filter from the ingest job settings.
     *
     * @return The files set used to filter which files get analyzed.
     */
    FilesSet getFileIngestFilter() {
        return settings.getFileFilter();
    }
486 
    /**
     * Checks whether this pipeline has at least one ingest module of any kind
     * (file, first or second stage data source level, or data artifact).
     *
     * @return True if any child pipeline is non-empty, false otherwise.
     */
    boolean hasIngestModules() {
        return hasFileIngestModules()
                || hasFirstStageDataSourceIngestModules()
                || hasSecondStageDataSourceIngestModules()
                || hasDataArtifactIngestModules();
    }
499 
506  boolean hasDataSourceIngestModules() {
507  if (stage == Stages.SECOND_STAGE) {
508  return hasSecondStageDataSourceIngestModules();
509  } else {
510  return hasFirstStageDataSourceIngestModules();
511  }
512  }
513 
520  private boolean hasFirstStageDataSourceIngestModules() {
521  return (firstStageDataSourceIngestPipeline.isEmpty() == false);
522  }
523 
530  private boolean hasSecondStageDataSourceIngestModules() {
531  return (secondStageDataSourceIngestPipeline.isEmpty() == false);
532  }
533 
540  boolean hasFileIngestModules() {
541  if (!fileIngestPipelines.isEmpty()) {
542  return !fileIngestPipelines.get(0).isEmpty();
543  }
544  return false;
545  }
546 
553  boolean hasDataArtifactIngestModules() {
554  return (artifactIngestPipeline.isEmpty() == false);
555  }
556 
    /**
     * Starts up this ingest pipeline: starts all child ingest module
     * pipelines, records the job start in the case database, and kicks off
     * the appropriate first stage (batch or streaming). If only second stage
     * modules are configured, the first stage is skipped entirely.
     *
     * @return A list of ingest module start up errors; empty on success. If
     *         non-empty, no analysis is started.
     */
    List<IngestModuleError> startUp() {
        List<IngestModuleError> errors = startUpIngestModulePipelines();
        if (errors.isEmpty()) {
            recordIngestJobStartUpInfo();
            if (hasFirstStageDataSourceIngestModules() || hasFileIngestModules() || hasDataArtifactIngestModules()) {
                // The parent job's mode decides how the first stage runs.
                if (parentJob.getIngestMode() == IngestJob.Mode.STREAMING) {
                    startFirstStageInStreamingMode();
                } else {
                    startFirstStageInBatchMode();
                }
            } else if (hasSecondStageDataSourceIngestModules()) {
                startSecondStage();
            }
        }
        return errors;
    }
578 
    /**
     * Writes start up data about the ingest job to the case database: the
     * enabled modules (name, normalized factory class, type, version) and an
     * IngestJobInfo row with STARTED status. The resulting IngestJobInfo is
     * kept so shutDown() can record the final status and end time. A database
     * error here is logged but does not abort the job.
     */
    void recordIngestJobStartUpInfo() {
        try {
            SleuthkitCase caseDb = Case.getCurrentCase().getSleuthkitCase();
            List<IngestModuleInfo> ingestModuleInfoList = new ArrayList<>();
            for (IngestModuleTemplate module : settings.getEnabledIngestModuleTemplates()) {
                IngestModuleType moduleType = getIngestModuleTemplateType(module);
                IngestModuleInfo moduleInfo = caseDb.addIngestModule(module.getModuleName(), FactoryClassNameNormalizer.normalize(module.getModuleFactory().getClass().getCanonicalName()), moduleType, module.getModuleFactory().getModuleVersionNumber());
                ingestModuleInfoList.add(moduleInfo);
            }
            // End time is a placeholder (epoch) until the job finishes.
            ingestJobInfo = caseDb.addIngestJob(dataSource, NetworkUtils.getLocalHostName(), ingestModuleInfoList, new Date(this.createTime), new Date(0), IngestJobStatusType.STARTED, "");
        } catch (TskCoreException ex) {
            logErrorMessage(Level.SEVERE, "Failed to add ingest job info to case database", ex); //NON-NLS
        }
    }
598 
607  private IngestModuleType getIngestModuleTemplateType(IngestModuleTemplate moduleTemplate) {
608  IngestModuleType type = null;
609  if (moduleTemplate.isDataSourceIngestModuleTemplate()) {
610  type = IngestModuleType.DATA_SOURCE_LEVEL;
611  }
612  if (moduleTemplate.isFileIngestModuleTemplate()) {
613  if (type == null) {
614  type = IngestModuleType.FILE_LEVEL;
615  } else {
616  type = IngestModuleType.MULTIPLE;
617  }
618  }
619  if (moduleTemplate.isDataArtifactIngestModuleTemplate()) {
620  if (type == null) {
621  type = IngestModuleType.DATA_ARTIFACT;
622  } else {
623  type = IngestModuleType.MULTIPLE;
624  }
625  }
626  return type;
627  }
628 
    /**
     * Starts up every child ingest module pipeline: both data source level
     * pipelines, every file ingest pipeline copy, and the data artifact
     * pipeline. Start up is attempted for all pipelines even after a failure,
     * except among the identical file pipeline copies, where the first
     * failure short-circuits the rest.
     *
     * @return The accumulated start up errors from all pipelines; empty on
     *         success.
     */
    private List<IngestModuleError> startUpIngestModulePipelines() {
        List<IngestModuleError> errors = new ArrayList<>();
        errors.addAll(startUpIngestModulePipeline(firstStageDataSourceIngestPipeline));
        errors.addAll(startUpIngestModulePipeline(secondStageDataSourceIngestPipeline));
        for (FileIngestPipeline pipeline : fileIngestPipelines) {
            List<IngestModuleError> filePipelineErrors = startUpIngestModulePipeline(pipeline);
            if (!filePipelineErrors.isEmpty()) {
                /*
                 * If one file pipeline copy can't start up, assume that none of
                 * them will be able to start up for the same reason.
                 */
                errors.addAll(filePipelineErrors);
                break;
            }
        }
        errors.addAll(startUpIngestModulePipeline(artifactIngestPipeline));
        return errors;
    }
660 
    /**
     * Starts up a single ingest module pipeline. If any module fails to start,
     * the pipeline is immediately shut down again so partially-started
     * modules release their resources; shut down errors are logged but only
     * the start up errors are returned to the caller.
     *
     * @param pipeline The pipeline to start up.
     *
     * @return The start up errors; empty on success.
     */
    private List<IngestModuleError> startUpIngestModulePipeline(IngestTaskPipeline<?> pipeline) {
        List<IngestModuleError> startUpErrors = pipeline.startUp();
        if (!startUpErrors.isEmpty()) {
            List<IngestModuleError> shutDownErrors = pipeline.shutDown();
            if (!shutDownErrors.isEmpty()) {
                logIngestModuleErrors(shutDownErrors);
            }
        }
        return startUpErrors;
    }
679 
    /**
     * Starts the first stage of the ingest job in batch mode: estimates the
     * file count, starts any progress bars, makes the first stage data source
     * pipeline current, and schedules the first stage ingest tasks. The whole
     * transition is done under stageTransitionLock so it is atomic with
     * respect to other stage changes.
     */
    private void startFirstStageInBatchMode() {
        synchronized (stageTransitionLock) {
            logInfoMessage("Starting first stage analysis in batch mode"); //NON-NLS
            stage = Stages.FIRST_STAGE;

            /*
             * Do a count of the files the data source processor has added to
             * the case database. This estimate will be used for ingest progress
             * snapshots and for the file ingest progress bar if running with a
             * GUI.
             */
            if (hasFileIngestModules()) {
                long filesToProcess;
                if (files.isEmpty()) {
                    filesToProcess = dataSource.accept(new GetFilesCountVisitor());
                } else {
                    filesToProcess = files.size();
                }
                synchronized (fileIngestProgressLock) {
                    estimatedFilesToProcess = filesToProcess;
                }
            }

            /*
             * If running with a GUI, start ingest progress bars in the lower
             * right hand corner of the main application window.
             */
            if (doUI) {
                if (hasFileIngestModules()) {
                    startFileIngestProgressBar();
                }
                if (hasFirstStageDataSourceIngestModules()) {
                    startDataSourceIngestProgressBar();
                }
                if (hasDataArtifactIngestModules()) {
                    startArtifactIngestProgressBar();
                }
            }

            /*
             * Make the first stage data source level ingest pipeline the
             * current data source level pipeline.
             */
            currentDataSourceIngestPipeline = firstStageDataSourceIngestPipeline;

            /*
             * Schedule the first stage ingest tasks and then immediately check
             * for stage completion. This is necessary because it is possible
             * that zero tasks will actually make it to task execution due to
             * the file filter or other ingest job settings. In that case, there
             * will never be a stage completion check in an ingest thread
             * executing an ingest task, so such a job would run forever without
             * a check here.
             */
            if (!files.isEmpty() && hasFileIngestModules()) {
                // Analyzing only a subset of the data source's files.
                taskScheduler.scheduleFileIngestTasks(this, files);
            } else if (hasFirstStageDataSourceIngestModules() || hasFileIngestModules() || hasDataArtifactIngestModules()) {
                taskScheduler.scheduleIngestTasks(this);
            }
            checkForStageCompleted();
        }
    }
747 
    /**
     * Starts the first stage of the ingest job in streaming mode: only file
     * ingest and (optionally) data artifact analysis begin now; data source
     * level analysis is deferred until the data source itself is streamed in
     * via addStreamedDataSource().
     */
    private void startFirstStageInStreamingMode() {
        synchronized (stageTransitionLock) {
            logInfoMessage("Starting first stage analysis in streaming mode"); //NON-NLS
            stage = Stages.FIRST_STAGE_STREAMING;

            if (doUI) {
                /*
                 * If running with a GUI, start ingest progress bars in the
                 * lower right hand corner of the main application window.
                 */
                if (hasFileIngestModules()) {
                    /*
                     * Note that because estimated files remaining to process
                     * still has its initial value of zero, the progress bar
                     * will start in the "indeterminate" state. An estimate of
                     * the files to process can be computed later, in
                     * addStreamedDataSource().
                     */
                    startFileIngestProgressBar();
                }
                if (hasDataArtifactIngestModules()) {
                    startArtifactIngestProgressBar();
                }
            }

            if (hasDataArtifactIngestModules()) {
                /*
                 * Schedule artifact ingest tasks for any artifacts currently in
                 * the case database. This needs to be done before any files or
                 * the data source are streamed in to avoid analyzing data
                 * artifacts added to the case database by the data source level
                 * or file level ingest tasks.
                 */
                taskScheduler.scheduleDataArtifactIngestTasks(this);
            }
        }
    }
790 
795  void addStreamedDataSource() {
796  synchronized (stageTransitionLock) {
797  logInfoMessage("Starting full first stage analysis in streaming mode"); //NON-NLS
798  stage = IngestJobPipeline.Stages.FIRST_STAGE;
799  currentDataSourceIngestPipeline = firstStageDataSourceIngestPipeline;
800 
801  if (hasFileIngestModules()) {
802  /*
803  * Do a count of the files the data source processor has added
804  * to the case database. This estimate will be used for ingest
805  * progress snapshots and for the file ingest progress bar if
806  * running with a GUI.
807  */
808  long filesToProcess = dataSource.accept(new GetFilesCountVisitor());
809  synchronized (fileIngestProgressLock) {
810  estimatedFilesToProcess = filesToProcess;
811  if (doUI && fileIngestProgressBar != null) {
812  fileIngestProgressBar.switchToDeterminate((int) estimatedFilesToProcess);
813  }
814  }
815  }
816 
817  if (doUI) {
818  if (hasFirstStageDataSourceIngestModules()) {
819  startDataSourceIngestProgressBar();
820  }
821  }
822 
823  currentDataSourceIngestPipeline = firstStageDataSourceIngestPipeline;
824  if (hasFirstStageDataSourceIngestModules()) {
825  IngestJobPipeline.taskScheduler.scheduleDataSourceIngestTask(this);
826  } else {
827  /*
828  * If no data source level ingest task is scheduled at this time
829  * and all of the file level and artifact ingest tasks scheduled
830  * during the initial file streaming stage have already
831  * executed, there will never be a stage completion check in an
832  * ingest thread executing an ingest task, so such a job would
833  * run forever without a check here.
834  */
835  checkForStageCompleted();
836  }
837  }
838  }
839 
    /**
     * Starts the second stage of the ingest job, in which the longer running,
     * lower priority data source level ingest modules run. Does nothing if no
     * second stage modules are configured. The second stage data source
     * pipeline becomes the current pipeline and a single data source ingest
     * task is scheduled.
     */
    private void startSecondStage() {
        synchronized (stageTransitionLock) {
            if (hasSecondStageDataSourceIngestModules()) {
                logInfoMessage(String.format("Starting second stage ingest task pipelines for %s (objID=%d, jobID=%d)", dataSource.getName(), dataSource.getId(), parentJob.getId())); //NON-NLS
                stage = IngestJobPipeline.Stages.SECOND_STAGE;

                if (doUI) {
                    startDataSourceIngestProgressBar();
                }

                currentDataSourceIngestPipeline = secondStageDataSourceIngestPipeline;
                taskScheduler.scheduleDataSourceIngestTask(this);
            }
        }
    }
858 
862  private void startArtifactIngestProgressBar() {
863  if (doUI) {
864  synchronized (artifactIngestProgressLock) {
865  String displayName = NbBundle.getMessage(this.getClass(), "IngestJob.progress.dataArtifactIngest.displayName", this.dataSource.getName());
866  artifactIngestProgressBar = ProgressHandle.createHandle(displayName, new Cancellable() {
867  @Override
868  public boolean cancel() {
869  IngestJobPipeline.this.cancel(IngestJob.CancellationReason.USER_CANCELLED);
870  return true;
871  }
872  });
873  artifactIngestProgressBar.start();
874  artifactIngestProgressBar.switchToIndeterminate();
875  }
876  }
877  }
878 
    /**
     * Starts a data source level analysis progress bar in the lower right
     * hand corner of the main application window (GUI mode only). The bar is
     * indeterminate; its cancel handler asks the user whether to cancel only
     * the current data source module or the whole job.
     */
    private void startDataSourceIngestProgressBar() {
        if (this.doUI) {
            synchronized (this.dataSourceIngestProgressLock) {
                String displayName = NbBundle.getMessage(this.getClass(),
                        "IngestJob.progress.dataSourceIngest.initialDisplayName",
                        this.dataSource.getName());
                this.dataSourceIngestProgressBar = ProgressHandle.createHandle(displayName, new Cancellable() {
                    @Override
                    public boolean cancel() {
                        // If this method is called, the user has already pressed
                        // the cancel button on the progress bar and the OK button
                        // of a cancelation confirmation dialog supplied by
                        // NetBeans. What remains to be done is to find out whether
                        // the user wants to cancel only the currently executing
                        // data source ingest module or the entire ingest job.
                        DataSourceIngestCancellationPanel panel = new DataSourceIngestCancellationPanel();
                        String dialogTitle = NbBundle.getMessage(IngestJobPipeline.this.getClass(), "IngestJob.cancellationDialog.title");
                        JOptionPane.showConfirmDialog(WindowManager.getDefault().getMainWindow(), panel, dialogTitle, JOptionPane.OK_OPTION, JOptionPane.PLAIN_MESSAGE);
                        if (panel.cancelAllDataSourceIngestModules()) {
                            IngestJobPipeline.this.cancel(IngestJob.CancellationReason.USER_CANCELLED);
                        } else {
                            IngestJobPipeline.this.cancelCurrentDataSourceIngestModule();
                        }
                        return true;
                    }
                });
                this.dataSourceIngestProgressBar.start();
                this.dataSourceIngestProgressBar.switchToIndeterminate();
            }
        }
    }
913 
917  private void startFileIngestProgressBar() {
918  if (this.doUI) {
919  synchronized (this.fileIngestProgressLock) {
920  String displayName = NbBundle.getMessage(this.getClass(),
921  "IngestJob.progress.fileIngest.displayName",
922  this.dataSource.getName());
923  this.fileIngestProgressBar = ProgressHandle.createHandle(displayName, new Cancellable() {
924  @Override
925  public boolean cancel() {
926  // If this method is called, the user has already pressed
927  // the cancel button on the progress bar and the OK button
928  // of a cancelation confirmation dialog supplied by
929  // NetBeans.
930  IngestJobPipeline.this.cancel(IngestJob.CancellationReason.USER_CANCELLED);
931  return true;
932  }
933  });
934  this.fileIngestProgressBar.start();
935  this.fileIngestProgressBar.switchToDeterminate((int) this.estimatedFilesToProcess);
936  }
937  }
938  }
939 
    /**
     * Checks whether all of the ingest tasks for the current stage have been
     * executed and, if so, advances to the next stage (FIRST_STAGE finishes
     * into the second stage or shut down; SECOND_STAGE shuts down). During
     * FIRST_STAGE_STREAMING the check is skipped entirely, because tasks keep
     * arriving as files are streamed in and "no tasks pending" does not mean
     * the stage is done.
     */
    private void checkForStageCompleted() {
        synchronized (stageTransitionLock) {
            if (stage == Stages.FIRST_STAGE_STREAMING) {
                return;
            }
            if (taskScheduler.currentTasksAreCompleted(this)) {
                switch (stage) {
                    case FIRST_STAGE:
                        finishFirstStage();
                        break;
                    case SECOND_STAGE:
                        shutDown();
                        break;
                }
            }
        }
    }
961 
    /**
     * Finishes the first stage: shuts down the first stage data source
     * pipeline and every file ingest pipeline copy, finishes the related
     * progress bars, and then either starts the second stage (if one is
     * configured and the job was not cancelled) or shuts the whole pipeline
     * down.
     */
    private void finishFirstStage() {
        synchronized (stageTransitionLock) {
            logInfoMessage("Finished first stage analysis"); //NON-NLS

            shutDownIngestModulePipeline(currentDataSourceIngestPipeline);
            // Drain the blocking queue so every file pipeline copy is shut down.
            while (!fileIngestPipelinesQueue.isEmpty()) {
                FileIngestPipeline pipeline = fileIngestPipelinesQueue.poll();
                shutDownIngestModulePipeline(pipeline);
            }

            if (doUI) {
                synchronized (dataSourceIngestProgressLock) {
                    if (dataSourceIngestProgressBar != null) {
                        dataSourceIngestProgressBar.finish();
                        dataSourceIngestProgressBar = null;
                    }
                }

                synchronized (fileIngestProgressLock) {
                    if (fileIngestProgressBar != null) {
                        fileIngestProgressBar.finish();
                        fileIngestProgressBar = null;
                    }
                }
            }

            if (!cancelled && hasSecondStageDataSourceIngestModules()) {
                startSecondStage();
            } else {
                shutDown();
            }
        }
    }
999 
    /*
     * Shuts down the ingest module pipelines and progress bars for this job,
     * records the final job status and end time in the case database, and
     * notifies the parent ingest job that this pipeline has shut down.
     */
    private void shutDown() {
        synchronized (stageTransitionLock) {
            logInfoMessage("Finished all tasks"); //NON-NLS
            stage = IngestJobPipeline.Stages.FINALIZATION;

            shutDownIngestModulePipeline(currentDataSourceIngestPipeline);
            shutDownIngestModulePipeline(artifactIngestPipeline);

            if (doUI) {
                // Finish and release all of the remaining progress bars.
                synchronized (dataSourceIngestProgressLock) {
                    if (dataSourceIngestProgressBar != null) {
                        dataSourceIngestProgressBar.finish();
                        dataSourceIngestProgressBar = null;
                    }
                }

                synchronized (fileIngestProgressLock) {
                    if (fileIngestProgressBar != null) {
                        fileIngestProgressBar.finish();
                        fileIngestProgressBar = null;
                    }
                }

                synchronized (artifactIngestProgressLock) {
                    if (artifactIngestProgressBar != null) {
                        artifactIngestProgressBar.finish();
                        artifactIngestProgressBar = null;
                    }
                }
            }

            if (ingestJobInfo != null) {
                /*
                 * Record the final status (cancelled vs. completed) and the
                 * end time in the case database. Database errors are logged
                 * rather than propagated so that shut down always completes.
                 */
                if (cancelled) {
                    try {
                        ingestJobInfo.setIngestJobStatus(IngestJobStatusType.CANCELLED);
                    } catch (TskCoreException ex) {
                        logErrorMessage(Level.WARNING, "Failed to update ingest job status in case database", ex);
                    }
                } else {
                    try {
                        ingestJobInfo.setIngestJobStatus(IngestJobStatusType.COMPLETED);
                    } catch (TskCoreException ex) {
                        logErrorMessage(Level.WARNING, "Failed to update ingest job status in case database", ex);
                    }
                }
                try {
                    ingestJobInfo.setEndDateTime(new Date());
                } catch (TskCoreException ex) {
                    logErrorMessage(Level.WARNING, "Failed to set job end date in case database", ex);
                }
            }
        }

        // NOTE(review): the parent job notification is outside
        // stageTransitionLock, presumably to avoid calling back into the
        // parent while holding the lock — confirm before changing.
        parentJob.notifyIngestPipelineShutDown(this);
    }
1058 
1064  private <T extends IngestTask> void shutDownIngestModulePipeline(IngestTaskPipeline<T> pipeline) {
1065  if (pipeline.isRunning()) {
1066  List<IngestModuleError> errors = new ArrayList<>();
1067  errors.addAll(pipeline.shutDown());
1068  if (!errors.isEmpty()) {
1069  logIngestModuleErrors(errors);
1070  }
1071  }
1072  }
1073 
1081  void execute(DataSourceIngestTask task) {
1082  try {
1083  if (!isCancelled()) {
1084  List<IngestModuleError> errors = new ArrayList<>();
1085  errors.addAll(currentDataSourceIngestPipeline.executeTask(task));
1086  if (!errors.isEmpty()) {
1087  logIngestModuleErrors(errors);
1088  }
1089  }
1090  } finally {
1091  taskScheduler.notifyTaskCompleted(task);
1092  checkForStageCompleted();
1093  }
1094  }
1095 
    /*
     * Runs a file through the file ingest module pipeline. A pipeline is
     * borrowed from (and must always be returned to) the per-ingest-thread
     * pipeline queue; progress reporting is guarded by fileIngestProgressLock.
     */
    void execute(FileIngestTask task) {
        try {
            if (!isCancelled()) {
                FileIngestPipeline pipeline = fileIngestPipelinesQueue.take();
                if (!pipeline.isEmpty()) {
                    /*
                     * Get the file from the task. If the file was "streamed,"
                     * the task may only have the file object ID and a trip to
                     * the case database will be required.
                     */
                    AbstractFile file;
                    try {
                        file = task.getFile();
                    } catch (TskCoreException ex) {
                        // Return the borrowed pipeline before bailing out so
                        // the ingest thread that owns it is not starved.
                        List<IngestModuleError> errors = new ArrayList<>();
                        errors.add(new IngestModuleError("Ingest Pipeline", ex));
                        logIngestModuleErrors(errors);
                        fileIngestPipelinesQueue.put(pipeline);
                        return;
                    }

                    // Update the processed-files count and the progress bar.
                    synchronized (fileIngestProgressLock) {
                        ++processedFiles;
                        if (doUI) {
                            // Clamp progress at the estimate; more files than
                            // estimated may be discovered during the job.
                            if (processedFiles <= estimatedFilesToProcess) {
                                fileIngestProgressBar.progress(file.getName(), (int) processedFiles);
                            } else {
                                fileIngestProgressBar.progress(file.getName(), (int) estimatedFilesToProcess);
                            }
                            filesInProgress.add(file.getName());
                        }
                    }

                    // Run the file through the pipeline, logging any errors
                    // with the file identified.
                    List<IngestModuleError> errors = new ArrayList<>();
                    errors.addAll(pipeline.executeTask(task));
                    if (!errors.isEmpty()) {
                        logIngestModuleErrors(errors, file);
                    }

                    if (doUI && !cancelled) {
                        synchronized (fileIngestProgressLock) {
                            // Show the name of another in-progress file, if
                            // any, now that this one is done.
                            filesInProgress.remove(file.getName());
                            if (filesInProgress.size() > 0) {
                                fileIngestProgressBar.progress(filesInProgress.get(0));
                            } else {
                                fileIngestProgressBar.progress("");
                            }
                        }
                    }
                }
                fileIngestPipelinesQueue.put(pipeline);
            }
        } catch (InterruptedException ex) {
            logger.log(Level.SEVERE, String.format("Unexpected interrupt of file ingest thread during execution of file ingest job (file obj ID = %d)", task.getFileId()), ex);
            Thread.currentThread().interrupt(); // Reset thread interrupted flag
        } finally {
            taskScheduler.notifyTaskCompleted(task);
            checkForStageCompleted();
        }
    }
1169 
1176  void execute(DataArtifactIngestTask task) {
1177  try {
1178  if (!isCancelled() && !artifactIngestPipeline.isEmpty()) {
1179  List<IngestModuleError> errors = new ArrayList<>();
1180  errors.addAll(artifactIngestPipeline.executeTask(task));
1181  if (!errors.isEmpty()) {
1182  logIngestModuleErrors(errors);
1183  }
1184  }
1185  } finally {
1186  taskScheduler.notifyTaskCompleted(task);
1187  checkForStageCompleted();
1188  }
1189  }
1190 
1197  void addStreamedFiles(List<Long> fileObjIds) {
1198  if (hasFileIngestModules()) {
1199  if (stage.equals(Stages.FIRST_STAGE_STREAMING)) {
1200  IngestJobPipeline.taskScheduler.scheduleStreamedFileIngestTasks(this, fileObjIds);
1201  } else {
1202  logErrorMessage(Level.SEVERE, "Adding streaming files to job during stage " + stage.toString() + " not supported");
1203  }
1204  }
1205  }
1206 
1214  void addFiles(List<AbstractFile> files) {
1215  if (stage.equals(Stages.FIRST_STAGE_STREAMING)
1216  || stage.equals(Stages.FIRST_STAGE)) {
1217  taskScheduler.fastTrackFileIngestTasks(this, files);
1218  } else {
1219  logErrorMessage(Level.SEVERE, "Adding streaming files to job during stage " + stage.toString() + " not supported");
1220  }
1221 
1229  checkForStageCompleted();
1230  }
1231 
1238  void addDataArtifacts(List<DataArtifact> artifacts) {
1239  List<DataArtifact> artifactsToAnalyze = new ArrayList<>(artifacts);
1240  if (stage.equals(Stages.FIRST_STAGE_STREAMING)
1241  || stage.equals(Stages.FIRST_STAGE)
1242  || stage.equals(Stages.SECOND_STAGE)) {
1243  taskScheduler.scheduleDataArtifactIngestTasks(this, artifactsToAnalyze);
1244  } else {
1245  logErrorMessage(Level.SEVERE, "Adding streaming files to job during stage " + stage.toString() + " not supported");
1246  }
1247 
1255  checkForStageCompleted();
1256  }
1257 
1264  void updateDataSourceIngestProgressBarDisplayName(String displayName) {
1265  if (this.doUI && !this.cancelled) {
1266  synchronized (this.dataSourceIngestProgressLock) {
1267  this.dataSourceIngestProgressBar.setDisplayName(displayName);
1268  }
1269  }
1270  }
1271 
1280  void switchDataSourceIngestProgressBarToDeterminate(int workUnits) {
1281  if (this.doUI && !this.cancelled) {
1282  synchronized (this.dataSourceIngestProgressLock) {
1283  if (null != this.dataSourceIngestProgressBar) {
1284  this.dataSourceIngestProgressBar.switchToDeterminate(workUnits);
1285  }
1286  }
1287  }
1288  }
1289 
1295  void switchDataSourceIngestProgressBarToIndeterminate() {
1296  if (this.doUI && !this.cancelled) {
1297  synchronized (this.dataSourceIngestProgressLock) {
1298  if (null != this.dataSourceIngestProgressBar) {
1299  this.dataSourceIngestProgressBar.switchToIndeterminate();
1300  }
1301  }
1302  }
1303  }
1304 
1311  void advanceDataSourceIngestProgressBar(int workUnits) {
1312  if (doUI && !cancelled) {
1313  synchronized (dataSourceIngestProgressLock) {
1314  if (null != dataSourceIngestProgressBar) {
1315  dataSourceIngestProgressBar.progress("", workUnits);
1316  }
1317  }
1318  }
1319  }
1320 
1327  void advanceDataSourceIngestProgressBar(String currentTask) {
1328  if (doUI && !cancelled) {
1329  synchronized (dataSourceIngestProgressLock) {
1330  if (null != dataSourceIngestProgressBar) {
1331  dataSourceIngestProgressBar.progress(currentTask);
1332  }
1333  }
1334  }
1335  }
1336 
1345  void advanceDataSourceIngestProgressBar(String currentTask, int workUnits) {
1346  if (this.doUI && !this.cancelled) {
1347  synchronized (this.fileIngestProgressLock) {
1348  this.dataSourceIngestProgressBar.progress(currentTask, workUnits);
1349  }
1350  }
1351  }
1352 
1360  boolean currentDataSourceIngestModuleIsCancelled() {
1361  return this.currentDataSourceIngestModuleCancelled;
1362  }
1363 
    /*
     * Clears the module-level cancellation flag after a data source ingest
     * module has finished cancelling, records the module's display name in
     * the list of cancelled modules, and rebuilds the data source progress
     * bar.
     *
     * @param moduleDisplayName The display name of the cancelled module.
     */
    void currentDataSourceIngestModuleCancellationCompleted(String moduleDisplayName) {
        this.currentDataSourceIngestModuleCancelled = false;
        this.cancelledDataSourceIngestModules.add(moduleDisplayName);

        if (this.doUI) {
            /*
             * The existing bar is finished and a fresh one is started.
             * NOTE(review): presumably the old ProgressHandle cannot be
             * reused after its cancel action fires (its cancel button stays
             * disabled) — confirm against the NetBeans Progress API. Also
             * note there is no null guard on the bar here, unlike the other
             * progress bar methods in this class.
             */
            synchronized (this.dataSourceIngestProgressLock) {
                this.dataSourceIngestProgressBar.finish();
                this.dataSourceIngestProgressBar = null;
                this.startDataSourceIngestProgressBar();
            }
        }
    }
1389 
1395  DataSourceIngestPipeline.DataSourcePipelineModule getCurrentDataSourceIngestModule() {
1396  if (currentDataSourceIngestPipeline != null) {
1397  return (DataSourceIngestPipeline.DataSourcePipelineModule) currentDataSourceIngestPipeline.getCurrentlyRunningModule();
1398  } else {
1399  return null;
1400  }
1401  }
1402 
1407  void cancelCurrentDataSourceIngestModule() {
1408  this.currentDataSourceIngestModuleCancelled = true;
1409  }
1410 
    /*
     * Requests cancellation of this ingest job: records the reason, cancels
     * pending file tasks, switches the progress bars to a "cancelling"
     * display, and interrupts any paused ingest threads registered with this
     * pipeline.
     *
     * @param reason The reason for cancellation.
     */
    void cancel(IngestJob.CancellationReason reason) {
        this.cancelled = true;
        this.cancellationReason = reason;
        IngestJobPipeline.taskScheduler.cancelPendingFileTasksForIngestJob(this);

        if (this.doUI) {
            // Show "cancelling" on both progress bars, if they still exist.
            synchronized (this.dataSourceIngestProgressLock) {
                if (null != dataSourceIngestProgressBar) {
                    dataSourceIngestProgressBar.setDisplayName(NbBundle.getMessage(this.getClass(), "IngestJob.progress.dataSourceIngest.initialDisplayName", this.dataSource.getName()));
                    dataSourceIngestProgressBar.progress(NbBundle.getMessage(this.getClass(), "IngestJob.progress.cancelling"));
                }
            }

            synchronized (this.fileIngestProgressLock) {
                if (null != this.fileIngestProgressBar) {
                    this.fileIngestProgressBar.setDisplayName(NbBundle.getMessage(this.getClass(), "IngestJob.progress.fileIngest.displayName", this.dataSource.getName()));
                    this.fileIngestProgressBar.progress(NbBundle.getMessage(this.getClass(), "IngestJob.progress.cancelling"));
                }
            }
        }

        // Interrupt any ingest threads that registered themselves as paused
        // so they can observe the cancellation and unwind.
        synchronized (threadRegistrationLock) {
            for (Thread thread : pausedIngestThreads) {
                thread.interrupt();
            }
            pausedIngestThreads.clear();
        }

        // If a data source had no tasks in progress it may now be complete.
        checkForStageCompleted();
    }
1448 
1455  boolean isCancelled() {
1456  return this.cancelled;
1457  }
1458 
1464  IngestJob.CancellationReason getCancellationReason() {
1465  return this.cancellationReason;
1466  }
1467 
1474  private void logInfoMessage(String message) {
1475  logger.log(Level.INFO, String.format("%s (data source = %s, objId = %d, pipeline id = %d, ingest job id = %d)", message, this.dataSource.getName(), this.dataSource.getId(), pipelineId, ingestJobInfo.getIngestJobId())); //NON-NLS
1476  }
1477 
1486  private void logErrorMessage(Level level, String message, Throwable throwable) {
1487  logger.log(level, String.format("%s (data source = %s, objId = %d, pipeline id = %d, ingest job id = %d)", message, this.dataSource.getName(), this.dataSource.getId(), pipelineId, ingestJobInfo.getIngestJobId()), throwable); //NON-NLS
1488  }
1489 
1497  private void logErrorMessage(Level level, String message) {
1498  logger.log(level, String.format("%s (data source = %s, objId = %d, pipeline id = %d, ingest job id %d)", message, this.dataSource.getName(), this.dataSource.getId(), pipelineId, ingestJobInfo.getIngestJobId())); //NON-NLS
1499  }
1500 
1506  private void logIngestModuleErrors(List<IngestModuleError> errors) {
1507  for (IngestModuleError error : errors) {
1508  logErrorMessage(Level.SEVERE, String.format("%s experienced an error during analysis", error.getModuleDisplayName()), error.getThrowable()); //NON-NLS
1509  }
1510  }
1511 
1518  private void logIngestModuleErrors(List<IngestModuleError> errors, AbstractFile file) {
1519  for (IngestModuleError error : errors) {
1520  logErrorMessage(Level.SEVERE, String.format("%s experienced an error during analysis while processing file %s, object ID %d", error.getModuleDisplayName(), file.getName(), file.getId()), error.getThrowable()); //NON-NLS
1521  }
1522  }
1523 
1529  Snapshot getSnapshot(boolean getIngestTasksSnapshot) {
1535  boolean fileIngestRunning = false;
1536  Date fileIngestStartTime = null;
1537  for (FileIngestPipeline pipeline : this.fileIngestPipelines) {
1538  if (pipeline.isRunning()) {
1539  fileIngestRunning = true;
1540  }
1541  Date pipelineStartTime = pipeline.getStartTime();
1542  if (null != pipelineStartTime && (null == fileIngestStartTime || pipelineStartTime.before(fileIngestStartTime))) {
1543  fileIngestStartTime = pipelineStartTime;
1544  }
1545  }
1546 
1547  long processedFilesCount = 0;
1548  long estimatedFilesToProcessCount = 0;
1549  long snapShotTime = new Date().getTime();
1550  IngestJobTasksSnapshot tasksSnapshot = null;
1551  if (getIngestTasksSnapshot) {
1552  synchronized (fileIngestProgressLock) {
1553  processedFilesCount = this.processedFiles;
1554  estimatedFilesToProcessCount = this.estimatedFilesToProcess;
1555  snapShotTime = new Date().getTime();
1556  }
1557  tasksSnapshot = taskScheduler.getTasksSnapshotForJob(pipelineId);
1558  }
1559 
1560  return new Snapshot(dataSource.getName(),
1561  pipelineId, createTime,
1562  getCurrentDataSourceIngestModule(),
1563  fileIngestRunning, fileIngestStartTime,
1564  cancelled, cancellationReason, cancelledDataSourceIngestModules,
1565  processedFilesCount, estimatedFilesToProcessCount, snapShotTime, tasksSnapshot);
1566  }
1567 
1574  void registerPausedIngestThread(Thread thread) {
1575  synchronized (threadRegistrationLock) {
1576  pausedIngestThreads.add(thread);
1577  }
1578  }
1579 
1586  void unregisterPausedIngestThread(Thread thread) {
1587  synchronized (threadRegistrationLock) {
1588  pausedIngestThreads.remove(thread);
1589  }
1590  }
1591 
1592 }

Copyright © 2012-2021 Basis Technology. Generated on: Fri Aug 6 2021
This work is licensed under a Creative Commons Attribution-Share Alike 3.0 United States License.