Autopsy  4.18.0
Graphical digital forensics platform for The Sleuth Kit and other tools.
IngestJobPipeline.java
Go to the documentation of this file.
1 /*
2  * Autopsy Forensic Browser
3  *
4  * Copyright 2014-2021 Basis Technology Corp.
5  * Contact: carrier <at> sleuthkit <dot> org
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  * http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19 package org.sleuthkit.autopsy.ingest;
20 
21 import java.util.ArrayList;
22 import java.util.Collections;
23 import java.util.Date;
24 import java.util.HashSet;
25 import java.util.LinkedHashMap;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Set;
29 import java.util.concurrent.CopyOnWriteArrayList;
30 import java.util.concurrent.LinkedBlockingQueue;
31 import java.util.concurrent.atomic.AtomicLong;
32 import java.util.logging.Level;
33 import java.util.regex.Matcher;
34 import java.util.regex.Pattern;
35 import java.util.stream.Stream;
36 import javax.annotation.concurrent.GuardedBy;
37 import javax.swing.JOptionPane;
38 import org.netbeans.api.progress.ProgressHandle;
39 import org.openide.util.Cancellable;
40 import org.openide.util.NbBundle;
41 import org.openide.windows.WindowManager;
48 import org.sleuthkit.datamodel.AbstractFile;
49 import org.sleuthkit.datamodel.Content;
50 import org.sleuthkit.datamodel.IngestJobInfo;
51 import org.sleuthkit.datamodel.IngestJobInfo.IngestJobStatusType;
52 import org.sleuthkit.datamodel.IngestModuleInfo;
53 import org.sleuthkit.datamodel.IngestModuleInfo.IngestModuleType;
54 import org.sleuthkit.datamodel.SleuthkitCase;
55 import org.sleuthkit.datamodel.TskCoreException;
58 
63 final class IngestJobPipeline {
64 
65  private static String AUTOPSY_MODULE_PREFIX = "org.sleuthkit.autopsy";
66 
67  private static final Logger logger = Logger.getLogger(IngestJobPipeline.class.getName());
68 
69  // to match something like: "org.python.proxies.GPX_Parser_Module$GPXParserFileIngestModuleFactory$14"
70  private static final Pattern JYTHON_REGEX = Pattern.compile("org\\.python\\.proxies\\.(.+?)\\$(.+?)(\\$[0-9]*)?$");
71 
78  private final IngestJob parentJob;
79  private static final AtomicLong nextJobId = new AtomicLong(0L);
80  private final long id;
81  private final IngestJobSettings settings;
82  private Content dataSource = null;
83  private final IngestJob.Mode ingestMode;
84  private final List<AbstractFile> files = new ArrayList<>();
85 
89  private static enum Stages {
90 
112  FINALIZATION
113  };
// Current stage of the job. Stage completion checks run while holding
// stageCompletionCheckLock.
114  private volatile Stages stage = IngestJobPipeline.Stages.INITIALIZATION;
115  private final Object stageCompletionCheckLock = new Object();
116 
// The first and second stage data source level pipelines, plus a reference to
// whichever of them is current. dataSourceIngestPipelineLock is held when the
// current pipeline reference is switched and while a data source task runs.
125  private final Object dataSourceIngestPipelineLock = new Object();
126  private DataSourceIngestPipeline firstStageDataSourceIngestPipeline;
127  private DataSourceIngestPipeline secondStageDataSourceIngestPipeline;
128  private DataSourceIngestPipeline currentDataSourceIngestPipeline;
129 
// File level pipelines. The queue hands a pipeline to each file ingest task
// (take before the task, put after it); the list keeps a reference to every
// pipeline that was created.
137  private final LinkedBlockingQueue<FileIngestPipeline> fileIngestPipelinesQueue = new LinkedBlockingQueue<>();
138  private final List<FileIngestPipeline> fileIngestPipelines = new ArrayList<>();
139 
// Cancellation state: a flag for cancelling only the currently running data
// source level module, the display names of modules cancelled that way, and
// the flag and reason for cancellation of the whole job.
151  private volatile boolean currentDataSourceIngestModuleCancelled;
152  private final List<String> cancelledDataSourceIngestModules = new CopyOnWriteArrayList<>();
153  private volatile boolean cancelled;
154  private volatile IngestJob.CancellationReason cancellationReason = IngestJob.CancellationReason.NOT_CANCELLED;
155 
// Shared scheduler used to schedule, complete and cancel this job's tasks.
160  private static final IngestTasksScheduler taskScheduler = IngestTasksScheduler.getInstance();
161 
// True when running with a GUI (set from RuntimeProperties.runningWithGUI()
// in the constructor); progress bars are only used when this is true.
166  private final boolean doUI;
167 
// Progress bar for data source level ingest and the lock used around it.
172  private final Object dataSourceIngestProgressLock = new Object();
173  private ProgressHandle dataSourceIngestProgress;
174 
// Progress bar and bookkeeping for file level ingest, and the lock used
// around them: names of files being processed, estimated and processed file
// counts, and the current file ingest module and task names.
179  private final Object fileIngestProgressLock = new Object();
180  private final List<String> filesInProgress = new ArrayList<>();
181  private long estimatedFilesToProcess;
182  private long processedFiles;
183  private ProgressHandle fileIngestProgress;
184  private String currentFileIngestModule = "";
185  private String currentFileIngestTask = "";
// Case database records: one IngestModuleInfo per module added by
// addIngestModules(), and the IngestJobInfo created in start().
186  private final List<IngestModuleInfo> ingestModules = new ArrayList<>();
187  private volatile IngestJobInfo ingestJob;
188 
// Creation time of this pipeline, in milliseconds (new Date().getTime()).
192  private final long createTime;
193 
194  /*
195  * An ingest pipeline allows ingest module pipelines to register and
196  * unregister the ingest thread they are running in when a scheduled ingest
197  * pause occurs and the threads are made to sleep. This allows interruption
198  * of these threads if the ingest job is canceled.
199  */
200  private final Object threadRegistrationLock = new Object();
201  @GuardedBy("threadRegistrationLock")
202  private final Set<Thread> pausedIngestThreads = new HashSet<>();
203 
/**
 * Constructs an ingest job pipeline for a data source without a
 * caller-supplied list of files; delegates to the four argument constructor
 * with an empty file list.
 *
 * @param parentJob  The ingest job this pipeline belongs to.
 * @param dataSource The data source to be ingested.
 * @param settings   The settings for the ingest job.
 */
IngestJobPipeline(IngestJob parentJob, Content dataSource, IngestJobSettings settings) {
    this(parentJob, dataSource, Collections.<AbstractFile>emptyList(), settings);
}
216 
/**
 * Constructs an ingest job pipeline for a data source and a list of files,
 * then builds the ingest module pipelines from the job settings.
 *
 * @param parentJob  The ingest job this pipeline belongs to; also supplies
 *                   the ingest mode.
 * @param dataSource The data source to be ingested.
 * @param files      The files to copy into this pipeline's file list.
 * @param settings   The settings for the ingest job.
 */
IngestJobPipeline(IngestJob parentJob, Content dataSource, List<AbstractFile> files, IngestJobSettings settings) {
    // Identity of the job.
    this.parentJob = parentJob;
    this.id = nextJobId.getAndIncrement();

    // What is to be ingested.
    this.dataSource = dataSource;
    this.files.addAll(files);

    // How it is to be ingested.
    this.ingestMode = parentJob.getIngestMode();
    this.settings = settings;
    this.doUI = RuntimeProperties.runningWithGUI();

    // Bookkeeping, then pipeline construction, which needs the settings.
    this.createTime = new Date().getTime();
    this.stage = Stages.INITIALIZATION;
    createIngestPipelines();
}
240 
251  private static void addOrdered(final List<IngestModuleTemplate> dest,
252  final Map<String, IngestModuleTemplate> src, final Map<String, IngestModuleTemplate> jythonSrc) {
253 
254  final List<IngestModuleTemplate> autopsyModules = new ArrayList<>();
255  final List<IngestModuleTemplate> thirdPartyModules = new ArrayList<>();
256 
257  Stream.concat(src.entrySet().stream(), jythonSrc.entrySet().stream()).forEach((templateEntry) -> {
258  if (templateEntry.getKey().startsWith(AUTOPSY_MODULE_PREFIX)) {
259  autopsyModules.add(templateEntry.getValue());
260  } else {
261  thirdPartyModules.add(templateEntry.getValue());
262  }
263  });
264 
265  dest.addAll(autopsyModules);
266  dest.addAll(thirdPartyModules);
267  }
268 
279  private static String getJythonName(String canonicalName) {
280  Matcher m = JYTHON_REGEX.matcher(canonicalName);
281  if (m.find()) {
282  return String.format("%s.%s", m.group(1), m.group(2));
283  } else {
284  return null;
285  }
286  }
287 
297  private static void addModule(Map<String, IngestModuleTemplate> mapping,
298  Map<String, IngestModuleTemplate> jythonMapping, IngestModuleTemplate template) {
299 
300  String className = template.getModuleFactory().getClass().getCanonicalName();
301  String jythonName = getJythonName(className);
302  if (jythonName != null) {
303  jythonMapping.put(jythonName, template);
304  } else {
305  mapping.put(className, template);
306  }
307  }
308 
312  private void createIngestPipelines() {
313  List<IngestModuleTemplate> ingestModuleTemplates = this.settings.getEnabledIngestModuleTemplates();
314 
318  Map<String, IngestModuleTemplate> dataSourceModuleTemplates = new LinkedHashMap<>();
319  Map<String, IngestModuleTemplate> fileModuleTemplates = new LinkedHashMap<>();
320 
321  // mappings for jython modules. These mappings are only used to determine modules in the pipelineconfig.xml.
322  Map<String, IngestModuleTemplate> jythonDataSourceModuleTemplates = new LinkedHashMap<>();
323  Map<String, IngestModuleTemplate> jythonFileModuleTemplates = new LinkedHashMap<>();
324 
325  for (IngestModuleTemplate template : ingestModuleTemplates) {
326  if (template.isDataSourceIngestModuleTemplate()) {
327  addModule(dataSourceModuleTemplates, jythonDataSourceModuleTemplates, template);
328  }
329  if (template.isFileIngestModuleTemplate()) {
330  addModule(fileModuleTemplates, jythonFileModuleTemplates, template);
331  }
332  }
333 
338  IngestPipelinesConfiguration pipelineConfigs = IngestPipelinesConfiguration.getInstance();
339  List<IngestModuleTemplate> firstStageDataSourceModuleTemplates = IngestJobPipeline.getConfiguredIngestModuleTemplates(
340  dataSourceModuleTemplates, jythonDataSourceModuleTemplates, pipelineConfigs.getStageOneDataSourceIngestPipelineConfig());
341 
342  List<IngestModuleTemplate> fileIngestModuleTemplates = IngestJobPipeline.getConfiguredIngestModuleTemplates(
343  fileModuleTemplates, jythonFileModuleTemplates, pipelineConfigs.getFileIngestPipelineConfig());
344 
345  List<IngestModuleTemplate> secondStageDataSourceModuleTemplates = IngestJobPipeline.getConfiguredIngestModuleTemplates(
346  dataSourceModuleTemplates, null, pipelineConfigs.getStageTwoDataSourceIngestPipelineConfig());
347 
353  addOrdered(firstStageDataSourceModuleTemplates, dataSourceModuleTemplates, jythonDataSourceModuleTemplates);
354  addOrdered(fileIngestModuleTemplates, fileModuleTemplates, jythonFileModuleTemplates);
355 
359  this.firstStageDataSourceIngestPipeline = new DataSourceIngestPipeline(this, firstStageDataSourceModuleTemplates);
360  this.secondStageDataSourceIngestPipeline = new DataSourceIngestPipeline(this, secondStageDataSourceModuleTemplates);
361 
365  try {
366  int numberOfFileIngestThreads = IngestManager.getInstance().getNumberOfFileIngestThreads();
367  for (int i = 0; i < numberOfFileIngestThreads; ++i) {
368  FileIngestPipeline pipeline = new FileIngestPipeline(this, fileIngestModuleTemplates);
369  this.fileIngestPipelinesQueue.put(pipeline);
370  this.fileIngestPipelines.add(pipeline);
371  }
372  } catch (InterruptedException ex) {
378  Thread.currentThread().interrupt();
379  }
380  try {
381  SleuthkitCase skCase = Case.getCurrentCaseThrows().getSleuthkitCase();
382  this.addIngestModules(firstStageDataSourceModuleTemplates, IngestModuleType.DATA_SOURCE_LEVEL, skCase);
383  this.addIngestModules(fileIngestModuleTemplates, IngestModuleType.FILE_LEVEL, skCase);
384  this.addIngestModules(secondStageDataSourceModuleTemplates, IngestModuleType.DATA_SOURCE_LEVEL, skCase);
385  } catch (TskCoreException | NoCurrentCaseException ex) {
386  logErrorMessage(Level.WARNING, "Failed to add ingest modules listing to case database", ex);
387  }
388  }
389 
390  private void addIngestModules(List<IngestModuleTemplate> templates, IngestModuleType type, SleuthkitCase skCase) throws TskCoreException {
391  for (IngestModuleTemplate module : templates) {
392  ingestModules.add(skCase.addIngestModule(module.getModuleName(), FactoryClassNameNormalizer.normalize(module.getModuleFactory().getClass().getCanonicalName()), type, module.getModuleFactory().getModuleVersionNumber()));
393  }
394  }
395 
416  private static List<IngestModuleTemplate> getConfiguredIngestModuleTemplates(
417  Map<String, IngestModuleTemplate> ingestModuleTemplates, Map<String, IngestModuleTemplate> jythonIngestModuleTemplates, List<String> pipelineConfig) {
418  List<IngestModuleTemplate> templates = new ArrayList<>();
419  for (String moduleClassName : pipelineConfig) {
420  if (ingestModuleTemplates != null && ingestModuleTemplates.containsKey(moduleClassName)) {
421  templates.add(ingestModuleTemplates.remove(moduleClassName));
422  } else if (jythonIngestModuleTemplates != null && jythonIngestModuleTemplates.containsKey(moduleClassName)) {
423  templates.add(jythonIngestModuleTemplates.remove(moduleClassName));
424  }
425  }
426  return templates;
427  }
428 
434  long getId() {
435  return this.id;
436  }
437 
443  String getExecutionContext() {
444  return this.settings.getExecutionContext();
445  }
446 
452  Content getDataSource() {
453  return this.dataSource;
454  }
455 
462  boolean shouldProcessUnallocatedSpace() {
463  return this.settings.getProcessUnallocatedSpace();
464  }
465 
471  FilesSet getFileIngestFilter() {
472  return this.settings.getFileFilter();
473  }
474 
480  boolean hasIngestPipeline() {
481  return this.hasFirstStageDataSourceIngestPipeline()
482  || this.hasFileIngestPipeline()
483  || this.hasSecondStageDataSourceIngestPipeline();
484  }
485 
492  private boolean hasFirstStageDataSourceIngestPipeline() {
493  return (this.firstStageDataSourceIngestPipeline.isEmpty() == false);
494  }
495 
502  private boolean hasSecondStageDataSourceIngestPipeline() {
503  return (this.secondStageDataSourceIngestPipeline.isEmpty() == false);
504  }
505 
511  private boolean hasFileIngestPipeline() {
512  if (!this.fileIngestPipelines.isEmpty()) {
513  return !this.fileIngestPipelines.get(0).isEmpty();
514  }
515  return false;
516  }
517 
523  List<IngestModuleError> start() {
524  if (dataSource == null) {
525  // TODO - Remove once data source is always present during initialization
526  throw new IllegalStateException("Ingest started before setting data source");
527  }
528  List<IngestModuleError> errors = startUpIngestPipelines();
529  if (errors.isEmpty()) {
530  try {
531  this.ingestJob = Case.getCurrentCaseThrows().getSleuthkitCase().addIngestJob(dataSource, NetworkUtils.getLocalHostName(), ingestModules, new Date(this.createTime), new Date(0), IngestJobStatusType.STARTED, "");
532  } catch (TskCoreException | NoCurrentCaseException ex) {
533  logErrorMessage(Level.WARNING, "Failed to add ingest job info to case database", ex); //NON-NLS
534  }
535 
536  if (this.hasFirstStageDataSourceIngestPipeline() || this.hasFileIngestPipeline()) {
537  if (ingestMode == IngestJob.Mode.BATCH) {
538  logInfoMessage("Starting first stage analysis"); //NON-NLS
539  this.startFirstStage();
540  } else {
541  logInfoMessage("Preparing for first stage analysis"); //NON-NLS
542  this.startFileIngestStreaming();
543  }
544  } else if (this.hasSecondStageDataSourceIngestPipeline()) {
545  logInfoMessage("Starting second stage analysis"); //NON-NLS
546  this.startSecondStage();
547  }
548  }
549  return errors;
550  }
551 
558  private List<IngestModuleError> startUpIngestPipelines() {
559  List<IngestModuleError> errors = new ArrayList<>();
560 
561  /*
562  * Start the data-source-level ingest module pipelines.
563  */
564  errors.addAll(this.firstStageDataSourceIngestPipeline.startUp());
565  errors.addAll(this.secondStageDataSourceIngestPipeline.startUp());
566 
567  /*
568  * If the data-source-level ingest pipelines were successfully started,
569  * start the file-level ingest pipelines (one per pipeline file ingest
570  * thread).
571  */
572  if (errors.isEmpty()) {
573  for (FileIngestPipeline pipeline : this.fileIngestPipelinesQueue) {
574  errors.addAll(pipeline.startUp());
575  if (!errors.isEmpty()) {
576  /*
577  * If there are start up errors, the ingest job will not
578  * proceed, so shut down any file ingest pipelines that did
579  * start up.
580  */
581  while (!this.fileIngestPipelinesQueue.isEmpty()) {
582  FileIngestPipeline startedPipeline = this.fileIngestPipelinesQueue.poll();
583  if (startedPipeline.isRunning()) {
584  List<IngestModuleError> shutDownErrors = startedPipeline.shutDown();
585  if (!shutDownErrors.isEmpty()) {
586  /*
587  * The start up errors will ultimately be
588  * reported to the user for possible remedy, but
589  * the shut down errors are logged here.
590  */
591  logIngestModuleErrors(shutDownErrors);
592  }
593  }
594  }
595  break;
596  }
597  }
598  }
599  return errors;
600  }
601 
605  private void startFirstStage() {
606  this.stage = IngestJobPipeline.Stages.FIRST_STAGE_FILES_AND_DATASOURCE;
607 
608  if (this.hasFileIngestPipeline()) {
609  synchronized (this.fileIngestProgressLock) {
610  this.estimatedFilesToProcess = this.dataSource.accept(new GetFilesCountVisitor());
611  }
612  }
613 
614  if (this.doUI) {
618  if (this.hasFirstStageDataSourceIngestPipeline()) {
619  this.startDataSourceIngestProgressBar();
620  }
621  if (this.hasFileIngestPipeline()) {
622  this.startFileIngestProgressBar();
623  }
624  }
625 
630  synchronized (this.dataSourceIngestPipelineLock) {
631  this.currentDataSourceIngestPipeline = this.firstStageDataSourceIngestPipeline;
632  }
633 
637  if (this.hasFirstStageDataSourceIngestPipeline() && this.hasFileIngestPipeline()) {
638  logInfoMessage("Scheduling first stage data source and file level analysis tasks"); //NON-NLS
639  IngestJobPipeline.taskScheduler.scheduleIngestTasks(this);
640  } else if (this.hasFirstStageDataSourceIngestPipeline()) {
641  logInfoMessage("Scheduling first stage data source level analysis tasks"); //NON-NLS
642  IngestJobPipeline.taskScheduler.scheduleDataSourceIngestTask(this);
643  } else {
644  logInfoMessage("Scheduling file level analysis tasks, no first stage data source level analysis configured"); //NON-NLS
645  IngestJobPipeline.taskScheduler.scheduleFileIngestTasks(this, this.files);
646 
655  this.checkForStageCompleted();
656  }
657  }
658 
663  private void startFileIngestStreaming() {
664  synchronized (this.stageCompletionCheckLock) {
665  this.stage = IngestJobPipeline.Stages.FIRST_STAGE_FILES_ONLY;
666  }
667 
668  if (this.hasFileIngestPipeline()) {
669  synchronized (this.fileIngestProgressLock) {
670  this.estimatedFilesToProcess = 0; // Set to indeterminate until the data source is complete
671  }
672  }
673 
674  if (this.doUI) {
675  if (this.hasFileIngestPipeline()) {
676  this.startFileIngestProgressBar();
677  }
678  }
679 
680  logInfoMessage("Waiting for streaming files"); //NON-NLS
681  }
682 
687  private void startDataSourceIngestStreaming() {
688 
689  // Now that the data source is complete, we can get the estimated number of
690  // files and switch to a determinate progress bar.
691  synchronized (fileIngestProgressLock) {
692  if (null != this.fileIngestProgress) {
693  estimatedFilesToProcess = dataSource.accept(new GetFilesCountVisitor());
694  fileIngestProgress.switchToDeterminate((int) estimatedFilesToProcess);
695  }
696  }
697 
698  if (this.doUI) {
702  if (this.hasFirstStageDataSourceIngestPipeline()) {
703  this.startDataSourceIngestProgressBar();
704  }
705  }
706 
711  synchronized (this.dataSourceIngestPipelineLock) {
712  this.currentDataSourceIngestPipeline = this.firstStageDataSourceIngestPipeline;
713  }
714 
715  logInfoMessage("Scheduling first stage data source level analysis tasks"); //NON-NLS
716  synchronized (this.stageCompletionCheckLock) {
717  this.stage = IngestJobPipeline.Stages.FIRST_STAGE_FILES_AND_DATASOURCE;
718  IngestJobPipeline.taskScheduler.scheduleDataSourceIngestTask(this);
719  }
720  }
721 
725  private void startSecondStage() {
726  logInfoMessage("Starting second stage analysis"); //NON-NLS
727  this.stage = IngestJobPipeline.Stages.SECOND_STAGE;
728  if (this.doUI) {
729  this.startDataSourceIngestProgressBar();
730  }
731  synchronized (this.dataSourceIngestPipelineLock) {
732  this.currentDataSourceIngestPipeline = this.secondStageDataSourceIngestPipeline;
733  }
734  logInfoMessage("Scheduling second stage data source level analysis tasks"); //NON-NLS
735  IngestJobPipeline.taskScheduler.scheduleDataSourceIngestTask(this);
736  }
737 
741  private void startDataSourceIngestProgressBar() {
742  if (this.doUI) {
743  synchronized (this.dataSourceIngestProgressLock) {
744  String displayName = NbBundle.getMessage(this.getClass(),
745  "IngestJob.progress.dataSourceIngest.initialDisplayName",
746  this.dataSource.getName());
747  this.dataSourceIngestProgress = ProgressHandle.createHandle(displayName, new Cancellable() {
748  @Override
749  public boolean cancel() {
750  // If this method is called, the user has already pressed
751  // the cancel button on the progress bar and the OK button
752  // of a cancelation confirmation dialog supplied by
753  // NetBeans. What remains to be done is to find out whether
754  // the user wants to cancel only the currently executing
755  // data source ingest module or the entire ingest job.
756  DataSourceIngestCancellationPanel panel = new DataSourceIngestCancellationPanel();
757  String dialogTitle = NbBundle.getMessage(IngestJobPipeline.this.getClass(), "IngestJob.cancellationDialog.title");
758  JOptionPane.showConfirmDialog(WindowManager.getDefault().getMainWindow(), panel, dialogTitle, JOptionPane.OK_OPTION, JOptionPane.PLAIN_MESSAGE);
759  if (panel.cancelAllDataSourceIngestModules()) {
760  IngestJobPipeline.this.cancel(IngestJob.CancellationReason.USER_CANCELLED);
761  } else {
762  IngestJobPipeline.this.cancelCurrentDataSourceIngestModule();
763  }
764  return true;
765  }
766  });
767  this.dataSourceIngestProgress.start();
768  this.dataSourceIngestProgress.switchToIndeterminate();
769  }
770  }
771  }
772 
776  private void startFileIngestProgressBar() {
777  if (this.doUI) {
778  synchronized (this.fileIngestProgressLock) {
779  String displayName = NbBundle.getMessage(this.getClass(),
780  "IngestJob.progress.fileIngest.displayName",
781  this.dataSource.getName());
782  this.fileIngestProgress = ProgressHandle.createHandle(displayName, new Cancellable() {
783  @Override
784  public boolean cancel() {
785  // If this method is called, the user has already pressed
786  // the cancel button on the progress bar and the OK button
787  // of a cancelation confirmation dialog supplied by
788  // NetBeans.
789  IngestJobPipeline.this.cancel(IngestJob.CancellationReason.USER_CANCELLED);
790  return true;
791  }
792  });
793  this.fileIngestProgress.start();
794  this.fileIngestProgress.switchToDeterminate((int) this.estimatedFilesToProcess);
795  }
796  }
797  }
798 
803  private void checkForStageCompleted() {
804  if (ingestMode == IngestJob.Mode.BATCH) {
805  checkForStageCompletedBatch();
806  } else {
807  checkForStageCompletedStreaming();
808  }
809  }
810 
815  private void checkForStageCompletedBatch() {
816  synchronized (this.stageCompletionCheckLock) {
817  if (IngestJobPipeline.taskScheduler.currentTasksAreCompleted(this)) {
818  switch (this.stage) {
819  case FIRST_STAGE_FILES_AND_DATASOURCE:
820  this.finishFirstStage();
821  break;
822  case SECOND_STAGE:
823  this.finish();
824  break;
825  }
826  }
827  }
828  }
829 
834  private void checkForStageCompletedStreaming() {
835  synchronized (this.stageCompletionCheckLock) {
836  if (IngestJobPipeline.taskScheduler.currentTasksAreCompleted(this)) {
837  switch (this.stage) {
838  case FIRST_STAGE_FILES_ONLY:
839  // Nothing to do here - need to wait for the data source
840  break;
841  case FIRST_STAGE_FILES_AND_DATASOURCE:
842  // Finish file and data source ingest, start second stage (if applicable)
843  this.finishFirstStage();
844  break;
845  case SECOND_STAGE:
846  this.finish();
847  break;
848  }
849  }
850  }
851  }
852 
857  private void finishFirstStage() {
858  logInfoMessage("Finished first stage analysis"); //NON-NLS
859 
860  // Shut down the file ingest pipelines. Note that no shut down is
861  // required for the data source ingest pipeline because data source
862  // ingest modules do not have a shutdown() method.
863  List<IngestModuleError> errors = new ArrayList<>();
864  while (!this.fileIngestPipelinesQueue.isEmpty()) {
865  FileIngestPipeline pipeline = fileIngestPipelinesQueue.poll();
866  if (pipeline.isRunning()) {
867  errors.addAll(pipeline.shutDown());
868  }
869  }
870  if (!errors.isEmpty()) {
871  logIngestModuleErrors(errors);
872  }
873 
874  if (this.doUI) {
875  // Finish the first stage data source ingest progress bar, if it hasn't
876  // already been finished.
877  synchronized (this.dataSourceIngestProgressLock) {
878  if (this.dataSourceIngestProgress != null) {
879  this.dataSourceIngestProgress.finish();
880  this.dataSourceIngestProgress = null;
881  }
882  }
883 
884  // Finish the file ingest progress bar, if it hasn't already
885  // been finished.
886  synchronized (this.fileIngestProgressLock) {
887  if (this.fileIngestProgress != null) {
888  this.fileIngestProgress.finish();
889  this.fileIngestProgress = null;
890  }
891  }
892  }
893 
897  if (!this.cancelled && this.hasSecondStageDataSourceIngestPipeline()) {
898  this.startSecondStage();
899  } else {
900  this.finish();
901  }
902  }
903 
907  private void finish() {
908  logInfoMessage("Finished analysis"); //NON-NLS
909  this.stage = IngestJobPipeline.Stages.FINALIZATION;
910 
911  if (this.doUI) {
912  // Finish the second stage data source ingest progress bar, if it hasn't
913  // already been finished.
914  synchronized (this.dataSourceIngestProgressLock) {
915  if (this.dataSourceIngestProgress != null) {
916  this.dataSourceIngestProgress.finish();
917  this.dataSourceIngestProgress = null;
918  }
919  }
920  }
921  if (ingestJob != null) {
922  if (this.cancelled) {
923  try {
924  ingestJob.setIngestJobStatus(IngestJobStatusType.CANCELLED);
925  } catch (TskCoreException ex) {
926  logErrorMessage(Level.WARNING, "Failed to update ingest job status in case database", ex);
927  }
928  } else {
929  try {
930  ingestJob.setIngestJobStatus(IngestJobStatusType.COMPLETED);
931  } catch (TskCoreException ex) {
932  logErrorMessage(Level.WARNING, "Failed to update ingest job status in case database", ex);
933  }
934  }
935  try {
936  this.ingestJob.setEndDateTime(new Date());
937  } catch (TskCoreException ex) {
938  logErrorMessage(Level.WARNING, "Failed to set job end date in case database", ex);
939  }
940  }
941  this.parentJob.ingestJobPipelineFinished(this);
942  }
943 
950  void process(DataSourceIngestTask task) {
951  try {
952  synchronized (this.dataSourceIngestPipelineLock) {
953  if (!this.isCancelled() && !this.currentDataSourceIngestPipeline.isEmpty()) {
954  List<IngestModuleError> errors = new ArrayList<>();
955  errors.addAll(this.currentDataSourceIngestPipeline.performTask(task));
956  if (!errors.isEmpty()) {
957  logIngestModuleErrors(errors);
958  }
959  }
960  }
961 
962  if (this.doUI) {
967  synchronized (this.dataSourceIngestProgressLock) {
968  if (null != this.dataSourceIngestProgress) {
969  this.dataSourceIngestProgress.finish();
970  this.dataSourceIngestProgress = null;
971  }
972  }
973  }
974 
975  } finally {
976  IngestJobPipeline.taskScheduler.notifyTaskCompleted(task);
977  this.checkForStageCompleted();
978  }
979  }
980 
992  void process(FileIngestTask task) throws InterruptedException {
993  try {
994  if (!this.isCancelled()) {
995  FileIngestPipeline pipeline = this.fileIngestPipelinesQueue.take();
996  if (!pipeline.isEmpty()) {
997  AbstractFile file;
998  try {
999  file = task.getFile();
1000  } catch (TskCoreException ex) {
1001  // In practice, this task would never have been enqueued since the file
1002  // lookup would have failed there.
1003  List<IngestModuleError> errors = new ArrayList<>();
1004  errors.add(new IngestModuleError("Ingest Job Pipeline", ex));
1005  logIngestModuleErrors(errors);
1006  this.fileIngestPipelinesQueue.put(pipeline);
1007  return;
1008  }
1009 
1010  synchronized (this.fileIngestProgressLock) {
1011  ++this.processedFiles;
1012  if (this.doUI) {
1016  if (this.processedFiles <= this.estimatedFilesToProcess) {
1017  this.fileIngestProgress.progress(file.getName(), (int) this.processedFiles);
1018  } else {
1019  this.fileIngestProgress.progress(file.getName(), (int) this.estimatedFilesToProcess);
1020  }
1021  this.filesInProgress.add(file.getName());
1022  }
1023  }
1024 
1028  List<IngestModuleError> errors = new ArrayList<>();
1029  errors.addAll(pipeline.performTask(task));
1030  if (!errors.isEmpty()) {
1031  logIngestModuleErrors(errors, file);
1032  }
1033 
1034  if (this.doUI && !this.cancelled) {
1035  synchronized (this.fileIngestProgressLock) {
1040  this.filesInProgress.remove(file.getName());
1041  if (this.filesInProgress.size() > 0) {
1042  this.fileIngestProgress.progress(this.filesInProgress.get(0));
1043  } else {
1044  this.fileIngestProgress.progress("");
1045  }
1046  }
1047  }
1048  }
1049  this.fileIngestPipelinesQueue.put(pipeline);
1050  }
1051  } finally {
1052  IngestJobPipeline.taskScheduler.notifyTaskCompleted(task);
1053  this.checkForStageCompleted();
1054  }
1055  }
1056 
1063  void addStreamingIngestFiles(List<Long> fileObjIds) {
1064 
1065  // Return if there are no file ingest modules enabled.
1066  if (!hasFileIngestPipeline()) {
1067  return;
1068  }
1069 
1070  if (stage.equals(Stages.FIRST_STAGE_FILES_ONLY)) {
1071  IngestJobPipeline.taskScheduler.scheduleStreamedFileIngestTasks(this, fileObjIds);
1072  } else {
1073  logErrorMessage(Level.SEVERE, "Adding streaming files to job during stage " + stage.toString() + " not supported");
1074  }
1075  }
1076 
1081  void processStreamingIngestDataSource() {
1082  startDataSourceIngestStreaming();
1083  checkForStageCompleted();
1084  }
1085 
1093  void addFiles(List<AbstractFile> files) {
1094  if (stage.equals(Stages.FIRST_STAGE_FILES_ONLY)
1095  || stage.equals(Stages.FIRST_STAGE_FILES_AND_DATASOURCE)) {
1096  IngestJobPipeline.taskScheduler.fastTrackFileIngestTasks(this, files);
1097  } else {
1098  logErrorMessage(Level.SEVERE, "Adding files to job during second stage analysis not supported");
1099  }
1100 
1107  this.checkForStageCompleted();
1108  }
1109 
1116  void updateDataSourceIngestProgressBarDisplayName(String displayName) {
1117  if (this.doUI && !this.cancelled) {
1118  synchronized (this.dataSourceIngestProgressLock) {
1119  this.dataSourceIngestProgress.setDisplayName(displayName);
1120  }
1121  }
1122  }
1123 
1132  void switchDataSourceIngestProgressBarToDeterminate(int workUnits) {
1133  if (this.doUI && !this.cancelled) {
1134  synchronized (this.dataSourceIngestProgressLock) {
1135  if (null != this.dataSourceIngestProgress) {
1136  this.dataSourceIngestProgress.switchToDeterminate(workUnits);
1137  }
1138  }
1139  }
1140  }
1141 
1147  void switchDataSourceIngestProgressBarToIndeterminate() {
1148  if (this.doUI && !this.cancelled) {
1149  synchronized (this.dataSourceIngestProgressLock) {
1150  if (null != this.dataSourceIngestProgress) {
1151  this.dataSourceIngestProgress.switchToIndeterminate();
1152  }
1153  }
1154  }
1155  }
1156 
1163  void advanceDataSourceIngestProgressBar(int workUnits) {
1164  if (this.doUI && !this.cancelled) {
1165  synchronized (this.dataSourceIngestProgressLock) {
1166  if (null != this.dataSourceIngestProgress) {
1167  this.dataSourceIngestProgress.progress("", workUnits);
1168  }
1169  }
1170  }
1171  }
1172 
1179  void advanceDataSourceIngestProgressBar(String currentTask) {
1180  if (this.doUI && !this.cancelled) {
1181  synchronized (this.dataSourceIngestProgressLock) {
1182  if (null != this.dataSourceIngestProgress) {
1183  this.dataSourceIngestProgress.progress(currentTask);
1184  }
1185  }
1186  }
1187  }
1188 
1197  void advanceDataSourceIngestProgressBar(String currentTask, int workUnits) {
1198  if (this.doUI && !this.cancelled) {
1199  synchronized (this.fileIngestProgressLock) {
1200  this.dataSourceIngestProgress.progress(currentTask, workUnits);
1201  }
1202  }
1203  }
1204 
1212  boolean currentDataSourceIngestModuleIsCancelled() {
1213  return this.currentDataSourceIngestModuleCancelled;
1214  }
1215 
1222  void currentDataSourceIngestModuleCancellationCompleted(String moduleDisplayName) {
1223  this.currentDataSourceIngestModuleCancelled = false;
1224  this.cancelledDataSourceIngestModules.add(moduleDisplayName);
1225 
1226  if (this.doUI) {
1234  synchronized (this.dataSourceIngestProgressLock) {
1235  this.dataSourceIngestProgress.finish();
1236  this.dataSourceIngestProgress = null;
1237  this.startDataSourceIngestProgressBar();
1238  }
1239  }
1240  }
1241 
1247  DataSourceIngestPipeline.DataSourcePipelineModule getCurrentDataSourceIngestModule() {
1248  if (null != currentDataSourceIngestPipeline) {
1249  return (DataSourceIngestPipeline.DataSourcePipelineModule) currentDataSourceIngestPipeline.getCurrentlyRunningModule();
1250  } else {
1251  return null;
1252  }
1253  }
1254 
1259  void cancelCurrentDataSourceIngestModule() {
1260  this.currentDataSourceIngestModuleCancelled = true;
1261  }
1262 
1269  void cancel(IngestJob.CancellationReason reason) {
1270  this.cancelled = true;
1271  this.cancellationReason = reason;
1272  IngestJobPipeline.taskScheduler.cancelPendingTasksForIngestJob(this);
1273 
1274  if (this.doUI) {
1275  synchronized (this.dataSourceIngestProgressLock) {
1276  if (null != dataSourceIngestProgress) {
1277  dataSourceIngestProgress.setDisplayName(NbBundle.getMessage(this.getClass(), "IngestJob.progress.dataSourceIngest.initialDisplayName", this.dataSource.getName()));
1278  dataSourceIngestProgress.progress(NbBundle.getMessage(this.getClass(), "IngestJob.progress.cancelling"));
1279  }
1280  }
1281 
1282  synchronized (this.fileIngestProgressLock) {
1283  if (null != this.fileIngestProgress) {
1284  this.fileIngestProgress.setDisplayName(NbBundle.getMessage(this.getClass(), "IngestJob.progress.fileIngest.displayName", this.dataSource.getName()));
1285  this.fileIngestProgress.progress(NbBundle.getMessage(this.getClass(), "IngestJob.progress.cancelling"));
1286  }
1287  }
1288  }
1289 
1290  synchronized (threadRegistrationLock) {
1291  for (Thread thread : pausedIngestThreads) {
1292  thread.interrupt();
1293  }
1294  pausedIngestThreads.clear();
1295  }
1296 
1297  // If a data source had no tasks in progress it may now be complete.
1298  checkForStageCompleted();
1299  }
1300 
1308  void setCurrentFileIngestModule(String moduleName, String taskName) {
1309  this.currentFileIngestModule = moduleName;
1310  this.currentFileIngestTask = taskName;
1311  }
1312 
1319  boolean isCancelled() {
1320  return this.cancelled;
1321  }
1322 
1328  IngestJob.CancellationReason getCancellationReason() {
1329  return this.cancellationReason;
1330  }
1331 
1338  private void logInfoMessage(String message) {
1339  logger.log(Level.INFO, String.format("%s (data source = %s, objId = %d, pipeline id = %d, ingest job id = %d)", message, this.dataSource.getName(), this.dataSource.getId(), id, ingestJob.getIngestJobId())); //NON-NLS
1340  }
1341 
1350  private void logErrorMessage(Level level, String message, Throwable throwable) {
1351  logger.log(level, String.format("%s (data source = %s, objId = %d, pipeline id = %d, ingest job id = %d)", message, this.dataSource.getName(), this.dataSource.getId(), id, ingestJob.getIngestJobId()), throwable); //NON-NLS
1352  }
1353 
1361  private void logErrorMessage(Level level, String message) {
1362  logger.log(level, String.format("%s (data source = %s, objId = %d, pipeline id = %d, ingest job id %d)", message, this.dataSource.getName(), this.dataSource.getId(), id, ingestJob.getIngestJobId())); //NON-NLS
1363  }
1364 
1370  private void logIngestModuleErrors(List<IngestModuleError> errors) {
1371  for (IngestModuleError error : errors) {
1372  logErrorMessage(Level.SEVERE, String.format("%s experienced an error during analysis", error.getModuleDisplayName()), error.getThrowable()); //NON-NLS
1373  }
1374  }
1375 
1382  private void logIngestModuleErrors(List<IngestModuleError> errors, AbstractFile file) {
1383  for (IngestModuleError error : errors) {
1384  logErrorMessage(Level.SEVERE, String.format("%s experienced an error during analysis while processing file %s, object ID %d", error.getModuleDisplayName(), file.getName(), file.getId()), error.getThrowable()); //NON-NLS
1385  }
1386  }
1387 
1393  Snapshot getSnapshot(boolean getIngestTasksSnapshot) {
1399  boolean fileIngestRunning = false;
1400  Date fileIngestStartTime = null;
1401 
1402  for (FileIngestPipeline pipeline : this.fileIngestPipelines) {
1403  if (pipeline.isRunning()) {
1404  fileIngestRunning = true;
1405  }
1406  Date pipelineStartTime = pipeline.getStartTime();
1407  if (null != pipelineStartTime && (null == fileIngestStartTime || pipelineStartTime.before(fileIngestStartTime))) {
1408  fileIngestStartTime = pipelineStartTime;
1409  }
1410  }
1411 
1412  long processedFilesCount = 0;
1413  long estimatedFilesToProcessCount = 0;
1414  long snapShotTime = new Date().getTime();
1415  IngestJobTasksSnapshot tasksSnapshot = null;
1416 
1417  if (getIngestTasksSnapshot) {
1418  synchronized (fileIngestProgressLock) {
1419  processedFilesCount = this.processedFiles;
1420  estimatedFilesToProcessCount = this.estimatedFilesToProcess;
1421  snapShotTime = new Date().getTime();
1422  }
1423  tasksSnapshot = IngestJobPipeline.taskScheduler.getTasksSnapshotForJob(id);
1424 
1425  }
1426 
1427  return new Snapshot(this.dataSource.getName(), id, createTime,
1428  getCurrentDataSourceIngestModule(), fileIngestRunning, fileIngestStartTime,
1429  cancelled, cancellationReason, cancelledDataSourceIngestModules,
1430  processedFilesCount, estimatedFilesToProcessCount, snapShotTime, tasksSnapshot);
1431  }
1432 
1439  void registerPausedIngestThread(Thread thread) {
1440  synchronized (threadRegistrationLock) {
1441  pausedIngestThreads.add(thread);
1442  }
1443  }
1444 
1451  void unregisterPausedIngestThread(Thread thread) {
1452  synchronized (threadRegistrationLock) {
1453  pausedIngestThreads.remove(thread);
1454  }
1455  }
1456 
1457 }
List< IngestModuleTemplate > getEnabledIngestModuleTemplates()

Copyright © 2012-2021 Basis Technology. Generated on: Thu Jul 8 2021
This work is licensed under a Creative Commons Attribution-Share Alike 3.0 United States License.