19 package org.sleuthkit.autopsy.ingest;
21 import java.util.ArrayList;
22 import java.util.Collections;
23 import java.util.Date;
24 import java.util.HashSet;
25 import java.util.LinkedHashMap;
26 import java.util.List;
29 import java.util.concurrent.CopyOnWriteArrayList;
30 import java.util.concurrent.LinkedBlockingQueue;
31 import java.util.concurrent.atomic.AtomicLong;
32 import java.util.logging.Level;
33 import java.util.regex.Matcher;
34 import java.util.regex.Pattern;
35 import java.util.stream.Stream;
36 import javax.annotation.concurrent.GuardedBy;
37 import javax.swing.JOptionPane;
38 import org.netbeans.api.progress.ProgressHandle;
39 import org.openide.util.Cancellable;
40 import org.openide.util.NbBundle;
41 import org.openide.windows.WindowManager;
50 import org.
sleuthkit.datamodel.IngestJobInfo.IngestJobStatusType;
52 import org.
sleuthkit.datamodel.IngestModuleInfo.IngestModuleType;
65 final class IngestJobPipeline {
67 private static final String AUTOPSY_MODULE_PREFIX =
"org.sleuthkit.autopsy";
69 private static final Logger logger = Logger.getLogger(IngestJobPipeline.class.getName());
76 private static final Pattern JYTHON_MODULE_REGEX = Pattern.compile(
"org\\.python\\.proxies\\.(.+?)\\$(.+?)(\\$[0-9]*)?$");
89 private final IngestJob parentJob;
90 private static final AtomicLong nextPipelineId =
new AtomicLong(0L);
91 private final long pipelineId;
92 private final IngestJobSettings settings;
93 private DataSource dataSource;
94 private final List<AbstractFile> files;
138 private final Object stageTransitionLock =
new Object();
145 private DataSourceIngestPipeline firstStageDataSourceIngestPipeline;
146 private DataSourceIngestPipeline secondStageDataSourceIngestPipeline;
147 private volatile DataSourceIngestPipeline currentDataSourceIngestPipeline;
158 private final LinkedBlockingQueue<FileIngestPipeline> fileIngestPipelinesQueue =
new LinkedBlockingQueue<>();
159 private final List<FileIngestPipeline> fileIngestPipelines =
new ArrayList<>();
164 private DataArtifactIngestPipeline artifactIngestPipeline;
177 private volatile boolean currentDataSourceIngestModuleCancelled;
178 private final List<String> cancelledDataSourceIngestModules =
new CopyOnWriteArrayList<>();
179 private volatile boolean cancelled;
189 private static final IngestTasksScheduler taskScheduler = IngestTasksScheduler.getInstance();
196 private final boolean doUI;
197 private final Object dataSourceIngestProgressLock =
new Object();
198 private ProgressHandle dataSourceIngestProgressBar;
199 private final Object fileIngestProgressLock =
new Object();
200 private final List<String> filesInProgress =
new ArrayList<>();
201 private long estimatedFilesToProcess;
202 private long processedFiles;
203 private ProgressHandle fileIngestProgressBar;
204 private final Object artifactIngestProgressLock =
new Object();
205 private ProgressHandle artifactIngestProgressBar;
211 private volatile IngestJobInfo ingestJobInfo;
216 private final long createTime;
224 private final Object threadRegistrationLock =
new Object();
225 @GuardedBy(
"threadRegistrationLock")
226 private final Set<Thread> pausedIngestThreads = new HashSet<>();
/**
 * Constructs a pipeline that analyzes an entire data source.
 *
 * NOTE(review): only the delegating call of this constructor is present in
 * the extract; the parameter types are taken from the four-argument
 * constructor it delegates to. Confirm against the upstream file.
 *
 * @param parentJob  The ingest job this pipeline belongs to.
 * @param dataSource The data source to analyze.
 * @param settings   The ingest job settings.
 *
 * @throws InterruptedException Propagated from the delegated constructor.
 */
IngestJobPipeline(IngestJob parentJob, Content dataSource, IngestJobSettings settings) throws InterruptedException {
    this(parentJob, dataSource, Collections.emptyList(), settings);
}

/**
 * Constructs a pipeline that analyzes a data source, optionally restricted to
 * a subset of its files.
 *
 * @param parentJob  The ingest job this pipeline belongs to.
 * @param dataSource The data source to analyze; must implement DataSource.
 * @param files      A subset of the files of the data source; may be empty.
 * @param settings   The ingest job settings.
 *
 * @throws IllegalArgumentException If dataSource does not implement the
 *                                  DataSource interface.
 * @throws InterruptedException     If the thread is interrupted while the
 *                                  file ingest pipelines are being queued.
 */
IngestJobPipeline(IngestJob parentJob, Content dataSource, List<AbstractFile> files, IngestJobSettings settings) throws InterruptedException {
    if (!(dataSource instanceof DataSource)) {
        throw new IllegalArgumentException("Passed dataSource that does not implement the DataSource interface");
    }
    this.parentJob = parentJob;
    pipelineId = IngestJobPipeline.nextPipelineId.getAndIncrement();
    this.dataSource = (DataSource) dataSource;
    // Defensive copy: later changes to the caller's list must not affect this job.
    this.files = new ArrayList<>(files);
    this.settings = settings;
    doUI = RuntimeProperties.runningWithGUI();
    createTime = new Date().getTime();
    // NOTE(review): the embedded line numbering skips one line here (271); a
    // statement may be missing from this extract.
    createIngestModulePipelines();
}
286 private static void addToIngestPipelineTemplate(
final List<IngestModuleTemplate> sortedModules,
final Map<String, IngestModuleTemplate> javaModules,
final Map<String, IngestModuleTemplate> jythonModules) {
287 final List<IngestModuleTemplate> autopsyModules =
new ArrayList<>();
288 final List<IngestModuleTemplate> thirdPartyModules =
new ArrayList<>();
289 Stream.concat(javaModules.entrySet().stream(), jythonModules.entrySet().stream()).forEach((templateEntry) -> {
290 if (templateEntry.getKey().startsWith(AUTOPSY_MODULE_PREFIX)) {
291 autopsyModules.add(templateEntry.getValue());
293 thirdPartyModules.add(templateEntry.getValue());
296 sortedModules.addAll(autopsyModules);
297 sortedModules.addAll(thirdPartyModules);
311 private static String getModuleNameFromJythonClassName(String className) {
312 Matcher m = JYTHON_MODULE_REGEX.matcher(className);
314 return String.format(
"%s.%s", m.group(1), m.group(2));
330 private static void addModuleTemplateToSortingMap(Map<String, IngestModuleTemplate> mapping, Map<String, IngestModuleTemplate> jythonMapping, IngestModuleTemplate
template) {
331 String className =
template.getModuleFactory().getClass().getCanonicalName();
332 String jythonName = getModuleNameFromJythonClassName(className);
333 if (jythonName != null) {
334 jythonMapping.put(jythonName,
template);
336 mapping.put(className,
template);
/**
 * Builds the ingest module pipelines for this job from the enabled module
 * templates in the job settings: a first and a second stage data source
 * pipeline, one file pipeline per file ingest thread, and a data artifact
 * pipeline.
 *
 * NOTE(review): this extract carries stray line-number prefixes and is
 * missing lines (closing braces among them); restore it from the upstream
 * file before compiling.
 *
 * @throws InterruptedException If interrupted while queuing a file pipeline.
 */
346 private void createIngestModulePipelines() throws InterruptedException {
350 List<IngestModuleTemplate> enabledTemplates = settings.getEnabledIngestModuleTemplates();
// Sort the enabled templates into Java and Jython buckets per pipeline type.
// LinkedHashMap keeps the order in which the templates were enabled.
358 Map<String, IngestModuleTemplate> javaDataSourceModuleTemplates =
new LinkedHashMap<>();
359 Map<String, IngestModuleTemplate> jythonDataSourceModuleTemplates =
new LinkedHashMap<>();
360 Map<String, IngestModuleTemplate> javaFileModuleTemplates =
new LinkedHashMap<>();
361 Map<String, IngestModuleTemplate> jythonFileModuleTemplates =
new LinkedHashMap<>();
362 Map<String, IngestModuleTemplate> javaArtifactModuleTemplates =
new LinkedHashMap<>();
363 Map<String, IngestModuleTemplate> jythonArtifactModuleTemplates =
new LinkedHashMap<>();
364 for (IngestModuleTemplate
template : enabledTemplates) {
365 if (
template.isDataSourceIngestModuleTemplate()) {
366 addModuleTemplateToSortingMap(javaDataSourceModuleTemplates, jythonDataSourceModuleTemplates,
template);
368 if (
template.isFileIngestModuleTemplate()) {
369 addModuleTemplateToSortingMap(javaFileModuleTemplates, jythonFileModuleTemplates,
template);
// NOTE(review): nothing visible ever adds a template to the two artifact maps,
// and the embedded numbering skips 371-380 here. The classification of data
// artifact modules is presumably among the missing lines; verify.
// Modules named in the pipelines configuration go first, in configured order.
// createIngestPipelineTemplate() removes each module it uses from the maps.
381 IngestPipelinesConfiguration pipelineConfig = IngestPipelinesConfiguration.getInstance();
382 List<IngestModuleTemplate> firstStageDataSourcePipelineTemplate = createIngestPipelineTemplate(javaDataSourceModuleTemplates, jythonDataSourceModuleTemplates, pipelineConfig.getStageOneDataSourceIngestPipelineConfig());
383 List<IngestModuleTemplate> secondStageDataSourcePipelineTemplate = createIngestPipelineTemplate(javaDataSourceModuleTemplates, jythonDataSourceModuleTemplates, pipelineConfig.getStageTwoDataSourceIngestPipelineConfig());
384 List<IngestModuleTemplate> filePipelineTemplate = createIngestPipelineTemplate(javaFileModuleTemplates, jythonFileModuleTemplates, pipelineConfig.getFileIngestPipelineConfig());
385 List<IngestModuleTemplate> artifactPipelineTemplate =
new ArrayList<>();
// Whatever is left in the maps was not named in the configuration; append it
// to the first stage data source, file and artifact pipeline templates.
395 addToIngestPipelineTemplate(firstStageDataSourcePipelineTemplate, javaDataSourceModuleTemplates, jythonDataSourceModuleTemplates);
396 addToIngestPipelineTemplate(filePipelineTemplate, javaFileModuleTemplates, jythonFileModuleTemplates);
397 addToIngestPipelineTemplate(artifactPipelineTemplate, javaArtifactModuleTemplates, jythonArtifactModuleTemplates);
// Instantiate the pipelines from the finished templates.
403 firstStageDataSourceIngestPipeline =
new DataSourceIngestPipeline(
this, firstStageDataSourcePipelineTemplate);
404 secondStageDataSourceIngestPipeline =
new DataSourceIngestPipeline(
this, secondStageDataSourcePipelineTemplate);
// One file pipeline per file ingest thread; each is both queued for use by the
// threads and remembered in fileIngestPipelines.
405 int numberOfFileIngestThreads = IngestManager.getInstance().getNumberOfFileIngestThreads();
406 for (
int i = 0; i < numberOfFileIngestThreads; ++i) {
407 FileIngestPipeline pipeline =
new FileIngestPipeline(
this, filePipelineTemplate);
408 fileIngestPipelinesQueue.put(pipeline);
409 fileIngestPipelines.add(pipeline);
411 artifactIngestPipeline =
new DataArtifactIngestPipeline(
this, artifactPipelineTemplate);
429 private static List<IngestModuleTemplate> createIngestPipelineTemplate(Map<String, IngestModuleTemplate> javaIngestModuleTemplates, Map<String, IngestModuleTemplate> jythonIngestModuleTemplates, List<String> pipelineConfig) {
430 List<IngestModuleTemplate> pipelineTemplate =
new ArrayList<>();
431 for (String moduleClassName : pipelineConfig) {
432 if (javaIngestModuleTemplates.containsKey(moduleClassName)) {
433 pipelineTemplate.add(javaIngestModuleTemplates.remove(moduleClassName));
434 }
else if (jythonIngestModuleTemplates.containsKey(moduleClassName)) {
435 pipelineTemplate.add(jythonIngestModuleTemplates.remove(moduleClassName));
438 return pipelineTemplate;
455 String getExecutionContext() {
456 return settings.getExecutionContext();
464 DataSource getDataSource() {
474 boolean shouldProcessUnallocatedSpace() {
475 return settings.getProcessUnallocatedSpace();
483 FilesSet getFileIngestFilter() {
484 return settings.getFileFilter();
493 boolean hasIngestModules() {
494 return hasFileIngestModules()
495 || hasFirstStageDataSourceIngestModules()
496 || hasSecondStageDataSourceIngestModules()
497 || hasDataArtifactIngestModules();
/**
 * Checks whether there are data source level ingest modules, answering for
 * either the second stage or the first stage pipeline.
 *
 * NOTE(review): the condition choosing between the two returns below is
 * missing from this extract (the embedded numbering skips 507 and 509),
 * along with the closing braces. Presumably the choice depends on the current
 * stage of the job; restore from the upstream file.
 *
 * @return True if the applicable data source pipeline is non-empty.
 */
506 boolean hasDataSourceIngestModules() {
508 return hasSecondStageDataSourceIngestModules();
510 return hasFirstStageDataSourceIngestModules();
520 private boolean hasFirstStageDataSourceIngestModules() {
521 return (firstStageDataSourceIngestPipeline.isEmpty() ==
false);
530 private boolean hasSecondStageDataSourceIngestModules() {
531 return (secondStageDataSourceIngestPipeline.isEmpty() ==
false);
540 boolean hasFileIngestModules() {
541 if (!fileIngestPipelines.isEmpty()) {
542 return !fileIngestPipelines.get(0).isEmpty();
553 boolean hasDataArtifactIngestModules() {
554 return (artifactIngestPipeline.isEmpty() ==
false);
/**
 * Starts up the ingest module pipelines and, if none of them reported an
 * error, records the job in the case database and begins analysis.
 *
 * NOTE(review): this extract is missing lines: the "else" in front of the
 * batch-mode call, the body of the trailing "else if", the closing braces and
 * the return statement. Presumably the start up errors are returned; restore
 * from the upstream file.
 *
 * @return The errors reported while starting up the pipelines, if any.
 */
562 List<IngestModuleError> startUp() {
563 List<IngestModuleError> errors = startUpIngestModulePipelines();
564 if (errors.isEmpty()) {
565 recordIngestJobStartUpInfo();
// A first stage is only needed when there is something for it to run.
566 if (hasFirstStageDataSourceIngestModules() || hasFileIngestModules() || hasDataArtifactIngestModules()) {
// The ingest mode of the parent job selects how the first stage is started.
567 if (parentJob.getIngestMode() == IngestJob.Mode.STREAMING) {
568 startFirstStageInStreamingMode();
570 startFirstStageInBatchMode();
572 }
else if (hasSecondStageDataSourceIngestModules()) {
584 void recordIngestJobStartUpInfo() {
586 SleuthkitCase caseDb = Case.getCurrentCase().getSleuthkitCase();
587 List<IngestModuleInfo> ingestModuleInfoList =
new ArrayList<>();
588 for (IngestModuleTemplate module : settings.getEnabledIngestModuleTemplates()) {
589 IngestModuleType moduleType = getIngestModuleTemplateType(module);
590 IngestModuleInfo moduleInfo = caseDb.addIngestModule(module.getModuleName(), FactoryClassNameNormalizer.normalize(module.getModuleFactory().getClass().getCanonicalName()), moduleType, module.getModuleFactory().getModuleVersionNumber());
591 ingestModuleInfoList.add(moduleInfo);
593 ingestJobInfo = caseDb.addIngestJob(dataSource, NetworkUtils.getLocalHostName(), ingestModuleInfoList,
new Date(this.createTime),
new Date(0), IngestJobStatusType.STARTED,
"");
594 }
catch (TskCoreException ex) {
595 logErrorMessage(Level.SEVERE,
"Failed to add ingest job info to case database", ex);
/**
 * Determines the ingest module type of a module template from the kinds of
 * module it can create.
 *
 * NOTE(review): this extract is missing lines (the embedded numbering skips
 * 611, 613, 615 and everything after 616), including whatever decides
 * between FILE_LEVEL and MULTIPLE and the return statement. Presumably
 * MULTIPLE is chosen when a type was already assigned; restore from the
 * upstream file.
 *
 * @param moduleTemplate The module template to classify.
 *
 * @return The ingest module type.
 */
607 private IngestModuleType getIngestModuleTemplateType(IngestModuleTemplate moduleTemplate) {
608 IngestModuleType type = null;
609 if (moduleTemplate.isDataSourceIngestModuleTemplate()) {
610 type = IngestModuleType.DATA_SOURCE_LEVEL;
612 if (moduleTemplate.isFileIngestModuleTemplate()) {
614 type = IngestModuleType.FILE_LEVEL;
616 type = IngestModuleType.MULTIPLE;
/**
 * Starts up every ingest module pipeline of this job (first stage data
 * source, second stage data source, each file pipeline, data artifact) and
 * collects the start up errors they report.
 *
 * NOTE(review): this extract is missing lines: the embedded numbering skips
 * 649-652 and 654-656 inside the loop, and the return statement and closing
 * braces are absent. The emptiness check before the addAll() suggests the
 * loop does more than accumulate (for example, stopping at the first failing
 * file pipeline); restore from the upstream file.
 *
 * @return The collected start up errors, if any.
 */
642 private List<IngestModuleError> startUpIngestModulePipelines() {
643 List<IngestModuleError> errors =
new ArrayList<>();
644 errors.addAll(startUpIngestModulePipeline(firstStageDataSourceIngestPipeline));
645 errors.addAll(startUpIngestModulePipeline(secondStageDataSourceIngestPipeline));
646 for (FileIngestPipeline pipeline : fileIngestPipelines) {
647 List<IngestModuleError> filePipelineErrors = startUpIngestModulePipeline(pipeline);
648 if (!filePipelineErrors.isEmpty()) {
653 errors.addAll(filePipelineErrors);
657 errors.addAll(startUpIngestModulePipeline(artifactIngestPipeline));
669 private List<IngestModuleError> startUpIngestModulePipeline(IngestTaskPipeline<?> pipeline) {
670 List<IngestModuleError> startUpErrors = pipeline.startUp();
671 if (!startUpErrors.isEmpty()) {
672 List<IngestModuleError> shutDownErrors = pipeline.shutDown();
673 if (!shutDownErrors.isEmpty()) {
674 logIngestModuleErrors(shutDownErrors);
677 return startUpErrors;
/**
 * Starts the first stage of analysis for a batch mode job: estimates the
 * number of files to process, starts the progress bars, makes the first
 * stage data source pipeline the current one, and schedules the ingest
 * tasks.
 *
 * NOTE(review): this extract is missing lines throughout (the embedded
 * numbering has several gaps). Not visible, among others: the declaration of
 * filesToProcess, the "else" lines, any UI guard around the progress bar
 * calls, any stage bookkeeping, and the closing braces. Restore from the
 * upstream file.
 */
685 private void startFirstStageInBatchMode() {
686 synchronized (stageTransitionLock) {
687 logInfoMessage(
"Starting first stage analysis in batch mode");
// Estimate the work for the file progress bar: either count the files in the
// data source or use the size of the supplied file subset.
696 if (hasFileIngestModules()) {
698 if (files.isEmpty()) {
699 filesToProcess = dataSource.accept(
new GetFilesCountVisitor());
701 filesToProcess = files.size();
703 synchronized (fileIngestProgressLock) {
704 estimatedFilesToProcess = filesToProcess;
// Start a progress bar for each kind of pipeline that has modules.
713 if (hasFileIngestModules()) {
714 startFileIngestProgressBar();
716 if (hasFirstStageDataSourceIngestModules()) {
717 startDataSourceIngestProgressBar();
719 if (hasDataArtifactIngestModules()) {
720 startArtifactIngestProgressBar();
728 currentDataSourceIngestPipeline = firstStageDataSourceIngestPipeline;
// Schedule tasks: only file tasks when a file subset was given, otherwise the
// full set of ingest tasks for the job.
739 if (!files.isEmpty() && hasFileIngestModules()) {
740 taskScheduler.scheduleFileIngestTasks(
this, files);
741 }
else if (hasFirstStageDataSourceIngestModules() || hasFileIngestModules() || hasDataArtifactIngestModules()) {
742 taskScheduler.scheduleIngestTasks(
this);
744 checkForStageCompleted();
/**
 * Starts the first stage of analysis for a streaming mode job: starts the
 * file and data artifact progress bars and schedules data artifact ingest
 * tasks.
 *
 * NOTE(review): this extract is missing lines (the embedded numbering skips
 * 757-763, 765-770, 772, 775-777 and 779-785), including the closing braces
 * and possibly UI guards and stage bookkeeping. Restore from the upstream
 * file.
 */
754 private void startFirstStageInStreamingMode() {
755 synchronized (stageTransitionLock) {
756 logInfoMessage(
"Starting first stage analysis in streaming mode");
764 if (hasFileIngestModules()) {
771 startFileIngestProgressBar();
773 if (hasDataArtifactIngestModules()) {
774 startArtifactIngestProgressBar();
778 if (hasDataArtifactIngestModules()) {
786 taskScheduler.scheduleDataArtifactIngestTasks(
this);
/**
 * Begins full first stage analysis for a streaming mode job: updates the
 * estimate of files to process, starts the data source progress bar, makes
 * the first stage data source pipeline the current one, and schedules the
 * data source ingest task.
 *
 * NOTE(review): this extract is missing lines (the embedded numbering has
 * several gaps), including the closing braces, an apparent "else" in front
 * of the final checkForStageCompleted() call, and possibly UI guards and
 * stage bookkeeping. Restore from the upstream file.
 */
795 void addStreamedDataSource() {
796 synchronized (stageTransitionLock) {
797 logInfoMessage(
"Starting full first stage analysis in streaming mode");
799 currentDataSourceIngestPipeline = firstStageDataSourceIngestPipeline;
801 if (hasFileIngestModules()) {
// Count the files in the data source so the file progress bar can be made
// determinate.
808 long filesToProcess = dataSource.accept(
new GetFilesCountVisitor());
809 synchronized (fileIngestProgressLock) {
810 estimatedFilesToProcess = filesToProcess;
811 if (doUI && fileIngestProgressBar != null) {
812 fileIngestProgressBar.switchToDeterminate((
int) estimatedFilesToProcess);
818 if (hasFirstStageDataSourceIngestModules()) {
819 startDataSourceIngestProgressBar();
823 currentDataSourceIngestPipeline = firstStageDataSourceIngestPipeline;
824 if (hasFirstStageDataSourceIngestModules()) {
825 IngestJobPipeline.taskScheduler.scheduleDataSourceIngestTask(
this);
835 checkForStageCompleted();
/**
 * Starts the second stage of analysis if there are second stage data source
 * ingest modules: starts the data source progress bar, makes the second
 * stage data source pipeline the current one, and schedules the data source
 * ingest task.
 *
 * NOTE(review): this extract is missing lines (the embedded numbering skips
 * 847-849, 851-852 and everything after 854), including the closing braces
 * and possibly a UI guard and stage bookkeeping. Restore from the upstream
 * file.
 */
843 private void startSecondStage() {
844 synchronized (stageTransitionLock) {
845 if (hasSecondStageDataSourceIngestModules()) {
846 logInfoMessage(String.format(
"Starting second stage ingest task pipelines for %s (objID=%d, jobID=%d)", dataSource.getName(), dataSource.getId(), parentJob.getId()));
850 startDataSourceIngestProgressBar();
853 currentDataSourceIngestPipeline = secondStageDataSourceIngestPipeline;
854 taskScheduler.scheduleDataSourceIngestTask(
this);
/**
 * Creates and starts the data artifact ingest progress bar in indeterminate
 * mode. Cancelling the bar cancels this job with reason USER_CANCELLED.
 *
 * NOTE(review): this extract is missing lines (the embedded numbering skips
 * 863, 867, 870-872 and everything after 874), including the return value of
 * cancel(), the closing of the anonymous class and possibly a UI guard.
 * Restore from the upstream file.
 */
862 private void startArtifactIngestProgressBar() {
864 synchronized (artifactIngestProgressLock) {
865 String displayName = NbBundle.getMessage(this.getClass(),
"IngestJob.progress.dataArtifactIngest.displayName", this.dataSource.getName());
866 artifactIngestProgressBar = ProgressHandle.createHandle(displayName,
new Cancellable() {
868 public boolean cancel() {
869 IngestJobPipeline.this.cancel(IngestJob.CancellationReason.USER_CANCELLED);
873 artifactIngestProgressBar.start();
874 artifactIngestProgressBar.switchToIndeterminate();
/**
 * Creates and starts the data source ingest progress bar in indeterminate
 * mode. Cancelling the bar shows a dialog in which the user chooses between
 * cancelling the whole job and cancelling only the currently running data
 * source ingest module.
 *
 * NOTE(review): this extract is missing lines (the embedded numbering skips
 * 883, 889, 891-896, 902, 904-907 and everything after 909), including the
 * "else" between the two cancellation calls, the return value of cancel(),
 * the closing of the anonymous class and possibly a UI guard. Restore from
 * the upstream file.
 */
882 private void startDataSourceIngestProgressBar() {
884 synchronized (this.dataSourceIngestProgressLock) {
885 String displayName = NbBundle.getMessage(this.getClass(),
886 "IngestJob.progress.dataSourceIngest.initialDisplayName",
887 this.dataSource.getName());
888 this.dataSourceIngestProgressBar = ProgressHandle.createHandle(displayName,
new Cancellable() {
890 public boolean cancel() {
// Ask the user what to cancel; the panel records the choice.
897 DataSourceIngestCancellationPanel panel =
new DataSourceIngestCancellationPanel();
898 String dialogTitle = NbBundle.getMessage(IngestJobPipeline.this.getClass(),
"IngestJob.cancellationDialog.title");
899 JOptionPane.showConfirmDialog(WindowManager.getDefault().getMainWindow(), panel, dialogTitle, JOptionPane.OK_OPTION, JOptionPane.PLAIN_MESSAGE);
900 if (panel.cancelAllDataSourceIngestModules()) {
901 IngestJobPipeline.this.cancel(IngestJob.CancellationReason.USER_CANCELLED);
903 IngestJobPipeline.this.cancelCurrentDataSourceIngestModule();
908 this.dataSourceIngestProgressBar.start();
909 this.dataSourceIngestProgressBar.switchToIndeterminate();
/**
 * Creates and starts the file ingest progress bar in determinate mode, sized
 * by the current estimate of files to process. Cancelling the bar cancels
 * this job with reason USER_CANCELLED.
 *
 * NOTE(review): this extract is missing lines (the embedded numbering skips
 * 918, 924, 926-929, 931-933 and everything after 935), including the return
 * value of cancel(), the closing of the anonymous class and possibly a UI
 * guard. Restore from the upstream file.
 */
917 private void startFileIngestProgressBar() {
919 synchronized (this.fileIngestProgressLock) {
920 String displayName = NbBundle.getMessage(this.getClass(),
921 "IngestJob.progress.fileIngest.displayName",
922 this.dataSource.getName());
923 this.fileIngestProgressBar = ProgressHandle.createHandle(displayName,
new Cancellable() {
925 public boolean cancel() {
930 IngestJobPipeline.this.cancel(IngestJob.CancellationReason.USER_CANCELLED);
934 this.fileIngestProgressBar.start();
935 this.fileIngestProgressBar.switchToDeterminate((
int) this.estimatedFilesToProcess);
/**
 * Checks, under the stage transition lock, whether the task scheduler
 * reports that all current tasks of this job are completed.
 *
 * NOTE(review): everything that happens once the tasks are completed is
 * missing from this extract (the embedded numbering skips 946-948 and
 * everything after 949). Presumably the pipeline then moves on to the next
 * stage or shuts down; restore from the upstream file.
 */
944 private void checkForStageCompleted() {
945 synchronized (stageTransitionLock) {
949 if (taskScheduler.currentTasksAreCompleted(
this)) {
/**
 * Wraps up the first stage of analysis: shuts down the current data source
 * pipeline and every file pipeline still in the queue, and finishes the data
 * source and file progress bars.
 *
 * NOTE(review): this extract is missing lines (the embedded numbering has
 * several gaps), including the closing braces, possibly a UI guard around
 * the progress bar clean up, and the bodies of the final branch, which
 * presumably either start the second stage or shut the pipeline down.
 * Restore from the upstream file.
 */
966 private void finishFirstStage() {
967 synchronized (stageTransitionLock) {
968 logInfoMessage(
"Finished first stage analysis");
970 shutDownIngestModulePipeline(currentDataSourceIngestPipeline);
// Drain the queue so each file pipeline is shut down exactly once.
971 while (!fileIngestPipelinesQueue.isEmpty()) {
972 FileIngestPipeline pipeline = fileIngestPipelinesQueue.poll();
973 shutDownIngestModulePipeline(pipeline);
977 synchronized (dataSourceIngestProgressLock) {
978 if (dataSourceIngestProgressBar != null) {
979 dataSourceIngestProgressBar.finish();
980 dataSourceIngestProgressBar = null;
984 synchronized (fileIngestProgressLock) {
985 if (fileIngestProgressBar != null) {
986 fileIngestProgressBar.finish();
987 fileIngestProgressBar = null;
// A second stage only follows when the job was not cancelled and second stage
// modules exist.
992 if (!cancelled && hasSecondStageDataSourceIngestModules()) {
/**
 * Shuts down this pipeline: shuts down the current data source pipeline and
 * the data artifact pipeline, finishes all three progress bars, records the
 * final status and end time of the job in the case database, and notifies
 * the parent job.
 *
 * NOTE(review): this extract is missing lines (the embedded numbering has
 * several gaps), including the "try" lines, the condition choosing between
 * the CANCELLED and COMPLETED status updates (presumably the cancelled
 * flag), possibly a UI guard and stage bookkeeping, and the closing braces.
 * Restore from the upstream file.
 */
1003 private void shutDown() {
1004 synchronized (stageTransitionLock) {
1005 logInfoMessage(
"Finished all tasks");
1008 shutDownIngestModulePipeline(currentDataSourceIngestPipeline);
1009 shutDownIngestModulePipeline(artifactIngestPipeline);
// Finish and drop whichever progress bars are still showing.
1012 synchronized (dataSourceIngestProgressLock) {
1013 if (dataSourceIngestProgressBar != null) {
1014 dataSourceIngestProgressBar.finish();
1015 dataSourceIngestProgressBar = null;
1019 synchronized (fileIngestProgressLock) {
1020 if (fileIngestProgressBar != null) {
1021 fileIngestProgressBar.finish();
1022 fileIngestProgressBar = null;
1026 synchronized (artifactIngestProgressLock) {
1027 if (artifactIngestProgressBar != null) {
1028 artifactIngestProgressBar.finish();
1029 artifactIngestProgressBar = null;
// The job record only exists if recording the job at start up succeeded.
1034 if (ingestJobInfo != null) {
1037 ingestJobInfo.setIngestJobStatus(IngestJobStatusType.CANCELLED);
1038 }
catch (TskCoreException ex) {
1039 logErrorMessage(Level.WARNING,
"Failed to update ingest job status in case database", ex);
1043 ingestJobInfo.setIngestJobStatus(IngestJobStatusType.COMPLETED);
1044 }
catch (TskCoreException ex) {
1045 logErrorMessage(Level.WARNING,
"Failed to update ingest job status in case database", ex);
1049 ingestJobInfo.setEndDateTime(
new Date());
1050 }
catch (TskCoreException ex) {
1051 logErrorMessage(Level.WARNING,
"Failed to set job end date in case database", ex);
1056 parentJob.notifyIngestPipelineShutDown(
this);
1064 private <T extends IngestTask>
void shutDownIngestModulePipeline(IngestTaskPipeline<T> pipeline) {
1065 if (pipeline.isRunning()) {
1066 List<IngestModuleError> errors =
new ArrayList<>();
1067 errors.addAll(pipeline.shutDown());
1068 if (!errors.isEmpty()) {
1069 logIngestModuleErrors(errors);
/**
 * Executes a data source ingest task with the current data source pipeline,
 * unless the job has been cancelled, and logs any module errors. The task is
 * then reported to the scheduler as completed and a stage completion check
 * is made.
 *
 * NOTE(review): this extract is missing lines (the embedded numbering skips
 * 1082, 1088-1090 and everything after 1092). The gaps fit a try/finally
 * wrapped around the execution with the two trailing calls in the finally
 * block, but that is not visible here; restore from the upstream file.
 *
 * @param task The data source ingest task.
 */
1081 void execute(DataSourceIngestTask task) {
1083 if (!isCancelled()) {
1084 List<IngestModuleError> errors =
new ArrayList<>();
1085 errors.addAll(currentDataSourceIngestPipeline.executeTask(task));
1086 if (!errors.isEmpty()) {
1087 logIngestModuleErrors(errors);
1091 taskScheduler.notifyTaskCompleted(task);
1092 checkForStageCompleted();
/**
 * Executes a file ingest task, unless the job has been cancelled. A file
 * pipeline is taken from the queue, the file progress bar is updated, the
 * task is run through the pipeline, module errors are logged, and the
 * pipeline is put back in the queue. The task is then reported to the
 * scheduler as completed and a stage completion check is made.
 *
 * NOTE(review): this extract is missing lines throughout (the embedded
 * numbering has many gaps). Not visible, among others: the "try" lines, the
 * declaration of the file variable, what follows the early return of the
 * pipeline to the queue after a failed getFile(), the update of the
 * processed files counter, UI guards, the "else" lines and the closing
 * braces. Restore from the upstream file.
 *
 * @param task The file ingest task.
 */
1102 void execute(FileIngestTask task) {
1104 if (!isCancelled()) {
// Blocks until one of the file pipelines is free.
1105 FileIngestPipeline pipeline = fileIngestPipelinesQueue.take();
1106 if (!pipeline.isEmpty()) {
1114 file = task.getFile();
1115 }
catch (TskCoreException ex) {
// The file could not be obtained: log the failure and hand the pipeline back.
1116 List<IngestModuleError> errors =
new ArrayList<>();
1117 errors.add(
new IngestModuleError(
"Ingest Pipeline", ex));
1118 logIngestModuleErrors(errors);
1119 fileIngestPipelinesQueue.put(pipeline);
1123 synchronized (fileIngestProgressLock) {
// Never report more progress than the estimate the bar was sized with.
1126 if (processedFiles <= estimatedFilesToProcess) {
1127 fileIngestProgressBar.progress(file.getName(), (int) processedFiles);
1129 fileIngestProgressBar.progress(file.getName(), (int) estimatedFilesToProcess);
1131 filesInProgress.add(file.getName());
1138 List<IngestModuleError> errors =
new ArrayList<>();
1139 errors.addAll(pipeline.executeTask(task));
1140 if (!errors.isEmpty()) {
1141 logIngestModuleErrors(errors, file);
1144 if (doUI && !cancelled) {
1145 synchronized (fileIngestProgressLock) {
// Show another file that is still in progress, or clear the message.
1150 filesInProgress.remove(file.getName());
1151 if (filesInProgress.size() > 0) {
1152 fileIngestProgressBar.progress(filesInProgress.get(0));
1154 fileIngestProgressBar.progress(
"");
1159 fileIngestPipelinesQueue.put(pipeline);
1161 }
catch (InterruptedException ex) {
1162 logger.log(Level.SEVERE, String.format(
"Unexpected interrupt of file ingest thread during execution of file ingest job (file obj ID = %d)", task.getFileId()), ex);
// Preserve the interrupt status for code further up the call stack.
1163 Thread.currentThread().interrupt();
1165 taskScheduler.notifyTaskCompleted(task);
1166 checkForStageCompleted();
/**
 * Executes a data artifact ingest task with the data artifact pipeline,
 * unless the job has been cancelled or the pipeline is empty, and logs any
 * module errors. The task is then reported to the scheduler as completed and
 * a stage completion check is made.
 *
 * NOTE(review): this extract is missing lines (the embedded numbering skips
 * 1177, 1183-1185 and everything after 1187). The gaps fit a try/finally
 * wrapped around the execution with the two trailing calls in the finally
 * block, but that is not visible here; restore from the upstream file.
 *
 * @param task The data artifact ingest task.
 */
1176 void execute(DataArtifactIngestTask task) {
1178 if (!isCancelled() && !artifactIngestPipeline.isEmpty()) {
1179 List<IngestModuleError> errors =
new ArrayList<>();
1180 errors.addAll(artifactIngestPipeline.executeTask(task));
1181 if (!errors.isEmpty()) {
1182 logIngestModuleErrors(errors);
1186 taskScheduler.notifyTaskCompleted(task);
1187 checkForStageCompleted();
1197 void addStreamedFiles(List<Long> fileObjIds) {
1198 if (hasFileIngestModules()) {
1199 if (stage.equals(Stages.FIRST_STAGE_STREAMING)) {
1200 IngestJobPipeline.taskScheduler.scheduleStreamedFileIngestTasks(
this, fileObjIds);
1202 logErrorMessage(Level.SEVERE,
"Adding streaming files to job during stage " + stage.toString() +
" not supported");
1214 void addFiles(List<AbstractFile> files) {
1215 if (stage.equals(Stages.FIRST_STAGE_STREAMING)
1216 || stage.equals(Stages.FIRST_STAGE)) {
1217 taskScheduler.fastTrackFileIngestTasks(
this, files);
1219 logErrorMessage(Level.SEVERE,
"Adding streaming files to job during stage " + stage.toString() +
" not supported");
1229 checkForStageCompleted();
1238 void addDataArtifacts(List<DataArtifact> artifacts) {
1239 List<DataArtifact> artifactsToAnalyze =
new ArrayList<>(artifacts);
1240 if (stage.equals(Stages.FIRST_STAGE_STREAMING)
1241 || stage.equals(Stages.FIRST_STAGE)
1242 || stage.equals(Stages.SECOND_STAGE)) {
1243 taskScheduler.scheduleDataArtifactIngestTasks(
this, artifactsToAnalyze);
1245 logErrorMessage(Level.SEVERE,
"Adding streaming files to job during stage " + stage.toString() +
" not supported");
1255 checkForStageCompleted();
1264 void updateDataSourceIngestProgressBarDisplayName(String displayName) {
1265 if (this.doUI && !this.cancelled) {
1266 synchronized (this.dataSourceIngestProgressLock) {
1267 this.dataSourceIngestProgressBar.setDisplayName(displayName);
1280 void switchDataSourceIngestProgressBarToDeterminate(
int workUnits) {
1281 if (this.doUI && !this.cancelled) {
1282 synchronized (this.dataSourceIngestProgressLock) {
1283 if (null != this.dataSourceIngestProgressBar) {
1284 this.dataSourceIngestProgressBar.switchToDeterminate(workUnits);
1295 void switchDataSourceIngestProgressBarToIndeterminate() {
1296 if (this.doUI && !this.cancelled) {
1297 synchronized (this.dataSourceIngestProgressLock) {
1298 if (null != this.dataSourceIngestProgressBar) {
1299 this.dataSourceIngestProgressBar.switchToIndeterminate();
1311 void advanceDataSourceIngestProgressBar(
int workUnits) {
1312 if (doUI && !cancelled) {
1313 synchronized (dataSourceIngestProgressLock) {
1314 if (null != dataSourceIngestProgressBar) {
1315 dataSourceIngestProgressBar.progress(
"", workUnits);
1327 void advanceDataSourceIngestProgressBar(String currentTask) {
1328 if (doUI && !cancelled) {
1329 synchronized (dataSourceIngestProgressLock) {
1330 if (null != dataSourceIngestProgressBar) {
1331 dataSourceIngestProgressBar.progress(currentTask);
1345 void advanceDataSourceIngestProgressBar(String currentTask,
int workUnits) {
1346 if (this.doUI && !this.cancelled) {
1347 synchronized (this.fileIngestProgressLock) {
1348 this.dataSourceIngestProgressBar.progress(currentTask, workUnits);
1360 boolean currentDataSourceIngestModuleIsCancelled() {
1361 return this.currentDataSourceIngestModuleCancelled;
/**
 * Acknowledges that the cancelled data source ingest module has stopped:
 * clears the per-module cancellation flag, records the module's display name
 * among the cancelled modules, and replaces the data source progress bar
 * with a fresh one.
 *
 * NOTE(review): this extract is missing lines (the embedded numbering skips
 * 1373-1381 and everything after 1385), including the closing braces and
 * possibly a UI guard in front of the synchronized block. As shown, the
 * progress bar is dereferenced without a null check; restore from the
 * upstream file and verify.
 *
 * @param moduleDisplayName The display name of the cancelled module.
 */
1370 void currentDataSourceIngestModuleCancellationCompleted(String moduleDisplayName) {
1371 this.currentDataSourceIngestModuleCancelled =
false;
1372 this.cancelledDataSourceIngestModules.add(moduleDisplayName);
1382 synchronized (this.dataSourceIngestProgressLock) {
1383 this.dataSourceIngestProgressBar.finish();
1384 this.dataSourceIngestProgressBar = null;
1385 this.startDataSourceIngestProgressBar();
1395 DataSourceIngestPipeline.DataSourcePipelineModule getCurrentDataSourceIngestModule() {
1396 if (currentDataSourceIngestPipeline != null) {
1397 return (DataSourceIngestPipeline.DataSourcePipelineModule) currentDataSourceIngestPipeline.getCurrentlyRunningModule();
1407 void cancelCurrentDataSourceIngestModule() {
1408 this.currentDataSourceIngestModuleCancelled =
true;
/**
 * Cancels this job: sets the cancelled flag and the cancellation reason,
 * cancels the pending file tasks of the job, switches the data source and
 * file progress bars to a "cancelling" message, clears the set of paused
 * ingest threads, and makes a stage completion check.
 *
 * NOTE(review): this extract is missing lines (the embedded numbering has
 * several gaps), including the closing braces, possibly a UI guard around
 * the progress bar updates, and the body of the loop over the paused
 * threads, which presumably wakes each thread before the set is cleared.
 * Restore from the upstream file.
 *
 * @param reason The reason for the cancellation.
 */
1417 void cancel(IngestJob.CancellationReason reason) {
1418 this.cancelled =
true;
1419 this.cancellationReason = reason;
1420 IngestJobPipeline.taskScheduler.cancelPendingFileTasksForIngestJob(
this);
1423 synchronized (this.dataSourceIngestProgressLock) {
1424 if (null != dataSourceIngestProgressBar) {
1425 dataSourceIngestProgressBar.setDisplayName(NbBundle.getMessage(
this.getClass(),
"IngestJob.progress.dataSourceIngest.initialDisplayName", this.dataSource.getName()));
1426 dataSourceIngestProgressBar.progress(NbBundle.getMessage(
this.getClass(),
"IngestJob.progress.cancelling"));
1430 synchronized (this.fileIngestProgressLock) {
1431 if (null != this.fileIngestProgressBar) {
1432 this.fileIngestProgressBar.setDisplayName(NbBundle.getMessage(
this.getClass(),
"IngestJob.progress.fileIngest.displayName", this.dataSource.getName()));
1433 this.fileIngestProgressBar.progress(NbBundle.getMessage(
this.getClass(),
"IngestJob.progress.cancelling"));
1438 synchronized (threadRegistrationLock) {
1439 for (Thread thread : pausedIngestThreads) {
1442 pausedIngestThreads.clear();
1446 checkForStageCompleted();
1455 boolean isCancelled() {
1456 return this.cancelled;
1464 IngestJob.CancellationReason getCancellationReason() {
1465 return this.cancellationReason;
1474 private void logInfoMessage(String message) {
1475 logger.log(Level.INFO, String.format(
"%s (data source = %s, objId = %d, pipeline id = %d, ingest job id = %d)", message,
this.dataSource.getName(), this.dataSource.getId(), pipelineId, ingestJobInfo.getIngestJobId()));
1486 private void logErrorMessage(Level level, String message, Throwable throwable) {
1487 logger.log(level, String.format(
"%s (data source = %s, objId = %d, pipeline id = %d, ingest job id = %d)", message,
this.dataSource.getName(), this.dataSource.getId(), pipelineId, ingestJobInfo.getIngestJobId()), throwable);
1497 private void logErrorMessage(Level level, String message) {
1498 logger.log(level, String.format(
"%s (data source = %s, objId = %d, pipeline id = %d, ingest job id %d)", message,
this.dataSource.getName(), this.dataSource.getId(), pipelineId, ingestJobInfo.getIngestJobId()));
1506 private void logIngestModuleErrors(List<IngestModuleError> errors) {
1507 for (IngestModuleError error : errors) {
1508 logErrorMessage(Level.SEVERE, String.format(
"%s experienced an error during analysis", error.getModuleDisplayName()), error.getThrowable());
1518 private void logIngestModuleErrors(List<IngestModuleError> errors, AbstractFile file) {
1519 for (IngestModuleError error : errors) {
1520 logErrorMessage(Level.SEVERE, String.format(
"%s experienced an error during analysis while processing file %s, object ID %d", error.getModuleDisplayName(), file.getName(), file.getId()), error.getThrowable());
1529 Snapshot getSnapshot(
boolean getIngestTasksSnapshot) {
1535 boolean fileIngestRunning =
false;
1536 Date fileIngestStartTime = null;
1537 for (FileIngestPipeline pipeline : this.fileIngestPipelines) {
1538 if (pipeline.isRunning()) {
1539 fileIngestRunning =
true;
1541 Date pipelineStartTime = pipeline.getStartTime();
1542 if (null != pipelineStartTime && (null == fileIngestStartTime || pipelineStartTime.before(fileIngestStartTime))) {
1543 fileIngestStartTime = pipelineStartTime;
1547 long processedFilesCount = 0;
1548 long estimatedFilesToProcessCount = 0;
1549 long snapShotTime =
new Date().getTime();
1550 IngestJobTasksSnapshot tasksSnapshot = null;
1551 if (getIngestTasksSnapshot) {
1552 synchronized (fileIngestProgressLock) {
1553 processedFilesCount = this.processedFiles;
1554 estimatedFilesToProcessCount = this.estimatedFilesToProcess;
1555 snapShotTime =
new Date().getTime();
1557 tasksSnapshot = taskScheduler.getTasksSnapshotForJob(pipelineId);
1560 return new Snapshot(dataSource.getName(),
1561 pipelineId, createTime,
1562 getCurrentDataSourceIngestModule(),
1563 fileIngestRunning, fileIngestStartTime,
1564 cancelled, cancellationReason, cancelledDataSourceIngestModules,
1565 processedFilesCount, estimatedFilesToProcessCount, snapShotTime, tasksSnapshot);
1574 void registerPausedIngestThread(Thread thread) {
1575 synchronized (threadRegistrationLock) {
1576 pausedIngestThreads.add(thread);
1586 void unregisterPausedIngestThread(Thread thread) {
1587 synchronized (threadRegistrationLock) {
1588 pausedIngestThreads.remove(thread);