19 package org.sleuthkit.autopsy.ingest;
21 import java.io.Serializable;
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.Comparator;
25 import java.util.Deque;
26 import java.util.Iterator;
27 import java.util.LinkedList;
28 import java.util.List;
29 import java.util.Queue;
30 import java.util.TreeSet;
31 import java.util.concurrent.BlockingDeque;
32 import java.util.concurrent.LinkedBlockingDeque;
33 import java.util.logging.Level;
34 import java.util.regex.Matcher;
35 import java.util.regex.Pattern;
36 import javax.annotation.concurrent.GuardedBy;
37 import javax.annotation.concurrent.ThreadSafe;
55 final class IngestTasksScheduler {
57 private static final int FAT_NTFS_FLAGS = TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT12.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT16.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT32.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_NTFS.getValue();
58 private static final Logger logger = Logger.getLogger(IngestTasksScheduler.class.getName());
59 @GuardedBy(
"IngestTasksScheduler.this")
60 private static IngestTasksScheduler instance;
61 private final IngestTaskTrackingQueue dataSourceIngestTasksQueue;
63 private final TreeSet<FileIngestTask> topLevelFileIngestTasksQueue;
65 private final Deque<FileIngestTask> batchedFileIngestTasksQueue;
67 private final Queue<FileIngestTask> streamedFileIngestTasksQueue;
68 private final IngestTaskTrackingQueue fileIngestTasksQueue;
69 private final IngestTaskTrackingQueue artifactIngestTasksQueue;
70 private final IngestTaskTrackingQueue resultIngestTasksQueue;
77 synchronized static IngestTasksScheduler getInstance() {
78 if (IngestTasksScheduler.instance == null) {
79 IngestTasksScheduler.instance =
new IngestTasksScheduler();
81 return IngestTasksScheduler.instance;
/**
 * Constructs the scheduler with all of its task queues empty. Private
 * because instances are only handed out through getInstance().
 */
private IngestTasksScheduler() {
    // Queues that are consumed directly by the ingest threads.
    this.dataSourceIngestTasksQueue = new IngestTaskTrackingQueue();
    this.fileIngestTasksQueue = new IngestTaskTrackingQueue();
    this.artifactIngestTasksQueue = new IngestTaskTrackingQueue();
    this.resultIngestTasksQueue = new IngestTaskTrackingQueue();

    // Staging queues for file tasks that feed the file ingest tasks queue.
    this.topLevelFileIngestTasksQueue = new TreeSet<>(new RootDirectoryTaskComparator());
    this.batchedFileIngestTasksQueue = new LinkedList<>();
    this.streamedFileIngestTasksQueue = new LinkedList<>();
}
105 BlockingIngestTaskQueue getDataSourceIngestTaskQueue() {
106 return dataSourceIngestTasksQueue;
115 BlockingIngestTaskQueue getFileIngestTaskQueue() {
116 return fileIngestTasksQueue;
125 BlockingIngestTaskQueue getDataArtifactIngestTaskQueue() {
126 return artifactIngestTasksQueue;
135 BlockingIngestTaskQueue getAnalysisResultIngestTaskQueue() {
136 return resultIngestTasksQueue;
149 synchronized void scheduleDataSourceIngestTask(IngestJobExecutor executor) {
150 if (!executor.isCancelled()) {
151 DataSourceIngestTask task =
new DataSourceIngestTask(executor);
153 dataSourceIngestTasksQueue.putLast(task);
154 }
catch (InterruptedException ex) {
155 IngestTasksScheduler.logger.log(Level.INFO, String.format(
"Ingest tasks scheduler interrupted while blocked adding a task to the data source level ingest task queue (ingest job ID={%d)", executor.getIngestJobId()), ex);
156 Thread.currentThread().interrupt();
175 synchronized void scheduleFileIngestTasks(IngestJobExecutor executor, Collection<AbstractFile> files) {
176 if (!executor.isCancelled()) {
177 Collection<AbstractFile> candidateFiles;
178 if (files.isEmpty()) {
179 candidateFiles = getTopLevelFiles(executor.getDataSource());
181 candidateFiles = files;
183 for (AbstractFile file : candidateFiles) {
184 FileIngestTask task =
new FileIngestTask(executor, file);
185 if (IngestTasksScheduler.shouldEnqueueFileTask(task)) {
186 topLevelFileIngestTasksQueue.add(task);
189 refillFileIngestTasksQueue();
204 synchronized void scheduleStreamedFileIngestTasks(IngestJobExecutor executor, List<Long> fileIds) {
205 if (!executor.isCancelled()) {
206 for (
long id : fileIds) {
214 FileIngestTask task =
new FileIngestTask(executor,
id);
215 streamedFileIngestTasksQueue.add(task);
217 refillFileIngestTasksQueue();
235 synchronized void scheduleHighPriorityFileIngestTasks(IngestJobExecutor executor, Collection<AbstractFile> files) {
236 if (!executor.isCancelled()) {
245 for (AbstractFile file : files) {
246 FileIngestTask fileTask =
new FileIngestTask(executor, file);
247 if (shouldEnqueueFileTask(fileTask)) {
249 fileIngestTasksQueue.putFirst(fileTask);
250 }
catch (InterruptedException ex) {
251 DataSource dataSource = executor.getDataSource();
252 logger.log(Level.WARNING, String.format(
"Interrupted while enqueuing file tasks for %s (data source object ID = %d)", dataSource.getName(), dataSource.getId()), ex);
253 Thread.currentThread().interrupt();
272 synchronized void scheduleDataArtifactIngestTasks(IngestJobExecutor executor) {
273 if (!executor.isCancelled()) {
274 Blackboard blackboard = Case.getCurrentCase().getSleuthkitCase().getBlackboard();
276 List<DataArtifact> artifacts = blackboard.getDataArtifacts(executor.getDataSource().getId(), null);
277 scheduleDataArtifactIngestTasks(executor, artifacts);
278 }
catch (TskCoreException ex) {
279 DataSource dataSource = executor.getDataSource();
280 logger.log(Level.SEVERE, String.format(
"Failed to retrieve data artifacts for %s (data source object ID = %d)", dataSource.getName(), dataSource.getId()), ex);
296 synchronized void scheduleAnalysisResultIngestTasks(IngestJobExecutor executor) {
297 if (!executor.isCancelled()) {
298 Blackboard blackboard = Case.getCurrentCase().getSleuthkitCase().getBlackboard();
300 List<AnalysisResult> results = blackboard.getAnalysisResults(executor.getDataSource().getId(), null);
301 scheduleAnalysisResultIngestTasks(executor, results);
302 }
catch (TskCoreException ex) {
303 DataSource dataSource = executor.getDataSource();
304 logger.log(Level.SEVERE, String.format(
"Failed to retrieve analysis results for %s (data source object ID = %d)", dataSource.getName(), dataSource.getId()), ex);
323 synchronized void scheduleDataArtifactIngestTasks(IngestJobExecutor executor, List<DataArtifact> artifacts) {
324 if (!executor.isCancelled()) {
325 for (DataArtifact artifact : artifacts) {
326 DataArtifactIngestTask task =
new DataArtifactIngestTask(executor, artifact);
328 artifactIngestTasksQueue.putLast(task);
329 }
catch (InterruptedException ex) {
330 DataSource dataSource = executor.getDataSource();
331 logger.log(Level.WARNING, String.format(
"Interrupted while enqueuing data artifact tasks for %s (data source object ID = %d)", dataSource.getName(), dataSource.getId()), ex);
332 Thread.currentThread().interrupt();
353 synchronized void scheduleAnalysisResultIngestTasks(IngestJobExecutor executor, List<AnalysisResult> results) {
354 if (!executor.isCancelled()) {
355 for (AnalysisResult result : results) {
356 AnalysisResultIngestTask task =
new AnalysisResultIngestTask(executor, result);
358 resultIngestTasksQueue.putLast(task);
359 }
catch (InterruptedException ex) {
360 DataSource dataSource = executor.getDataSource();
361 logger.log(Level.WARNING, String.format(
"Interrupted while enqueuing analysis results tasks for %s (data source object ID = %d)", dataSource.getName(), dataSource.getId()), ex);
362 Thread.currentThread().interrupt();
375 synchronized void notifyTaskCompleted(DataSourceIngestTask task) {
376 dataSourceIngestTasksQueue.taskCompleted(task);
385 synchronized void notifyTaskCompleted(FileIngestTask task) {
386 fileIngestTasksQueue.taskCompleted(task);
387 refillFileIngestTasksQueue();
396 synchronized void notifyTaskCompleted(DataArtifactIngestTask task) {
397 artifactIngestTasksQueue.taskCompleted(task);
406 synchronized void notifyTaskCompleted(AnalysisResultIngestTask task) {
407 resultIngestTasksQueue.taskCompleted(task);
418 synchronized boolean currentTasksAreCompleted(Long ingestJobId) {
419 return !(dataSourceIngestTasksQueue.hasTasksForJob(ingestJobId)
420 || hasTasksForJob(topLevelFileIngestTasksQueue, ingestJobId)
421 || hasTasksForJob(batchedFileIngestTasksQueue, ingestJobId)
422 || hasTasksForJob(streamedFileIngestTasksQueue, ingestJobId)
423 || fileIngestTasksQueue.hasTasksForJob(ingestJobId)
424 || artifactIngestTasksQueue.hasTasksForJob(ingestJobId)
425 || resultIngestTasksQueue.hasTasksForJob(ingestJobId));
447 synchronized void cancelPendingFileTasksForIngestJob(
long ingestJobId) {
448 removeTasksForJob(topLevelFileIngestTasksQueue, ingestJobId);
449 removeTasksForJob(batchedFileIngestTasksQueue, ingestJobId);
450 removeTasksForJob(streamedFileIngestTasksQueue, ingestJobId);
461 private static List<AbstractFile> getTopLevelFiles(Content dataSource) {
462 List<AbstractFile> topLevelFiles =
new ArrayList<>();
463 Collection<AbstractFile> rootObjects = dataSource.accept(
new GetRootDirectoryVisitor());
464 if (rootObjects.isEmpty() && dataSource instanceof AbstractFile) {
468 topLevelFiles.add((AbstractFile) dataSource);
470 for (AbstractFile root : rootObjects) {
471 List<Content> children;
473 children = root.getChildren();
474 if (children.isEmpty()) {
480 topLevelFiles.add(root);
486 for (Content child : children) {
487 if (child instanceof AbstractFile) {
488 topLevelFiles.add((AbstractFile) child);
492 }
catch (TskCoreException ex) {
493 logger.log(Level.SEVERE,
"Could not get children of root to enqueue: " + root.getId() +
": " + root.getName(), ex);
497 return topLevelFiles;
507 synchronized private void refillFileIngestTasksQueue() {
509 takeFromStreamingFileTasksQueue();
510 takeFromBatchTasksQueues();
511 }
catch (InterruptedException ex) {
512 IngestTasksScheduler.logger.log(Level.INFO,
"Ingest tasks scheduler interrupted while blocked adding a task to the file level ingest task queue", ex);
513 Thread.currentThread().interrupt();
523 synchronized private void takeFromStreamingFileTasksQueue() throws InterruptedException {
524 while (fileIngestTasksQueue.isEmpty()) {
526 while (taskCount < IngestManager.getInstance().getNumberOfFileIngestThreads()) {
527 final FileIngestTask streamingTask = streamedFileIngestTasksQueue.poll();
528 if (streamingTask == null) {
531 if (shouldEnqueueFileTask(streamingTask)) {
532 fileIngestTasksQueue.putLast(streamingTask);
564 synchronized private void takeFromBatchTasksQueues() throws InterruptedException {
566 while (fileIngestTasksQueue.isEmpty()) {
571 if (batchedFileIngestTasksQueue.isEmpty()) {
572 final FileIngestTask topLevelTask = topLevelFileIngestTasksQueue.pollFirst();
573 if (topLevelTask != null) {
574 batchedFileIngestTasksQueue.addLast(topLevelTask);
582 final FileIngestTask nextTask = batchedFileIngestTasksQueue.pollFirst();
583 if (nextTask == null) {
586 if (shouldEnqueueFileTask(nextTask)) {
587 fileIngestTasksQueue.putLast(nextTask);
594 AbstractFile file = null;
596 file = nextTask.getFile();
597 List<Content> children = file.getChildren();
598 for (Content child : children) {
599 if (child instanceof AbstractFile) {
600 AbstractFile childFile = (AbstractFile) child;
601 FileIngestTask childTask =
new FileIngestTask(nextTask.getIngestJobExecutor(), childFile);
602 if (childFile.hasChildren()) {
603 batchedFileIngestTasksQueue.add(childTask);
604 }
else if (shouldEnqueueFileTask(childTask)) {
605 fileIngestTasksQueue.putLast(childTask);
609 }
catch (TskCoreException ex) {
611 logger.log(Level.SEVERE, String.format(
"Error getting the children of %s (object ID = %d)", file.getName(), file.getId()), ex);
613 logger.log(Level.SEVERE,
"Error loading file with object ID = {0}", nextTask.getFileId());
629 private static boolean shouldEnqueueFileTask(
final FileIngestTask task) {
632 file = task.getFile();
633 }
catch (TskCoreException ex) {
634 logger.log(Level.SEVERE,
"Error loading file with ID {0}", task.getFileId());
642 String fileName = file.getName();
644 if (fileName.equals(
".") || fileName.equals(
"..")) {
660 if (!file.isDir() && !shouldBeCarved(task) && !fileAcceptedByFilter(task)) {
674 TskData.TSK_FS_TYPE_ENUM fsType = TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_UNSUPP;
676 FileSystem fs = f.getFileSystem();
678 fsType = fs.getFsType();
680 }
catch (TskCoreException ex) {
681 logger.log(Level.SEVERE,
"Error querying file system for " + f, ex);
687 if ((fsType.getValue() & FAT_NTFS_FLAGS) == 0) {
694 boolean isInRootDir =
false;
696 AbstractFile parent = f.getParentDirectory();
697 if (parent == null) {
700 isInRootDir = parent.isRoot();
702 }
catch (TskCoreException ex) {
703 logger.log(Level.WARNING,
"Error querying parent directory for" + f.getName(), ex);
711 if (isInRootDir && f.getMetaAddr() < 32) {
712 String name = f.getName();
713 if (name.length() > 0 && name.charAt(0) ==
'$' && name.contains(
":")) {
729 private static boolean shouldBeCarved(
final FileIngestTask task) {
731 AbstractFile file = task.getFile();
732 return task.getIngestJobExecutor().shouldProcessUnallocatedSpace() && file.getType().equals(TskData.TSK_DB_FILES_TYPE_ENUM.UNALLOC_BLOCKS);
733 }
catch (TskCoreException ex) {
746 private static boolean fileAcceptedByFilter(
final FileIngestTask task) {
748 AbstractFile file = task.getFile();
749 return !(task.getIngestJobExecutor().getFileIngestFilter().fileIsMemberOf(file) == null);
750 }
catch (TskCoreException ex) {
764 synchronized private static boolean hasTasksForJob(Collection<? extends IngestTask> tasks,
long ingestJobId) {
765 for (IngestTask task : tasks) {
766 if (task.getIngestJobExecutor().getIngestJobId() == ingestJobId) {
780 private static void removeTasksForJob(Collection<? extends IngestTask> tasks,
long ingestJobId) {
781 Iterator<? extends IngestTask> iterator = tasks.iterator();
782 while (iterator.hasNext()) {
783 IngestTask task = iterator.next();
784 if (task.getIngestJobExecutor().getIngestJobId() == ingestJobId) {
799 private static int countTasksForJob(Collection<? extends IngestTask> tasks,
long ingestJobId) {
801 for (IngestTask task : tasks) {
802 if (task.getIngestJobExecutor().getIngestJobId() == ingestJobId) {
817 synchronized IngestTasksSnapshot getTasksSnapshotForJob(
long ingestJobId) {
818 return new IngestTasksSnapshot(
820 dataSourceIngestTasksQueue.countQueuedTasksForJob(ingestJobId),
821 countTasksForJob(topLevelFileIngestTasksQueue, ingestJobId),
822 countTasksForJob(batchedFileIngestTasksQueue, ingestJobId),
823 fileIngestTasksQueue.countQueuedTasksForJob(ingestJobId),
824 countTasksForJob(streamedFileIngestTasksQueue, ingestJobId),
825 artifactIngestTasksQueue.countQueuedTasksForJob(ingestJobId),
826 resultIngestTasksQueue.countQueuedTasksForJob(ingestJobId),
827 dataSourceIngestTasksQueue.countRunningTasksForJob(ingestJobId) + fileIngestTasksQueue.countRunningTasksForJob(ingestJobId) + artifactIngestTasksQueue.countRunningTasksForJob(ingestJobId) + resultIngestTasksQueue.countRunningTasksForJob(ingestJobId)
838 public int compare(FileIngestTask q1, FileIngestTask q2) {
844 AbstractFile file1 = null;
845 AbstractFile file2 = null;
847 file1 = q1.getFile();
848 }
catch (TskCoreException ex) {
855 file2 = q2.getFile();
856 }
catch (TskCoreException ex) {
864 return (
int) (q2.getFileId() - q1.getFileId());
868 }
else if (file2 == null) {
875 return (
int) (file2.getId() - file1.getId());
877 return p2.ordinal() - p1.ordinal();
892 LAST, LOW, MEDIUM, HIGH
895 static final List<Pattern> LAST_PRI_PATHS =
new ArrayList<>();
897 static final List<Pattern> LOW_PRI_PATHS =
new ArrayList<>();
899 static final List<Pattern> MEDIUM_PRI_PATHS =
new ArrayList<>();
901 static final List<Pattern> HIGH_PRI_PATHS =
new ArrayList<>();
913 LAST_PRI_PATHS.add(Pattern.compile(
"^pagefile", Pattern.CASE_INSENSITIVE));
914 LAST_PRI_PATHS.add(Pattern.compile(
"^hiberfil", Pattern.CASE_INSENSITIVE));
917 LOW_PRI_PATHS.add(Pattern.compile(
"^\\$OrphanFiles", Pattern.CASE_INSENSITIVE));
918 LOW_PRI_PATHS.add(Pattern.compile(
"^Windows", Pattern.CASE_INSENSITIVE));
920 MEDIUM_PRI_PATHS.add(Pattern.compile(
"^Program Files", Pattern.CASE_INSENSITIVE));
922 HIGH_PRI_PATHS.add(Pattern.compile(
"^Users", Pattern.CASE_INSENSITIVE));
923 HIGH_PRI_PATHS.add(Pattern.compile(
"^Documents and Settings", Pattern.CASE_INSENSITIVE));
924 HIGH_PRI_PATHS.add(Pattern.compile(
"^home", Pattern.CASE_INSENSITIVE));
925 HIGH_PRI_PATHS.add(Pattern.compile(
"^ProgramData", Pattern.CASE_INSENSITIVE));
936 if (!abstractFile.getType().equals(TskData.TSK_DB_FILES_TYPE_ENUM.FS)) {
942 final String path = abstractFile.getName();
946 for (Pattern p : HIGH_PRI_PATHS) {
947 Matcher m = p.matcher(path);
952 for (Pattern p : MEDIUM_PRI_PATHS) {
953 Matcher m = p.matcher(path);
958 for (Pattern p : LOW_PRI_PATHS) {
959 Matcher m = p.matcher(path);
964 for (Pattern p : LAST_PRI_PATHS) {
965 Matcher m = p.matcher(path);
983 private final BlockingDeque<IngestTask>
taskQueue =
new LinkedBlockingDeque<>();
999 void putFirst(IngestTask task) throws InterruptedException {
1000 synchronized (
this) {
1004 this.taskQueue.putFirst(task);
1005 }
catch (InterruptedException ex) {
1006 synchronized (
this) {
1023 void putLast(IngestTask task)
throws InterruptedException {
1024 synchronized (
this) {
1028 this.taskQueue.putLast(task);
1029 }
catch (InterruptedException ex) {
1030 synchronized (
this) {
1049 IngestTask task = taskQueue.takeFirst();
1050 synchronized (
this) {
1063 synchronized (
this) {
1074 void taskCompleted(IngestTask task) {
1075 synchronized (
this) {
1088 boolean hasTasksForJob(
long ingestJobId) {
1089 synchronized (
this) {
1090 return IngestTasksScheduler.hasTasksForJob(
queuedTasks, ingestJobId) || IngestTasksScheduler.hasTasksForJob(
tasksInProgress, ingestJobId);
1101 int countQueuedTasksForJob(
long ingestJobId) {
1102 synchronized (
this) {
1103 return IngestTasksScheduler.countTasksForJob(
queuedTasks, ingestJobId);
1114 int countRunningTasksForJob(
long ingestJobId) {
1115 synchronized (
this) {
1116 return IngestTasksScheduler.countTasksForJob(
tasksInProgress, ingestJobId);
1126 static final class IngestTasksSnapshot
implements Serializable {
1128 private static final long serialVersionUID = 1L;
1129 private final long ingestJobId;
1130 private final long dataSourceQueueSize;
1131 private final long rootQueueSize;
1132 private final long dirQueueSize;
1133 private final long fileQueueSize;
1134 private final long inProgressListSize;
1135 private final long streamedFileQueueSize;
1136 private final long artifactsQueueSize;
1137 private final long resultsQueueSize;
1160 IngestTasksSnapshot(
long ingestJobId,
long dataSourceQueueSize,
long rootQueueSize,
long dirQueueSize,
long fileQueueSize,
long streamedFileQueueSize,
long artifactsQueueSize,
long resultsQueueSize,
long inProgressListSize) {
1161 this.ingestJobId = ingestJobId;
1162 this.dataSourceQueueSize = dataSourceQueueSize;
1163 this.rootQueueSize = rootQueueSize;
1164 this.dirQueueSize = dirQueueSize;
1165 this.fileQueueSize = fileQueueSize;
1166 this.streamedFileQueueSize = streamedFileQueueSize;
1167 this.artifactsQueueSize = artifactsQueueSize;
1168 this.resultsQueueSize = resultsQueueSize;
1169 this.inProgressListSize = inProgressListSize;
1178 long getIngestJobId() {
1188 long getRootQueueSize() {
1189 return rootQueueSize;
1198 long getDirQueueSize() {
1199 return dirQueueSize;
1208 long getFileQueueSize() {
1209 return fileQueueSize;
1218 long getStreamedFilesQueueSize() {
1219 return streamedFileQueueSize;
1228 long getDataSourceQueueSize() {
1229 return dataSourceQueueSize;
1238 long getArtifactsQueueSize() {
1239 return artifactsQueueSize;
1248 long getResultsQueueSize() {
1249 return resultsQueueSize;
1258 long getProgressListSize() {
1259 return inProgressListSize;
final List< IngestTask > queuedTasks
final BlockingDeque< IngestTask > taskQueue
final List< IngestTask > tasksInProgress
int compare(FileIngestTask q1, FileIngestTask q2)