19 package org.sleuthkit.autopsy.ingest;
21 import java.io.Serializable;
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.Collections;
25 import java.util.Comparator;
26 import java.util.Deque;
27 import java.util.Iterator;
28 import java.util.LinkedList;
29 import java.util.List;
30 import java.util.Queue;
31 import java.util.TreeSet;
32 import java.util.concurrent.BlockingDeque;
33 import java.util.concurrent.LinkedBlockingDeque;
34 import java.util.logging.Level;
35 import java.util.regex.Matcher;
36 import java.util.regex.Pattern;
37 import javax.annotation.concurrent.GuardedBy;
38 import javax.annotation.concurrent.ThreadSafe;
55 final class IngestTasksScheduler {
// Bit mask of the FAT12/FAT16/FAT32/NTFS file system type flags. Used to
// recognize file systems on which low-meta-address root-directory entries
// (e.g. "$MFT:...") are likely uninteresting system files.
57 private static final int FAT_NTFS_FLAGS = TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT12.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT16.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT32.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_NTFS.getValue();
58 private static final Logger logger = Logger.getLogger(IngestTasksScheduler.class.getName());
// Lazily-created singleton instance; see getInstance().
59 @GuardedBy(
"IngestTasksScheduler.this")
60 private static IngestTasksScheduler instance;
// Tracking queue of data-source-level ingest tasks for the data source ingest thread.
61 private final IngestTaskTrackingQueue dataSourceIngestTasksQueue;
// Top-level file tasks (root directories etc.), ordered by RootDirectoryTaskComparator priority.
63 private final TreeSet<FileIngestTask> topLevelFileIngestTasksQueue;
// Intermediate queue used while descending into directory trees in batch mode.
65 private final Deque<FileIngestTask> batchedFileIngestTasksQueue;
// Tasks for files streamed in while a data source is still being added to the case.
67 private final Queue<FileIngestTask> streamedFileIngestTasksQueue;
// Tracking queue of file-level ingest tasks consumed by the file ingest threads.
68 private final IngestTaskTrackingQueue fileIngestTasksQueue;
// Tracking queue of data artifact ingest tasks for the artifact ingest thread.
69 private final IngestTaskTrackingQueue artifactIngestTasksQueue;
76 synchronized static IngestTasksScheduler getInstance() {
77 if (IngestTasksScheduler.instance == null) {
78 IngestTasksScheduler.instance =
new IngestTasksScheduler();
80 return IngestTasksScheduler.instance;
/**
 * Constructs the scheduler, creating the empty task queues for the three
 * task levels (data source, file, data artifact) plus the two auxiliary
 * file-task staging queues (top-level and batched) and the streamed-file
 * queue.
 */
private IngestTasksScheduler() {
    this.dataSourceIngestTasksQueue = new IngestTaskTrackingQueue();
    this.topLevelFileIngestTasksQueue = new TreeSet<>(new RootDirectoryTaskComparator());
    this.batchedFileIngestTasksQueue = new LinkedList<>();
    this.fileIngestTasksQueue = new IngestTaskTrackingQueue();
    this.streamedFileIngestTasksQueue = new LinkedList<>();
    this.artifactIngestTasksQueue = new IngestTaskTrackingQueue();
}
103 BlockingIngestTaskQueue getDataSourceIngestTaskQueue() {
104 return dataSourceIngestTasksQueue;
113 BlockingIngestTaskQueue getFileIngestTaskQueue() {
114 return fileIngestTasksQueue;
123 BlockingIngestTaskQueue getResultIngestTaskQueue() {
124 return artifactIngestTasksQueue;
141 synchronized void scheduleIngestTasks(IngestJobPipeline ingestPipeline) {
142 if (!ingestPipeline.isCancelled()) {
143 if (ingestPipeline.hasDataSourceIngestModules()) {
144 scheduleDataSourceIngestTask(ingestPipeline);
146 if (ingestPipeline.hasFileIngestModules()) {
147 scheduleFileIngestTasks(ingestPipeline, Collections.emptyList());
149 if (ingestPipeline.hasDataArtifactIngestModules()) {
150 scheduleDataArtifactIngestTasks(ingestPipeline);
166 synchronized void scheduleDataSourceIngestTask(IngestJobPipeline ingestPipeline) {
167 if (!ingestPipeline.isCancelled()) {
168 DataSourceIngestTask task =
new DataSourceIngestTask(ingestPipeline);
170 dataSourceIngestTasksQueue.putLast(task);
171 }
catch (InterruptedException ex) {
172 IngestTasksScheduler.logger.log(Level.INFO, String.format(
"Ingest tasks scheduler interrupted while blocked adding a task to the data source level ingest task queue (pipelineId={%d)", ingestPipeline.getId()), ex);
173 Thread.currentThread().interrupt();
193 synchronized void scheduleFileIngestTasks(IngestJobPipeline ingestPipeline, Collection<AbstractFile> files) {
194 if (!ingestPipeline.isCancelled()) {
195 Collection<AbstractFile> candidateFiles;
196 if (files.isEmpty()) {
197 candidateFiles = getTopLevelFiles(ingestPipeline.getDataSource());
199 candidateFiles = files;
201 for (AbstractFile file : candidateFiles) {
202 FileIngestTask task =
new FileIngestTask(ingestPipeline, file);
203 if (IngestTasksScheduler.shouldEnqueueFileTask(task)) {
204 topLevelFileIngestTasksQueue.add(task);
207 refillFileIngestTasksQueue();
223 synchronized void scheduleStreamedFileIngestTasks(IngestJobPipeline ingestPipeline, List<Long> fileIds) {
224 if (!ingestPipeline.isCancelled()) {
225 for (
long id : fileIds) {
233 FileIngestTask task =
new FileIngestTask(ingestPipeline,
id);
234 streamedFileIngestTasksQueue.add(task);
236 refillFileIngestTasksQueue();
255 synchronized void fastTrackFileIngestTasks(IngestJobPipeline ingestPipeline, Collection<AbstractFile> files) {
256 if (!ingestPipeline.isCancelled()) {
265 for (AbstractFile file : files) {
266 FileIngestTask fileTask =
new FileIngestTask(ingestPipeline, file);
267 if (shouldEnqueueFileTask(fileTask)) {
269 fileIngestTasksQueue.putFirst(fileTask);
270 }
catch (InterruptedException ex) {
271 DataSource dataSource = ingestPipeline.getDataSource();
272 logger.log(Level.WARNING, String.format(
"Interrupted while enqueuing file tasks for %s (data source object ID = %d)", dataSource.getName(), dataSource.getId()), ex);
273 Thread.currentThread().interrupt();
293 synchronized void scheduleDataArtifactIngestTasks(IngestJobPipeline ingestPipeline) {
294 if (!ingestPipeline.isCancelled()) {
295 Blackboard blackboard = Case.getCurrentCase().getSleuthkitCase().getBlackboard();
297 List<DataArtifact> artifacts = blackboard.getDataArtifacts(ingestPipeline.getDataSource().getId(), null);
298 scheduleDataArtifactIngestTasks(ingestPipeline, artifacts);
299 }
catch (TskCoreException ex) {
300 DataSource dataSource = ingestPipeline.getDataSource();
301 logger.log(Level.SEVERE, String.format(
"Failed to retrieve data artifacts for %s (data source object ID = %d)", dataSource.getName(), dataSource.getId()), ex);
321 synchronized void scheduleDataArtifactIngestTasks(IngestJobPipeline ingestPipeline, List<DataArtifact> artifacts) {
322 if (!ingestPipeline.isCancelled()) {
323 for (DataArtifact artifact : artifacts) {
324 DataArtifactIngestTask task =
new DataArtifactIngestTask(ingestPipeline, artifact);
326 this.artifactIngestTasksQueue.putLast(task);
327 }
catch (InterruptedException ex) {
328 DataSource dataSource = ingestPipeline.getDataSource();
329 logger.log(Level.WARNING, String.format(
"Interrupted while enqueuing data artifact tasks for %s (data source object ID = %d)", dataSource.getName(), dataSource.getId()), ex);
330 Thread.currentThread().interrupt();
343 synchronized void notifyTaskCompleted(DataSourceIngestTask task) {
344 dataSourceIngestTasksQueue.taskCompleted(task);
353 synchronized void notifyTaskCompleted(FileIngestTask task) {
354 fileIngestTasksQueue.taskCompleted(task);
355 refillFileIngestTasksQueue();
364 synchronized void notifyTaskCompleted(DataArtifactIngestTask task) {
365 artifactIngestTasksQueue.taskCompleted(task);
376 synchronized boolean currentTasksAreCompleted(IngestJobPipeline ingestPipeline) {
377 long pipelineId = ingestPipeline.getId();
378 return !(dataSourceIngestTasksQueue.hasTasksForJob(pipelineId)
379 || hasTasksForJob(topLevelFileIngestTasksQueue, pipelineId)
380 || hasTasksForJob(batchedFileIngestTasksQueue, pipelineId)
381 || hasTasksForJob(streamedFileIngestTasksQueue, pipelineId)
382 || fileIngestTasksQueue.hasTasksForJob(pipelineId)
383 || artifactIngestTasksQueue.hasTasksForJob(pipelineId));
405 synchronized void cancelPendingFileTasksForIngestJob(IngestJobPipeline ingestJobPipeline) {
406 long jobId = ingestJobPipeline.getId();
407 removeTasksForJob(topLevelFileIngestTasksQueue, jobId);
408 removeTasksForJob(batchedFileIngestTasksQueue, jobId);
409 removeTasksForJob(streamedFileIngestTasksQueue, jobId);
420 private static List<AbstractFile> getTopLevelFiles(Content dataSource) {
421 List<AbstractFile> topLevelFiles =
new ArrayList<>();
422 Collection<AbstractFile> rootObjects = dataSource.accept(
new GetRootDirectoryVisitor());
423 if (rootObjects.isEmpty() && dataSource instanceof AbstractFile) {
425 topLevelFiles.add((AbstractFile) dataSource);
427 for (AbstractFile root : rootObjects) {
428 List<Content> children;
430 children = root.getChildren();
431 if (children.isEmpty()) {
434 topLevelFiles.add(root);
438 for (Content child : children) {
439 if (child instanceof AbstractFile) {
440 topLevelFiles.add((AbstractFile) child);
444 }
catch (TskCoreException ex) {
445 logger.log(Level.SEVERE,
"Could not get children of root to enqueue: " + root.getId() +
": " + root.getName(), ex);
449 return topLevelFiles;
459 synchronized private void refillFileIngestTasksQueue() {
461 takeFromStreamingFileTasksQueue();
462 takeFromBatchTasksQueues();
463 }
catch (InterruptedException ex) {
464 IngestTasksScheduler.logger.log(Level.INFO,
"Ingest tasks scheduler interrupted while blocked adding a task to the file level ingest task queue", ex);
465 Thread.currentThread().interrupt();
475 synchronized private void takeFromStreamingFileTasksQueue() throws InterruptedException {
476 while (fileIngestTasksQueue.isEmpty()) {
478 while (taskCount < IngestManager.getInstance().getNumberOfFileIngestThreads()) {
479 final FileIngestTask streamingTask = streamedFileIngestTasksQueue.poll();
480 if (streamingTask == null) {
483 if (shouldEnqueueFileTask(streamingTask)) {
484 fileIngestTasksQueue.putLast(streamingTask);
516 synchronized private void takeFromBatchTasksQueues() throws InterruptedException {
518 while (fileIngestTasksQueue.isEmpty()) {
523 if (batchedFileIngestTasksQueue.isEmpty()) {
524 final FileIngestTask topLevelTask = topLevelFileIngestTasksQueue.pollFirst();
525 if (topLevelTask != null) {
526 batchedFileIngestTasksQueue.addLast(topLevelTask);
534 final FileIngestTask nextTask = batchedFileIngestTasksQueue.pollFirst();
535 if (nextTask == null) {
538 if (shouldEnqueueFileTask(nextTask)) {
539 fileIngestTasksQueue.putLast(nextTask);
546 AbstractFile file = null;
548 file = nextTask.getFile();
549 for (Content child : file.getChildren()) {
550 if (child instanceof AbstractFile) {
551 AbstractFile childFile = (AbstractFile) child;
552 FileIngestTask childTask =
new FileIngestTask(nextTask.getIngestJobPipeline(), childFile);
553 if (childFile.hasChildren()) {
554 batchedFileIngestTasksQueue.add(childTask);
555 }
else if (shouldEnqueueFileTask(childTask)) {
556 fileIngestTasksQueue.putLast(childTask);
560 }
catch (TskCoreException ex) {
562 logger.log(Level.SEVERE, String.format(
"Error getting the children of %s (object ID = %d)", file.getName(), file.getId()), ex);
564 logger.log(Level.SEVERE,
"Error loading file with object ID = {0}", nextTask.getFileId());
580 private static boolean shouldEnqueueFileTask(
final FileIngestTask task) {
583 file = task.getFile();
584 }
catch (TskCoreException ex) {
585 logger.log(Level.SEVERE,
"Error loading file with ID {0}", task.getFileId());
591 String fileName = file.getName();
593 if (fileName.equals(
".") || fileName.equals(
"..")) {
609 if (!file.isDir() && !shouldBeCarved(task) && !fileAcceptedByFilter(task)) {
619 TskData.TSK_FS_TYPE_ENUM fsType = TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_UNSUPP;
621 FileSystem fs = f.getFileSystem();
623 fsType = fs.getFsType();
625 }
catch (TskCoreException ex) {
626 logger.log(Level.SEVERE,
"Error querying file system for " + f, ex);
630 if ((fsType.getValue() & FAT_NTFS_FLAGS) == 0) {
635 boolean isInRootDir =
false;
637 AbstractFile parent = f.getParentDirectory();
638 if (parent == null) {
641 isInRootDir = parent.isRoot();
643 }
catch (TskCoreException ex) {
644 logger.log(Level.WARNING,
"Error querying parent directory for" + f.getName(), ex);
650 if (isInRootDir && f.getMetaAddr() < 32) {
651 String name = f.getName();
652 if (name.length() > 0 && name.charAt(0) ==
'$' && name.contains(
":")) {
668 private static boolean shouldBeCarved(
final FileIngestTask task) {
670 AbstractFile file = task.getFile();
671 return task.getIngestJobPipeline().shouldProcessUnallocatedSpace() && file.getType().equals(TskData.TSK_DB_FILES_TYPE_ENUM.UNALLOC_BLOCKS);
672 }
catch (TskCoreException ex) {
685 private static boolean fileAcceptedByFilter(
final FileIngestTask task) {
687 AbstractFile file = task.getFile();
688 return !(task.getIngestJobPipeline().getFileIngestFilter().fileIsMemberOf(file) == null);
689 }
catch (TskCoreException ex) {
703 synchronized private static boolean hasTasksForJob(Collection<? extends IngestTask> tasks,
long pipelineId) {
704 for (IngestTask task : tasks) {
705 if (task.getIngestJobPipeline().getId() == pipelineId) {
719 private static void removeTasksForJob(Collection<? extends IngestTask> tasks,
long pipelineId) {
720 Iterator<? extends IngestTask> iterator = tasks.iterator();
721 while (iterator.hasNext()) {
722 IngestTask task = iterator.next();
723 if (task.getIngestJobPipeline().getId() == pipelineId) {
738 private static int countTasksForJob(Collection<? extends IngestTask> tasks,
long pipelineId) {
740 for (IngestTask task : tasks) {
741 if (task.getIngestJobPipeline().getId() == pipelineId) {
756 synchronized IngestJobTasksSnapshot getTasksSnapshotForJob(
long jobId) {
757 return new IngestJobTasksSnapshot(jobId, dataSourceIngestTasksQueue.countQueuedTasksForJob(jobId),
758 countTasksForJob(topLevelFileIngestTasksQueue, jobId),
759 countTasksForJob(batchedFileIngestTasksQueue, jobId),
760 fileIngestTasksQueue.countQueuedTasksForJob(jobId),
761 dataSourceIngestTasksQueue.countRunningTasksForJob(jobId) + fileIngestTasksQueue.countRunningTasksForJob(jobId) + artifactIngestTasksQueue.countRunningTasksForJob(jobId),
762 countTasksForJob(streamedFileIngestTasksQueue, jobId),
763 artifactIngestTasksQueue.countQueuedTasksForJob(jobId));
773 public int compare(FileIngestTask q1, FileIngestTask q2) {
776 AbstractFile file1 = null;
777 AbstractFile file2 = null;
779 file1 = q1.getFile();
780 }
catch (TskCoreException ex) {
785 file2 = q2.getFile();
786 }
catch (TskCoreException ex) {
792 return (
int) (q2.getFileId() - q1.getFileId());
796 }
else if (file2 == null) {
803 return (
int) (file2.getId() - file1.getId());
805 return p2.ordinal() - p1.ordinal();
// Priority buckets, lowest to highest; comparator uses ordinal() ordering.
820 LAST, LOW, MEDIUM, HIGH
// Path-name patterns for each priority bucket, matched against the file
// name by getPriority(). Populated once below.
823 static final List<Pattern> LAST_PRI_PATHS =
new ArrayList<>();
825 static final List<Pattern> LOW_PRI_PATHS =
new ArrayList<>();
827 static final List<Pattern> MEDIUM_PRI_PATHS =
new ArrayList<>();
829 static final List<Pattern> HIGH_PRI_PATHS =
new ArrayList<>();
// NOTE(review): the static-initializer braces around the adds below were
// lost in extraction; presumably these run in a static { } block.
// Swap-style files are processed last.
845 LAST_PRI_PATHS.add(Pattern.compile(
"^pagefile", Pattern.CASE_INSENSITIVE));
846 LAST_PRI_PATHS.add(Pattern.compile(
"^hiberfil", Pattern.CASE_INSENSITIVE));
// Orphans and OS directories are low priority.
849 LOW_PRI_PATHS.add(Pattern.compile(
"^\\$OrphanFiles", Pattern.CASE_INSENSITIVE));
850 LOW_PRI_PATHS.add(Pattern.compile(
"^Windows", Pattern.CASE_INSENSITIVE));
852 MEDIUM_PRI_PATHS.add(Pattern.compile(
"^Program Files", Pattern.CASE_INSENSITIVE));
// User-data locations are processed first.
854 HIGH_PRI_PATHS.add(Pattern.compile(
"^Users", Pattern.CASE_INSENSITIVE));
855 HIGH_PRI_PATHS.add(Pattern.compile(
"^Documents and Settings", Pattern.CASE_INSENSITIVE));
856 HIGH_PRI_PATHS.add(Pattern.compile(
"^home", Pattern.CASE_INSENSITIVE));
857 HIGH_PRI_PATHS.add(Pattern.compile(
"^ProgramData", Pattern.CASE_INSENSITIVE));
// NOTE(review): this is the body of getPriority(AbstractFile); the method
// header and the per-loop return statements were lost in extraction.
// Non-file-system files get a default (presumably MEDIUM) priority.
868 if (!abstractFile.getType().equals(TskData.TSK_DB_FILES_TYPE_ENUM.FS)) {
// Despite the name, the file NAME is matched, not a full path.
874 final String path = abstractFile.getName();
// Buckets are checked highest priority first; first match wins.
878 for (Pattern p : HIGH_PRI_PATHS) {
879 Matcher m = p.matcher(path);
884 for (Pattern p : MEDIUM_PRI_PATHS) {
885 Matcher m = p.matcher(path);
890 for (Pattern p : LOW_PRI_PATHS) {
891 Matcher m = p.matcher(path);
896 for (Pattern p : LAST_PRI_PATHS) {
897 Matcher m = p.matcher(path);
// Underlying blocking deque that the ingest threads block on; the
// tracking lists are kept in sync with it under "this" lock.
915 private final BlockingDeque<IngestTask>
taskQueue =
new LinkedBlockingDeque<>();
931 void putFirst(IngestTask task) throws InterruptedException {
932 synchronized (
this) {
936 this.taskQueue.putFirst(task);
937 }
catch (InterruptedException ex) {
938 synchronized (
this) {
955 void putLast(IngestTask task)
throws InterruptedException {
956 synchronized (
this) {
960 this.taskQueue.putLast(task);
961 }
catch (InterruptedException ex) {
962 synchronized (
this) {
// NOTE(review): fragments of two methods whose headers were lost in
// extraction — presumably takeFirst() (blocks for a task, then moves it
// from the queued list to the in-progress list under "this" lock) and
// isEmpty(); confirm against upstream.
981 IngestTask task = taskQueue.takeFirst();
982 synchronized (
this) {
995 synchronized (
this) {
1006 void taskCompleted(IngestTask task) {
1007 synchronized (
this) {
1020 boolean hasTasksForJob(
long pipelineId) {
1021 synchronized (
this) {
1022 return IngestTasksScheduler.hasTasksForJob(
queuedTasks, pipelineId) || IngestTasksScheduler.hasTasksForJob(
tasksInProgress, pipelineId);
1033 int countQueuedTasksForJob(
long pipelineId) {
1034 synchronized (
this) {
1035 return IngestTasksScheduler.countTasksForJob(
queuedTasks, pipelineId);
1046 int countRunningTasksForJob(
long pipelineId) {
1047 synchronized (
this) {
1048 return IngestTasksScheduler.countTasksForJob(
tasksInProgress, pipelineId);
1057 static final class IngestJobTasksSnapshot
implements Serializable {
1059 private static final long serialVersionUID = 1L;
1060 private final long jobId;
1061 private final long dsQueueSize;
1062 private final long rootQueueSize;
1063 private final long dirQueueSize;
1064 private final long fileQueueSize;
1065 private final long runningListSize;
1066 private final long streamingQueueSize;
1067 private final long artifactsQueueSize;
1083 IngestJobTasksSnapshot(
long jobId,
long dsQueueSize,
long rootQueueSize,
long dirQueueSize,
long fileQueueSize,
1084 long runningListSize,
long streamingQueueSize,
long artifactsQueueSize) {
1086 this.dsQueueSize = dsQueueSize;
1087 this.rootQueueSize = rootQueueSize;
1088 this.dirQueueSize = dirQueueSize;
1089 this.fileQueueSize = fileQueueSize;
1090 this.runningListSize = runningListSize;
1091 this.streamingQueueSize = streamingQueueSize;
1092 this.artifactsQueueSize = artifactsQueueSize;
1111 long getRootQueueSize() {
1112 return rootQueueSize;
1121 long getDirectoryTasksQueueSize() {
1122 return dirQueueSize;
1125 long getFileQueueSize() {
1126 return fileQueueSize;
1129 long getStreamingQueueSize() {
1130 return streamingQueueSize;
1133 long getDsQueueSize() {
1137 long getRunningListSize() {
1138 return runningListSize;
1141 long getArtifactsQueueSize() {
1142 return artifactsQueueSize;
final List< IngestTask > queuedTasks
final BlockingDeque< IngestTask > taskQueue
final List< IngestTask > tasksInProgress
int compare(FileIngestTask q1, FileIngestTask q2)