Autopsy  4.21.0
Graphical digital forensics platform for The Sleuth Kit and other tools.
IngestJobExecutor.java
Go to the documentation of this file.
1 /*
2  * Autopsy Forensic Browser
3  *
4  * Copyright 2014-2021 Basis Technology Corp.
5  * Contact: carrier <at> sleuthkit <dot> org
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  * http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19 package org.sleuthkit.autopsy.ingest;
20 
21 import java.lang.reflect.InvocationTargetException;
22 import java.util.ArrayList;
23 import java.util.Date;
24 import java.util.HashSet;
25 import java.util.List;
26 import java.util.Optional;
27 import java.util.Set;
28 import java.util.concurrent.CopyOnWriteArrayList;
29 import java.util.logging.Level;
30 import javax.annotation.concurrent.GuardedBy;
31 import javax.swing.JOptionPane;
32 import javax.swing.SwingUtilities;
33 import org.netbeans.api.progress.ProgressHandle;
34 import org.openide.util.Cancellable;
35 import org.openide.util.NbBundle;
36 import org.openide.util.NbBundle.Messages;
37 import org.openide.windows.WindowManager;
43 import org.sleuthkit.datamodel.AbstractFile;
44 import org.sleuthkit.datamodel.IngestJobInfo;
45 import org.sleuthkit.datamodel.IngestJobInfo.IngestJobStatusType;
46 import org.sleuthkit.datamodel.IngestModuleInfo;
47 import org.sleuthkit.datamodel.IngestModuleInfo.IngestModuleType;
48 import org.sleuthkit.datamodel.SleuthkitCase;
49 import org.sleuthkit.datamodel.TskCoreException;
52 import org.sleuthkit.datamodel.AnalysisResult;
53 import org.sleuthkit.datamodel.DataArtifact;
54 import org.sleuthkit.datamodel.DataSource;
55 
61 final class IngestJobExecutor {
62 
63  private static enum IngestJobState {
67  PIPELINES_SHUTTING_DOWN
68  };
69  private static final Logger logger = Logger.getLogger(IngestJobExecutor.class.getName());
70  private final IngestJob ingestJob;
71  private final long createTime;
72  private final boolean usingNetBeansGUI;
73  private final IngestTasksScheduler taskScheduler = IngestTasksScheduler.getInstance();
74  private final Object threadRegistrationLock = new Object();
75  @GuardedBy("threadRegistrationLock")
76  private final Set<Thread> pausedIngestThreads = new HashSet<>();
77  private final List<String> cancelledDataSourceIngestModules = new CopyOnWriteArrayList<>();
78  private final Object tierTransitionLock = new Object();
79  private final List<IngestModuleTier> ingestModuleTiers = new ArrayList<>();
80  private volatile int moduleTierIndex = 0;
81  private volatile IngestJobState jobState = IngestJobExecutor.IngestJobState.PIPELINES_STARTING_UP;
82  private volatile long estimatedFilesToProcess = 0;
83  private volatile long processedFiles = 0;
84  private volatile boolean currentDataSourceIngestModuleCancelled = false;
85  private volatile boolean jobCancelled = false;
86  private volatile IngestJob.CancellationReason cancellationReason = IngestJob.CancellationReason.NOT_CANCELLED;
87  private volatile IngestJobInfo casDbingestJobInfo;
88  @ThreadConfined(type = ThreadConfined.ThreadType.AWT)
89  private ProgressHandle dataSourceIngestProgressBar;
90  @ThreadConfined(type = ThreadConfined.ThreadType.AWT)
91  private final List<String> filesInProgress = new ArrayList<>();
92  @ThreadConfined(type = ThreadConfined.ThreadType.AWT)
93  private ProgressHandle fileIngestProgressBar;
94  @ThreadConfined(type = ThreadConfined.ThreadType.AWT)
95  private ProgressHandle artifactIngestProgressBar;
96  @ThreadConfined(type = ThreadConfined.ThreadType.AWT)
97  private ProgressHandle resultIngestProgressBar;
98 
/**
 * Constructs an object that executes an ingest job: it starts up the job's
 * ingest module pipelines and runs the job's ingest tasks tier by tier.
 *
 * @param ingestJob The ingest job to execute.
 *
 * @throws InterruptedException Declared for callers; nothing in this
 *                              constructor currently throws it.
 */
IngestJobExecutor(IngestJob ingestJob) throws InterruptedException {
    this.ingestJob = ingestJob;
    this.createTime = System.currentTimeMillis();
    /*
     * NetBeans progress handles (the progress bars in the lower right hand
     * corner of the main window) are only used when running the GUI version
     * of Autopsy. Elsewhere the application hides progress reporting behind
     * org.sleuthkit.autopsy.progress.ProgressIndicator; the same abstraction
     * could be applied here.
     */
    this.usingNetBeansGUI = RuntimeProperties.runningWithGUI();
}
125 
131  long getIngestJobId() {
132  return ingestJob.getId();
133  }
134 
141  String getExecutionContext() {
142  return ingestJob.getSettings().getExecutionContext();
143  }
144 
151  DataSource getDataSource() {
152  return ingestJob.getDataSource();
153  }
154 
161  boolean shouldProcessUnallocatedSpace() {
162  return ingestJob.getSettings().getProcessUnallocatedSpace();
163  }
164 
171  FilesSet getFileIngestFilter() {
172  return ingestJob.getSettings().getFileFilter();
173  }
174 
190  List<IngestModuleError> startUp() throws InterruptedException {
191  jobState = IngestJobState.PIPELINES_STARTING_UP;
192  ingestModuleTiers.addAll(IngestModuleTierBuilder.buildIngestModuleTiers(ingestJob.getSettings(), this));
193  List<IngestModuleError> errors = startUpIngestModulePipelines();
194  if (errors.isEmpty()) {
195  recordIngestJobStartUpInfo();
196  /*
197  * Start up and execution of the first ingest module tier requires
198  * some special treatment due to the differences between streaming
199  * and batch mode ingest jobs. Subsequent tiers can be handled
200  * generically.
201  */
202  if (ingestJob.getIngestMode() == IngestJob.Mode.STREAMING) {
203  startStreamingModeAnalysis();
204  } else {
205  startBatchModeAnalysis();
206  }
207  }
208  return errors;
209  }
210 
216  private List<IngestModuleError> startUpIngestModulePipelines() {
217  List<IngestModuleError> errors = new ArrayList<>();
218  for (IngestModuleTier moduleTier : ingestModuleTiers) {
219  Optional<DataSourceIngestPipeline> dataSourcePipeline = moduleTier.getDataSourceIngestPipeline();
220  if (dataSourcePipeline.isPresent()) {
221  errors.addAll(startUpIngestModulePipeline(dataSourcePipeline.get()));
222  }
223 
224  for (FileIngestPipeline pipeline : moduleTier.getFileIngestPipelines()) {
225  List<IngestModuleError> filePipelineErrors = startUpIngestModulePipeline(pipeline);
226  if (!filePipelineErrors.isEmpty()) {
227  /*
228  * If one file pipeline copy can't start up, assume that
229  * none of the other copies will be able to start up, for
230  * the same reason.
231  */
232  errors.addAll(filePipelineErrors);
233  break;
234  }
235  }
236 
237  Optional<DataArtifactIngestPipeline> dataArtifactPipeline = moduleTier.getDataArtifactIngestPipeline();
238  if (dataArtifactPipeline.isPresent()) {
239  errors.addAll(startUpIngestModulePipeline(dataArtifactPipeline.get()));
240  }
241 
242  Optional<AnalysisResultIngestPipeline> analysisResultPipeline = moduleTier.getAnalysisResultIngestPipeline();
243  if (analysisResultPipeline.isPresent()) {
244  errors.addAll(startUpIngestModulePipeline(analysisResultPipeline.get()));
245  }
246  }
247  return errors;
248  }
249 
258  private List<IngestModuleError> startUpIngestModulePipeline(IngestPipeline<?> pipeline) {
259  List<IngestModuleError> startUpErrors = pipeline.startUp();
260  if (!startUpErrors.isEmpty()) {
261  List<IngestModuleError> shutDownErrors = pipeline.shutDown();
262  if (!shutDownErrors.isEmpty()) {
263  logIngestModuleErrors(shutDownErrors);
264  }
265  }
266  return startUpErrors;
267  }
268 
274  private void recordIngestJobStartUpInfo() {
275  try {
276  SleuthkitCase caseDb = Case.getCurrentCase().getSleuthkitCase();
277  List<IngestModuleInfo> ingestModuleInfoList = new ArrayList<>();
278  for (IngestModuleTemplate module : ingestJob.getSettings().getEnabledIngestModuleTemplates()) {
279  IngestModuleType moduleType = getIngestModuleTemplateType(module);
280  IngestModuleInfo moduleInfo = caseDb.addIngestModule(module.getModuleName(), FactoryClassNameNormalizer.normalize(module.getModuleFactory().getClass().getCanonicalName()), moduleType, module.getModuleFactory().getModuleVersionNumber());
281  ingestModuleInfoList.add(moduleInfo);
282  }
283  casDbingestJobInfo = caseDb.addIngestJob(ingestJob.getDataSource(), NetworkUtils.getLocalHostName(), ingestModuleInfoList, new Date(this.createTime), new Date(0), IngestJobStatusType.STARTED, "");
284  } catch (TskCoreException ex) {
285  logErrorMessage(Level.SEVERE, "Failed to add ingest job info to case database", ex); //NON-NLS
286  }
287  }
288 
297  private static IngestModuleType getIngestModuleTemplateType(IngestModuleTemplate moduleTemplate) {
298  IngestModuleType type = null;
299  if (moduleTemplate.isDataSourceIngestModuleTemplate()) {
300  type = IngestModuleType.DATA_SOURCE_LEVEL;
301  }
302  if (moduleTemplate.isFileIngestModuleTemplate()) {
303  if (type == null) {
304  type = IngestModuleType.FILE_LEVEL;
305  } else {
306  type = IngestModuleType.MULTIPLE;
307  }
308  }
309  if (moduleTemplate.isDataArtifactIngestModuleTemplate()) {
310  if (type == null) {
311  type = IngestModuleType.DATA_ARTIFACT;
312  } else {
313  type = IngestModuleType.MULTIPLE;
314  }
315  }
316  return type;
317  }
318 
325  private void startBatchModeAnalysis() {
326  synchronized (tierTransitionLock) {
327  logInfoMessage("Starting ingest job in file batch mode"); //NON-NLS
328  jobState = IngestJobState.ANALYZING;
329  IngestModuleTier currentTier = ingestModuleTiers.get(moduleTierIndex);
330 
331  if (currentTier.hasDataSourceIngestModules()) {
332  startDataSourceIngestProgressBar();
333  taskScheduler.scheduleDataSourceIngestTask(this);
334  }
335 
336  if (currentTier.hasFileIngestModules()) {
337  estimateFilesToProcess();
338  startFileIngestProgressBar(true);
339  taskScheduler.scheduleFileIngestTasks(this, ingestJob.getFiles());
340  }
341 
342  if (currentTier.hasDataArtifactIngestModules()) {
343  /*
344  * Analysis of any data artifacts already in the case database
345  * (possibly added by the DSP) will be performed.
346  */
347  startDataArtifactIngestProgressBar();
348  taskScheduler.scheduleDataArtifactIngestTasks(this);
349  }
350 
351  if (currentTier.hasAnalysisResultIngestModules()) {
352  /*
353  * Analysis of any analysis results already in the case database
354  * (possibly added by the DSP) will be performed.
355  */
356  startAnalysisResultIngestProgressBar();
357  taskScheduler.scheduleAnalysisResultIngestTasks(this);
358  }
359 
360  /*
361  * Check for analysis completion. This is necessary because it is
362  * possible that none of the tasks that were just scheduled will
363  * actually make it to task execution, due to the file filter or
364  * other ingest job settings. If that happens, there will never be
365  * another analysis completion check for this job in an ingest
366  * thread executing an ingest task, so such a job would run forever,
367  * doing nothing, without a check here.
368  */
369  checkForTierCompleted(moduleTierIndex);
370  }
371  }
372 
376  private void estimateFilesToProcess() {
377  estimatedFilesToProcess = 0;
378  processedFiles = 0;
379  if (ingestModuleTiers.get(moduleTierIndex).hasFileIngestModules()) {
380  /*
381  * Do an estimate of the total number of files to be analyzed. This
382  * will be used to estimate of how many files remain to be analyzed
383  * as each file ingest task is completed. The numbers are estimates
384  * because analysis can add carved and/or derived files to the job.
385  */
386  List<AbstractFile> files = ingestJob.getFiles();
387  if (files.isEmpty()) {
388  /*
389  * Do a count of the files from the data source that the data
390  * source processor (DSP) has added to the case database.
391  */
392  estimatedFilesToProcess = ingestJob.getDataSource().accept(new GetFilesCountVisitor());
393  } else {
394  /*
395  * Otherwise, this job is analyzing a user-specified subset of
396  * the files in the data source.
397  */
398  estimatedFilesToProcess = files.size();
399  }
400  }
401  }
402 
410  private void startStreamingModeAnalysis() {
411  synchronized (tierTransitionLock) {
412  logInfoMessage("Starting ingest job in file streaming mode"); //NON-NLS
413  jobState = IngestJobState.ACCEPTING_STREAMED_CONTENT_AND_ANALYZING;
414  IngestModuleTier currentTier = ingestModuleTiers.get(moduleTierIndex);
415 
416  if (currentTier.hasFileIngestModules()) {
417  /*
418  * Start the file ingest progress bar, but do not schedule any
419  * file or data source ingest tasks. File ingest tasks will
420  * instead be scheduled as files are streamed in via
421  * addStreamedFiles(), and a data source ingest task will be
422  * scheduled later, via addStreamedDataSource().
423  */
424  startFileIngestProgressBar(false);
425  }
426 
427  if (currentTier.hasDataArtifactIngestModules()) {
428  /*
429  * Start the data artifact progress bar and schedule ingest
430  * tasks for any data artifacts currently in the case database.
431  * This needs to be done BEFORE any files or the data source are
432  * streamed in to ensure that any data artifacts added to the
433  * case database by the file and data source ingest tasks are
434  * not analyzed twice. This works here because the ingest
435  * manager has not yet returned the ingest stream object that is
436  * used to call addStreamedFiles() and addStreamedDataSource().
437  */
438  startDataArtifactIngestProgressBar();
439  taskScheduler.scheduleDataArtifactIngestTasks(this);
440  }
441 
442  if (currentTier.hasAnalysisResultIngestModules()) {
443  /*
444  * Start the analysis result progress bar and schedule ingest
445  * tasks for any analysis results currently in the case
446  * database. This needs to be done BEFORE any files or the data
447  * source are streamed in to ensure that any analysis results
448  * added to the case database by the file and data source ingest
449  * tasks are not analyzed twice. This works here because the
450  * ingest manager has not yet returned the ingest stream object
451  * that is used to call addStreamedFiles() and
452  * addStreamedDataSource().
453  */
454  startAnalysisResultIngestProgressBar();
455  taskScheduler.scheduleAnalysisResultIngestTasks(this);
456  }
457  }
458  }
459 
465  void addStreamedDataSource() {
466  synchronized (tierTransitionLock) {
467  logInfoMessage("Data source received in streaming mode ingest job"); //NON-NLS
468  jobState = IngestJobExecutor.IngestJobState.ANALYZING;
469  IngestModuleTier currentTier = ingestModuleTiers.get(moduleTierIndex);
470 
471  if (currentTier.hasFileIngestModules()) {
472  estimateFilesToProcess();
473  switchFileIngestProgressBarToDeterminate();
474  // We don't need to schedule file tasks here because they've already been
475  // added as the data source was being processed
476  }
477 
478  if (currentTier.hasDataSourceIngestModules()) {
479  taskScheduler.scheduleDataSourceIngestTask(this);
480  startDataSourceIngestProgressBar();
481  } else {
482  /*
483  * If no data source level ingest task is scheduled at this
484  * time, and all of the file level and artifact ingest tasks
485  * scheduled during the initial file streaming stage have
486  * already been executed, there will never be a stage completion
487  * check in an ingest thread executing an ingest task for this
488  * job, so such a job would run forever, doing nothing, without
489  * a check here.
490  */
491  checkForTierCompleted(moduleTierIndex);
492  }
493  }
494  }
495 
501  private void checkForTierCompleted(int currentTier) {
502  synchronized (tierTransitionLock) {
503  if (jobState.equals(IngestJobState.ACCEPTING_STREAMED_CONTENT_AND_ANALYZING)) {
504  return;
505  }
506  if (currentTier < moduleTierIndex) {
507  // We likely had a leftover task from the previous tier. Since we've already
508  // advanced to the next tier, ignore it.
509  return;
510  }
511  if (taskScheduler.currentTasksAreCompleted(getIngestJobId())) {
512  do {
513  shutDownCurrentTier();
514  moduleTierIndex++;
515  if (moduleTierIndex < ingestModuleTiers.size()) {
516  startAnalysisForCurrentTier();
517  } else {
518  shutDown();
519  break;
520  }
521  } while (taskScheduler.currentTasksAreCompleted(getIngestJobId())); // Loop again immediately in case the new tier is empty
522  }
523  }
524  }
525 
530  private void startAnalysisForCurrentTier() {
531  logInfoMessage(String.format("Scheduling ingest tasks for tier %s of ingest job", moduleTierIndex)); //NON-NLS
532  jobState = IngestJobExecutor.IngestJobState.ANALYZING;
533  IngestModuleTier currentTier = ingestModuleTiers.get(moduleTierIndex);
534 
535  if (currentTier.hasDataSourceIngestModules()) {
536  startDataSourceIngestProgressBar();
537  taskScheduler.scheduleDataSourceIngestTask(this);
538  }
539 
540  if (currentTier.hasFileIngestModules()) {
541  estimateFilesToProcess();
542  startFileIngestProgressBar(true);
543  taskScheduler.scheduleFileIngestTasks(this, ingestJob.getFiles());
544  }
545 
546  if (currentTier.hasDataArtifactIngestModules()) {
547  startDataArtifactIngestProgressBar();
548  }
549 
550  if (currentTier.hasAnalysisResultIngestModules()) {
551  startDataArtifactIngestProgressBar();
552  }
553  }
554 
562  void execute(DataSourceIngestTask task) {
563  try {
564  if (!isCancelled()) {
565  Optional<DataSourceIngestPipeline> pipeline = ingestModuleTiers.get(moduleTierIndex).getDataSourceIngestPipeline();
566  if (pipeline.isPresent()) {
567  List<IngestModuleError> errors = new ArrayList<>();
568  errors.addAll(pipeline.get().performTask(task));
569  if (!errors.isEmpty()) {
570  logIngestModuleErrors(errors);
571  }
572  }
573  }
574  } finally {
575  // Save the module tier assocaited with this task since it could change after
576  // notifyTaskComplete
577  int currentTier = moduleTierIndex;
578  taskScheduler.notifyTaskCompleted(task);
579  checkForTierCompleted(currentTier);
580  }
581  }
582 
590  void execute(FileIngestTask task) {
591  try {
592  if (!isCancelled()) {
593  FileIngestPipeline pipeline = ingestModuleTiers.get(moduleTierIndex).takeFileIngestPipeline();
594  if (!pipeline.isEmpty()) {
595  /*
596  * Get the file from the task. If the file was streamed in,
597  * the task may only have the file object ID, and a trip to
598  * the case database will be required.
599  */
600  AbstractFile file;
601  try {
602  file = task.getFile();
603  } catch (TskCoreException ex) {
604  List<IngestModuleError> errors = new ArrayList<>();
605  errors.add(new IngestModuleError("Ingest Pipeline", ex));
606  logIngestModuleErrors(errors);
607  ingestModuleTiers.get(moduleTierIndex).returnFileIngestPipeleine(pipeline);
608  return;
609  }
610 
615  final String fileName = file.getName();
616  processedFiles++;
617  updateFileProgressBarForFileTaskStarted(fileName);
618  List<IngestModuleError> errors = new ArrayList<>();
619  errors.addAll(pipeline.performTask(task));
620  if (!errors.isEmpty()) {
621  logIngestModuleErrors(errors, file);
622  }
623  updateFileProgressBarForFileTaskCompleted(fileName);
624  }
625  ingestModuleTiers.get(moduleTierIndex).returnFileIngestPipeleine(pipeline);
626  }
627  } catch (InterruptedException ex) {
628  logger.log(Level.SEVERE, String.format("File ingest thread interrupted during execution of file ingest job (file object ID = %d, thread ID = %d)", task.getFileId(), task.getThreadId()), ex);
629  Thread.currentThread().interrupt();
630  } finally {
631  // Save the module tier assocaited with this task since it could change after
632  // notifyTaskComplete
633  int currentTier = moduleTierIndex;
634  taskScheduler.notifyTaskCompleted(task);
635  checkForTierCompleted(currentTier);
636  }
637  }
638 
646  void execute(DataArtifactIngestTask task) {
647  try {
648  if (!isCancelled()) {
649  Optional<DataArtifactIngestPipeline> pipeline = ingestModuleTiers.get(moduleTierIndex).getDataArtifactIngestPipeline();
650  if (pipeline.isPresent()) {
651  List<IngestModuleError> errors = new ArrayList<>();
652  errors.addAll(pipeline.get().performTask(task));
653  if (!errors.isEmpty()) {
654  logIngestModuleErrors(errors);
655  }
656  }
657  }
658  } finally {
659  // Save the module tier assocaited with this task since it could change after
660  // notifyTaskComplete
661  int currentTier = moduleTierIndex;
662  taskScheduler.notifyTaskCompleted(task);
663  checkForTierCompleted(currentTier);
664  }
665  }
666 
674  void execute(AnalysisResultIngestTask task) {
675  try {
676  if (!isCancelled()) {
677  Optional<AnalysisResultIngestPipeline> pipeline = ingestModuleTiers.get(moduleTierIndex).getAnalysisResultIngestPipeline();
678  if (pipeline.isPresent()) {
679  List<IngestModuleError> errors = new ArrayList<>();
680  errors.addAll(pipeline.get().performTask(task));
681  if (!errors.isEmpty()) {
682  logIngestModuleErrors(errors);
683  }
684  }
685  }
686  } finally {
687  // Save the module tier assocaited with this task since it could change after
688  // notifyTaskComplete
689  int currentTier = moduleTierIndex;
690  taskScheduler.notifyTaskCompleted(task);
691  checkForTierCompleted(currentTier);
692  }
693  }
694 
700  void addStreamedFiles(List<Long> fileObjIds) {
701  if (!isCancelled() && ingestModuleTiers.get(moduleTierIndex).hasFileIngestModules()) {
702  if (jobState.equals(IngestJobState.ACCEPTING_STREAMED_CONTENT_AND_ANALYZING)) {
703  taskScheduler.scheduleStreamedFileIngestTasks(this, fileObjIds);
704  } else {
705  logErrorMessage(Level.SEVERE, "Adding streaming files to job during stage " + jobState.toString() + " not supported");
706  }
707  }
708  }
709 
721  void addFiles(List<AbstractFile> files) {
722  if (!isCancelled() && ingestModuleTiers.get(moduleTierIndex).hasFileIngestModules()) {
723  if (jobState.equals(IngestJobState.ACCEPTING_STREAMED_CONTENT_AND_ANALYZING) || jobState.equals(IngestJobState.ANALYZING)) {
724  taskScheduler.scheduleHighPriorityFileIngestTasks(this, files);
725  } else {
726  logErrorMessage(Level.SEVERE, "Adding files to job during stage " + jobState.toString() + " not supported");
727  }
728  }
729  }
730 
741  void addDataArtifacts(List<DataArtifact> artifacts) {
742  if (!isCancelled() && ingestModuleTiers.get(moduleTierIndex).hasDataArtifactIngestModules()) {
743  switch (jobState) {
744  case ACCEPTING_STREAMED_CONTENT_AND_ANALYZING:
745  case ANALYZING:
746  taskScheduler.scheduleDataArtifactIngestTasks(this, artifacts);
747  break;
748  case PIPELINES_SHUTTING_DOWN:
749  /*
750  * Don't log an error if there is an attempt to add an
751  * data artifact ingest task in a pipeline shut down
752  * state. This is a work around for dealing with data
753  * artifacts generated by a final keyword search carried out
754  * during ingest module shut down by simply ignoring them.
755  * (Currently these are credit card accounts generated by
756  * keyword search). Other ideas were to add
757  * a startShutDown() phase to the ingest module
758  * life cycle (complicated), or to add a flag
759  * to keyword hit processing to suppress posting the keyword
760  * hit analysis results / data artifacts to the blackboard during a final
761  * search (API changes required to allow firing of the event
762  * to make any GUI refresh).
763  */
764  break;
765  default:
766  logErrorMessage(Level.SEVERE, "Attempt to add data artifacts to job during stage " + jobState.toString() + " not supported");
767  break;
768  }
769  }
770  }
771 
782  void addAnalysisResults(List<AnalysisResult> results) {
783  if (!isCancelled() && ingestModuleTiers.get(moduleTierIndex).hasAnalysisResultIngestModules()) {
784  switch (jobState) {
785  case ACCEPTING_STREAMED_CONTENT_AND_ANALYZING:
786  case ANALYZING:
787  taskScheduler.scheduleAnalysisResultIngestTasks(this, results);
788  break;
789  case PIPELINES_SHUTTING_DOWN:
790  /*
791  * Don't log an error if there is an attempt to add an
792  * analysis result ingest task in a pipeline shut down
793  * state. This is a work around for dealing with analysis
794  * results generated by a final keyword search carried out
795  * during ingest module shut down by simply ignoring them.
796  * Other ideas were to add a startShutDown() phase to the
797  * ingest module life cycle (complicated), or to add a flag
798  * to keyword hit processing to suppress posting the keyword
799  * hit analysis results to the blackboard during a final
800  * search (API changes required to allow firing of the event
801  * to make any GUI refresh).
802  */
803  break;
804  default:
805  logErrorMessage(Level.SEVERE, "Attempt to add analysis results to job during stage " + jobState.toString() + " not supported");
806  }
807  }
808  }
809 
813  private void shutDownCurrentTier() {
814  // Note that this method is only called while holding the tierTransitionLock, so moduleTierIndex can not change
815  // during execution.
816  if (moduleTierIndex >= ingestModuleTiers.size()) {
817  logErrorMessage(Level.SEVERE, "shutDownCurrentTier called with out-of-bounds moduleTierIndex (" + moduleTierIndex + ")");
818  return;
819  }
820  logInfoMessage(String.format("Finished all ingest tasks for tier %s of ingest job", moduleTierIndex)); //NON-NLS
821  jobState = IngestJobExecutor.IngestJobState.PIPELINES_SHUTTING_DOWN;
822  IngestModuleTier moduleTier = ingestModuleTiers.get(moduleTierIndex);
823 
824  Optional<DataSourceIngestPipeline> dataSourcePipeline = moduleTier.getDataSourceIngestPipeline();
825  if (dataSourcePipeline.isPresent()) {
826  shutDownIngestModulePipeline(dataSourcePipeline.get());
827  }
828 
829  for (FileIngestPipeline pipeline : moduleTier.getFileIngestPipelines()) {
830  shutDownIngestModulePipeline(pipeline);
831  }
832 
833  Optional<DataArtifactIngestPipeline> dataArtifactPipeline = moduleTier.getDataArtifactIngestPipeline();
834  if (dataArtifactPipeline.isPresent()) {
835  shutDownIngestModulePipeline(dataArtifactPipeline.get());
836  }
837 
838  Optional<AnalysisResultIngestPipeline> analysisResultPipeline = moduleTier.getAnalysisResultIngestPipeline();
839  if (analysisResultPipeline.isPresent()) {
840  shutDownIngestModulePipeline(analysisResultPipeline.get());
841  }
842 
843  finishAllProgressBars();
844  }
845 
851  private <T extends IngestTask> void shutDownIngestModulePipeline(IngestPipeline<T> pipeline) {
852  if (pipeline.isRunning()) {
853  List<IngestModuleError> errors = new ArrayList<>();
854  errors.addAll(pipeline.shutDown());
855  if (!errors.isEmpty()) {
856  logIngestModuleErrors(errors);
857  }
858  }
859  }
860 
864  private void shutDown() {
865  logInfoMessage("Finished all ingest tasks for ingest job"); //NON-NLS
866  try {
867  if (casDbingestJobInfo != null) {
868  if (jobCancelled) {
869  casDbingestJobInfo.setIngestJobStatus(IngestJobStatusType.CANCELLED);
870  } else {
871  casDbingestJobInfo.setIngestJobStatus(IngestJobStatusType.COMPLETED);
872  }
873  casDbingestJobInfo.setEndDateTime(new Date());
874  }
875  } catch (TskCoreException ex) {
876  logErrorMessage(Level.WARNING, "Failed to set job end date in case database", ex);
877  }
878 
879  ingestJob.notifyIngestPipelinesShutDown();
880  }
881 
887  DataSourceIngestPipeline.DataSourcePipelineModule getCurrentDataSourceIngestModule() {
888  Optional<DataSourceIngestPipeline> pipeline = getCurrentDataSourceIngestPipelines();
889  if (pipeline.isPresent()) {
890  return (DataSourceIngestPipeline.DataSourcePipelineModule) pipeline.get().getCurrentlyRunningModule();
891  } else {
892  return null;
893  }
894  }
895 
909  void cancelCurrentDataSourceIngestModule() {
910  currentDataSourceIngestModuleCancelled = true;
911  }
912 
929  boolean currentDataSourceIngestModuleIsCancelled() {
930  return currentDataSourceIngestModuleCancelled;
931  }
932 
950  void currentDataSourceIngestModuleCancellationCompleted(String moduleDisplayName) {
951  currentDataSourceIngestModuleCancelled = false;
952  cancelledDataSourceIngestModules.add(moduleDisplayName);
953  if (usingNetBeansGUI && !jobCancelled) {
954  try {
955  // use invokeAndWait to ensure synchronous behavior.
956  // See JIRA-8298 for more information.
957  SwingUtilities.invokeAndWait(() -> {
966  dataSourceIngestProgressBar.finish();
967  dataSourceIngestProgressBar = null;
968  startDataSourceIngestProgressBar();
969  });
970  } catch (InvocationTargetException | InterruptedException ex) {
971  logger.log(Level.WARNING, "Cancellation worker cancelled.", ex);
972  }
973  }
974  }
975 
986  void cancel(IngestJob.CancellationReason reason) {
987  jobCancelled = true;
988  cancellationReason = reason;
989  displayCancellingProgressMessages();
990  taskScheduler.cancelPendingFileTasksForIngestJob(getIngestJobId());
991  synchronized (threadRegistrationLock) {
992  for (Thread thread : pausedIngestThreads) {
993  thread.interrupt();
994  }
995  pausedIngestThreads.clear();
996  }
997  checkForTierCompleted(moduleTierIndex);
998  }
999 
1007  boolean isCancelled() {
1008  return jobCancelled;
1009  }
1010 
1016  IngestJob.CancellationReason getCancellationReason() {
1017  return cancellationReason;
1018  }
1019 
1027  private void startDataSourceIngestProgressBar() {
1028  if (usingNetBeansGUI) {
1029  SwingUtilities.invokeLater(() -> {
1030  dataSourceIngestProgressBar = ProgressHandle.createHandle(NbBundle.getMessage(this.getClass(), "IngestJob.progress.dataSourceIngest.initialDisplayName", ingestJob.getDataSource().getName()), new Cancellable() {
1031  @Override
1032  public boolean cancel() {
1033  /*
1034  * The user has already pressed the cancel button on
1035  * this progress bar, and the OK button of a cancelation
1036  * confirmation dialog supplied by NetBeans. Find out
1037  * whether the user wants to cancel only the currently
1038  * executing data source ingest module or the entire
1039  * ingest job.
1040  */
1041  DataSourceIngestCancellationPanel panel = new DataSourceIngestCancellationPanel();
1042  String dialogTitle = NbBundle.getMessage(IngestJobExecutor.this.getClass(), "IngestJob.cancellationDialog.title");
1043  JOptionPane.showConfirmDialog(WindowManager.getDefault().getMainWindow(), panel, dialogTitle, JOptionPane.OK_OPTION, JOptionPane.PLAIN_MESSAGE);
1044  if (panel.cancelAllDataSourceIngestModules()) {
1045  new Thread(() -> {
1046  IngestJobExecutor.this.cancel(IngestJob.CancellationReason.USER_CANCELLED);
1047  }).start();
1048  } else {
1049  new Thread(() -> {
1050  IngestJobExecutor.this.cancelCurrentDataSourceIngestModule();
1051  }).start();
1052  }
1053  return true;
1054  }
1055  });
1056  dataSourceIngestProgressBar.start();
1057  dataSourceIngestProgressBar.switchToIndeterminate();
1058  });
1059  }
1060  }
1061 
1068  void changeDataSourceIngestProgressBarTitle(String title) {
1069  if (usingNetBeansGUI && !jobCancelled) {
1070  SwingUtilities.invokeLater(() -> {
1071  if (dataSourceIngestProgressBar != null) {
1072  dataSourceIngestProgressBar.setDisplayName(title);
1073  }
1074  });
1075  }
1076  }
1077 
1082  void switchDataSourceIngestProgressBarToIndeterminate() {
1083  if (usingNetBeansGUI && !jobCancelled) {
1084  SwingUtilities.invokeLater(() -> {
1085  if (dataSourceIngestProgressBar != null) {
1086  dataSourceIngestProgressBar.switchToIndeterminate();
1087  }
1088  });
1089  }
1090  }
1091 
1098  void switchDataSourceIngestProgressBarToDeterminate(int workUnitsToDo) {
1099  if (usingNetBeansGUI && !jobCancelled) {
1100  SwingUtilities.invokeLater(() -> {
1101  if (dataSourceIngestProgressBar != null) {
1102  dataSourceIngestProgressBar.switchToDeterminate(workUnitsToDo);
1103  }
1104  });
1105  }
1106  }
1107 
1125  void updateDataSourceIngestProgressBar(String newText, int workUnitsDone) {
1126  if (usingNetBeansGUI && !jobCancelled) {
1127  SwingUtilities.invokeLater(() -> {
1128  if (dataSourceIngestProgressBar != null) {
1129  dataSourceIngestProgressBar.progress(newText, workUnitsDone);
1130  }
1131  });
1132  }
1133  }
1134 
1141  void updateDataSourceIngestProgressBarText(String newText) {
1142  if (usingNetBeansGUI && !jobCancelled) {
1143  SwingUtilities.invokeLater(() -> {
1144  if (dataSourceIngestProgressBar != null) {
1145  dataSourceIngestProgressBar.progress(newText);
1146  }
1147  });
1148  }
1149  }
1150 
1164  void updateDataSourceIngestProgressBar(int workUnitsDone) {
1165  if (usingNetBeansGUI && !jobCancelled) {
1166  SwingUtilities.invokeLater(() -> {
1167  if (dataSourceIngestProgressBar != null) {
1168  dataSourceIngestProgressBar.progress("", workUnitsDone);
1169  }
1170  });
1171  }
1172  }
1173 
1185  private void startFileIngestProgressBar(boolean useDeterminateMode) {
1186  if (usingNetBeansGUI) {
1187  SwingUtilities.invokeLater(() -> {
1188  fileIngestProgressBar = ProgressHandle.createHandle(NbBundle.getMessage(getClass(), "IngestJob.progress.fileIngest.displayName", ingestJob.getDataSource().getName()), new Cancellable() {
1189  @Override
1190  public boolean cancel() {
1191  new Thread(() -> {
1192  IngestJobExecutor.this.cancel(IngestJob.CancellationReason.USER_CANCELLED);
1193  }).start();
1194  return true;
1195  }
1196  });
1197  if (useDeterminateMode) {
1198  fileIngestProgressBar.start((int) estimatedFilesToProcess);
1199  } else {
1200  fileIngestProgressBar.start();
1201  }
1202  });
1203  }
1204  }
1205 
1211  private void switchFileIngestProgressBarToDeterminate() {
1212  if (usingNetBeansGUI) {
1213  SwingUtilities.invokeLater(() -> {
1214  if (fileIngestProgressBar != null) {
1215  fileIngestProgressBar.switchToDeterminate((int) estimatedFilesToProcess);
1216  }
1217  });
1218  }
1219  }
1220 
1228  private void updateFileProgressBarForFileTaskStarted(String fileName) {
1229  if (usingNetBeansGUI && !jobCancelled) {
1230  SwingUtilities.invokeLater(() -> {
1231  /*
1232  * If processedFiles exceeds estimatedFilesToProcess, i.e., the
1233  * max work units set for the progress bar, the progress bar
1234  * will go into an infinite loop throwing
1235  * IllegalArgumentExceptions in the EDT (NetBeans bug). Also, a
1236  * check-then-act race condition needs to be avoided here. This
1237  * can be done without guarding processedFiles and
1238  * estimatedFilesToProcess with the same lock because
1239  * estimatedFilesToProcess does not change after it is used to
1240  * switch the progress bar to determinate mode.
1241  */
1242  long processedFilesCapture = processedFiles;
1243  if (processedFilesCapture <= estimatedFilesToProcess) {
1244  fileIngestProgressBar.progress(fileName, (int) processedFilesCapture);
1245  } else {
1246  fileIngestProgressBar.progress(fileName, (int) estimatedFilesToProcess);
1247  }
1248  filesInProgress.add(fileName);
1249  });
1250  }
1251  }
1252 
1261  private void updateFileProgressBarForFileTaskCompleted(String completedFileName) {
1262  if (usingNetBeansGUI && !jobCancelled) {
1263  SwingUtilities.invokeLater(() -> {
1264  filesInProgress.remove(completedFileName);
1265  /*
1266  * Display the name of another file in progress, or the empty
1267  * string if there are none.
1268  */
1269  if (filesInProgress.size() > 0) {
1270  fileIngestProgressBar.progress(filesInProgress.get(0));
1271  } else {
1272  fileIngestProgressBar.progress(""); // NON-NLS
1273  }
1274  });
1275  }
1276  }
1277 
1284  private void startDataArtifactIngestProgressBar() {
1285  if (usingNetBeansGUI) {
1286  SwingUtilities.invokeLater(() -> {
1287  artifactIngestProgressBar = ProgressHandle.createHandle(NbBundle.getMessage(this.getClass(), "IngestJob.progress.dataArtifactIngest.displayName", ingestJob.getDataSource().getName()), new Cancellable() {
1288  @Override
1289  public boolean cancel() {
1290  new Thread(() -> {
1291  IngestJobExecutor.this.cancel(IngestJob.CancellationReason.USER_CANCELLED);
1292  }).start();
1293  return true;
1294  }
1295  });
1296  artifactIngestProgressBar.start();
1297  artifactIngestProgressBar.switchToIndeterminate();
1298  });
1299  }
1300  }
1301 
1308  @NbBundle.Messages({
1309  "# {0} - data source name",
1310  "IngestJob_progress_analysisResultIngest_displayName=Analyzing analysis results from {0}"
1311  })
1312  private void startAnalysisResultIngestProgressBar() {
1313  if (usingNetBeansGUI) {
1314  SwingUtilities.invokeLater(() -> {
1315  resultIngestProgressBar = ProgressHandle.createHandle(Bundle.IngestJob_progress_analysisResultIngest_displayName(ingestJob.getDataSource().getName()), new Cancellable() {
1316  @Override
1317  public boolean cancel() {
1318  new Thread(() -> {
1319  IngestJobExecutor.this.cancel(IngestJob.CancellationReason.USER_CANCELLED);
1320  }).start();
1321  return true;
1322  }
1323  });
1324  resultIngestProgressBar.start();
1325  resultIngestProgressBar.switchToIndeterminate();
1326  });
1327  }
1328  }
1329 
1334  private void displayCancellingProgressMessages() {
1335  if (usingNetBeansGUI) {
1336  SwingUtilities.invokeLater(() -> {
1337  if (dataSourceIngestProgressBar != null) {
1338  dataSourceIngestProgressBar.setDisplayName(NbBundle.getMessage(getClass(), "IngestJob.progress.dataSourceIngest.initialDisplayName", ingestJob.getDataSource().getName()));
1339  dataSourceIngestProgressBar.progress(NbBundle.getMessage(getClass(), "IngestJob.progress.cancelling"));
1340  }
1341  if (fileIngestProgressBar != null) {
1342  fileIngestProgressBar.setDisplayName(NbBundle.getMessage(getClass(), "IngestJob.progress.fileIngest.displayName", ingestJob.getDataSource().getName()));
1343  fileIngestProgressBar.progress(NbBundle.getMessage(getClass(), "IngestJob.progress.cancelling"));
1344  }
1345  if (artifactIngestProgressBar != null) {
1346  artifactIngestProgressBar.setDisplayName(NbBundle.getMessage(getClass(), "IngestJob.progress.dataArtifactIngest.displayName", ingestJob.getDataSource().getName()));
1347  artifactIngestProgressBar.progress(NbBundle.getMessage(getClass(), "IngestJob.progress.cancelling"));
1348  }
1349  if (resultIngestProgressBar != null) {
1350  resultIngestProgressBar.setDisplayName(Bundle.IngestJob_progress_analysisResultIngest_displayName(ingestJob.getDataSource().getName()));
1351  resultIngestProgressBar.progress(NbBundle.getMessage(getClass(), "IngestJob.progress.cancelling"));
1352  }
1353  });
1354  }
1355  }
1356 
1360  private void finishAllProgressBars() {
1361  if (usingNetBeansGUI) {
1362  SwingUtilities.invokeLater(() -> {
1363  if (dataSourceIngestProgressBar != null) {
1364  dataSourceIngestProgressBar.finish();
1365  dataSourceIngestProgressBar = null;
1366  }
1367 
1368  if (fileIngestProgressBar != null) {
1369  fileIngestProgressBar.finish();
1370  fileIngestProgressBar = null;
1371  }
1372 
1373  if (artifactIngestProgressBar != null) {
1374  artifactIngestProgressBar.finish();
1375  artifactIngestProgressBar = null;
1376  }
1377 
1378  if (resultIngestProgressBar != null) {
1379  resultIngestProgressBar.finish();
1380  resultIngestProgressBar = null;
1381  }
1382  });
1383  }
1384  }
1385 
1392  private void logInfoMessage(String message) {
1393  logger.log(Level.INFO, String.format("%s (data source = %s, data source object ID = %d, job ID = %d)", message, ingestJob.getDataSource().getName(), ingestJob.getDataSource().getId(), getIngestJobId())); //NON-NLS
1394  }
1395 
1404  private void logErrorMessage(Level level, String message, Throwable throwable) {
1405  logger.log(level, String.format("%s (data source = %s, data source object ID = %d, ingest job ID = %d)", message, ingestJob.getDataSource().getName(), ingestJob.getDataSource().getId(), getIngestJobId()), throwable); //NON-NLS
1406  }
1407 
1415  private void logErrorMessage(Level level, String message) {
1416  logger.log(level, String.format("%s (data source = %s, data source object ID= %d, ingest job ID %d)", message, ingestJob.getDataSource().getName(), ingestJob.getDataSource().getId(), getIngestJobId())); //NON-NLS
1417  }
1418 
1424  private void logIngestModuleErrors(List<IngestModuleError> errors) {
1425  for (IngestModuleError error : errors) {
1426  logErrorMessage(Level.SEVERE, String.format("%s experienced an error during analysis", error.getModuleDisplayName()), error.getThrowable()); //NON-NLS
1427  }
1428  }
1429 
1436  private void logIngestModuleErrors(List<IngestModuleError> errors, AbstractFile file) {
1437  for (IngestModuleError error : errors) {
1438  logErrorMessage(Level.SEVERE, String.format("%s experienced an error during analysis while processing file %s (object ID = %d)", error.getModuleDisplayName(), file.getName(), file.getId()), error.getThrowable()); //NON-NLS
1439  }
1440  }
1441 
1447  Optional<List<FileIngestPipeline>> getCurrentFileIngestPipelines() {
1448  // Make a local copy in case the tier increments
1449  int currentModuleTierIndex = moduleTierIndex;
1450  if (currentModuleTierIndex < ingestModuleTiers.size()) {
1451  return Optional.of(ingestModuleTiers.get(currentModuleTierIndex).getFileIngestPipelines());
1452  }
1453  return Optional.empty();
1454  }
1455 
1461  Optional<DataSourceIngestPipeline> getCurrentDataSourceIngestPipelines() {
1462  // Make a local copy in case the tier increments
1463  int currentModuleTierIndex = moduleTierIndex;
1464  if (currentModuleTierIndex < ingestModuleTiers.size()) {
1465  return ingestModuleTiers.get(currentModuleTierIndex).getDataSourceIngestPipeline();
1466  }
1467  return Optional.empty();
1468  }
1469 
1470 
1480  @Messages({
1481  "IngestJobExecutor_progress_snapshot_currentTier_shutDown_modifier=shut down",
1482  "# {0} - tier number",
1483  "# {1} - job state modifer",
1484  "IngestJobExecutor_progress_snapshot_currentTier=Tier {0} {1}"
1485  })
1486  IngestJobProgressSnapshot getIngestJobProgressSnapshot(boolean includeIngestTasksSnapshot) {
1487  /*
1488  * Determine whether file ingest is running at the time of this snapshot
1489  * and determine the earliest file ingest module pipeline start time, if
1490  * file ingest was started at all.
1491  */
1492  boolean fileIngestRunning = false;
1493  Date fileIngestStartTime = null;
1494  Optional<List<FileIngestPipeline>> fileIngestPipelines = getCurrentFileIngestPipelines();
1495  if (!fileIngestPipelines.isPresent()) {
1496  // If there are no currently running pipelines, use the original set.
1497  fileIngestPipelines = Optional.of(ingestModuleTiers.get(0).getFileIngestPipelines());
1498  }
1499  for (FileIngestPipeline pipeline : fileIngestPipelines.get()) {
1500  if (pipeline.isRunning()) {
1501  fileIngestRunning = true;
1502  }
1503  Date pipelineStartTime = pipeline.getStartTime();
1504  if (pipelineStartTime != null && (fileIngestStartTime == null || pipelineStartTime.before(fileIngestStartTime))) {
1505  fileIngestStartTime = pipelineStartTime;
1506  }
1507  }
1508 
1509  long processedFilesCount = 0;
1510  long estimatedFilesToProcessCount = 0;
1511  long snapShotTime = new Date().getTime();
1512  IngestTasksScheduler.IngestTasksSnapshot tasksSnapshot = null;
1513  if (includeIngestTasksSnapshot) {
1514  processedFilesCount = processedFiles;
1515  estimatedFilesToProcessCount = estimatedFilesToProcess;
1516  snapShotTime = new Date().getTime();
1517  tasksSnapshot = taskScheduler.getTasksSnapshotForJob(getIngestJobId());
1518  }
1519  return new IngestJobProgressSnapshot(
1520  ingestJob.getDataSource().getName(),
1521  getIngestJobId(),
1522  createTime,
1523  Bundle.IngestJobExecutor_progress_snapshot_currentTier(moduleTierIndex, jobState.equals(IngestJobState.PIPELINES_SHUTTING_DOWN) ? Bundle.IngestJobExecutor_progress_snapshot_currentTier_shutDown_modifier() : ""),
1524  getCurrentDataSourceIngestModule(),
1525  fileIngestRunning,
1526  fileIngestStartTime,
1527  jobCancelled,
1528  cancellationReason,
1529  cancelledDataSourceIngestModules,
1530  processedFilesCount,
1531  estimatedFilesToProcessCount,
1532  snapShotTime,
1533  tasksSnapshot);
1534  }
1535 
1542  void registerPausedIngestThread(Thread thread) {
1543  synchronized (threadRegistrationLock) {
1544  pausedIngestThreads.add(thread);
1545  }
1546  }
1547 
1554  void unregisterPausedIngestThread(Thread thread) {
1555  synchronized (threadRegistrationLock) {
1556  pausedIngestThreads.remove(thread);
1557  }
1558  }
1559 
1560 }
List< IngestModuleTemplate > getEnabledIngestModuleTemplates()
synchronized static Logger getLogger(String name)
Definition: Logger.java:124

Copyright © 2012-2022 Basis Technology. Generated on: Tue Feb 6 2024
This work is licensed under a Creative Commons Attribution-Share Alike 3.0 United States License.