19 package org.sleuthkit.autopsy.ingest;
21 import java.io.Serializable;
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.Collections;
25 import java.util.Comparator;
26 import java.util.Deque;
27 import java.util.Iterator;
28 import java.util.LinkedList;
29 import java.util.List;
30 import java.util.Queue;
31 import java.util.TreeSet;
32 import java.util.concurrent.BlockingDeque;
33 import java.util.concurrent.LinkedBlockingDeque;
34 import java.util.logging.Level;
35 import java.util.regex.Matcher;
36 import java.util.regex.Pattern;
37 import javax.annotation.concurrent.GuardedBy;
38 import javax.annotation.concurrent.ThreadSafe;
51 final class IngestTasksScheduler {
53 private static final int FAT_NTFS_FLAGS = TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT12.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT16.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT32.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_NTFS.getValue();
54 private static final Logger logger = Logger.getLogger(IngestTasksScheduler.class.getName());
55 @GuardedBy(
"IngestTasksScheduler.this")
56 private static IngestTasksScheduler instance;
57 private final IngestTaskTrackingQueue dataSourceIngestThreadQueue;
59 private final TreeSet<FileIngestTask> rootFileTaskQueue;
61 private final Deque<FileIngestTask> pendingFileTaskQueue;
63 private final Queue<FileIngestTask> streamedTasksQueue;
64 private final IngestTaskTrackingQueue fileIngestThreadsQueue;
71 synchronized static IngestTasksScheduler getInstance() {
72 if (IngestTasksScheduler.instance == null) {
73 IngestTasksScheduler.instance =
new IngestTasksScheduler();
75 return IngestTasksScheduler.instance;
/**
 * Creates the scheduler with all of its task queues empty. The constructor is
 * private; instances are obtained through getInstance().
 */
private IngestTasksScheduler() {
    dataSourceIngestThreadQueue = new IngestTaskTrackingQueue();
    rootFileTaskQueue = new TreeSet<>(new RootDirectoryTaskComparator());
    pendingFileTaskQueue = new LinkedList<>();
    fileIngestThreadsQueue = new IngestTaskTrackingQueue();
    streamedTasksQueue = new LinkedList<>();
}
97 BlockingIngestTaskQueue getDataSourceIngestTaskQueue() {
98 return this.dataSourceIngestThreadQueue;
107 BlockingIngestTaskQueue getFileIngestTaskQueue() {
108 return this.fileIngestThreadsQueue;
/**
 * Schedules the initial ingest tasks for an ingest job: one data source level
 * task, then file level tasks for the top-level files of the job's data
 * source. Does nothing if the job has been cancelled.
 *
 * @param ingestJobPipeline The ingest job pipeline to schedule tasks for.
 */
117 synchronized void scheduleIngestTasks(IngestJobPipeline ingestJobPipeline) {
118 if (!ingestJobPipeline.isCancelled()) {
// The data source level task is scheduled before any file level tasks.
127 this.scheduleDataSourceIngestTask(ingestJobPipeline);
// An empty collection makes scheduleFileIngestTasks() fall back to the
// data source's top-level files.
128 this.scheduleFileIngestTasks(ingestJobPipeline, Collections.emptyList());
137 synchronized void scheduleDataSourceIngestTask(IngestJobPipeline ingestJobPipeline) {
138 if (!ingestJobPipeline.isCancelled()) {
139 DataSourceIngestTask task =
new DataSourceIngestTask(ingestJobPipeline);
141 this.dataSourceIngestThreadQueue.putLast(task);
142 }
catch (InterruptedException ex) {
143 IngestTasksScheduler.logger.log(Level.INFO, String.format(
"Ingest tasks scheduler interrupted while blocked adding a task to the data source level ingest task queue (jobId={%d)", ingestJobPipeline.getId()), ex);
144 Thread.currentThread().interrupt();
/**
 * Schedules file level ingest tasks for an ingest job. Tasks accepted by
 * shouldEnqueueFileTask() are added to the root file task queue. The file
 * ingest threads' queue is then refilled. Does nothing if the job has been
 * cancelled.
 *
 * @param ingestJobPipeline The ingest job pipeline for the tasks.
 * @param files             The files to schedule. If empty, the top-level
 *                          files of the job's data source are used instead.
 */
157 synchronized void scheduleFileIngestTasks(IngestJobPipeline ingestJobPipeline, Collection<AbstractFile> files) {
158 if (!ingestJobPipeline.isCancelled()) {
159 Collection<AbstractFile> candidateFiles;
160 if (files.isEmpty()) {
161 candidateFiles = getTopLevelFiles(ingestJobPipeline.getDataSource());
// NOTE(review): presumably the "else" branch of the isEmpty() check -- confirm.
163 candidateFiles = files;
165 for (AbstractFile file : candidateFiles) {
166 FileIngestTask task =
new FileIngestTask(ingestJobPipeline, file);
167 if (IngestTasksScheduler.shouldEnqueueFileTask(task)) {
// rootFileTaskQueue is a TreeSet: add() ignores a task that the
// RootDirectoryTaskComparator considers equal to one already present.
168 this.rootFileTaskQueue.add(task);
// Move tasks toward the file ingest threads' queue.
171 refillIngestThreadQueue();
/**
 * Schedules file level ingest tasks for streamed files, identified by object
 * ID. The tasks are added to the streamed task queue, and the file ingest
 * threads' queue is then refilled. Does nothing if the job has been cancelled.
 *
 * The shouldEnqueueFileTask() filter is applied later, when tasks are taken
 * from the streamed task queue in takeFromStreamingTaskQueue().
 *
 * @param ingestJobPipeline The ingest job pipeline for the tasks.
 * @param fileIds           The object IDs of the files to schedule.
 */
182 synchronized void scheduleStreamedFileIngestTasks(IngestJobPipeline ingestJobPipeline, List<Long> fileIds) {
183 if (!ingestJobPipeline.isCancelled()) {
184 for (
long id : fileIds) {
// The task is built from the ID alone; no AbstractFile is loaded here.
187 FileIngestTask task =
new FileIngestTask(ingestJobPipeline,
id);
188 this.streamedTasksQueue.add(task);
190 refillIngestThreadQueue();
/**
 * Schedules file level ingest tasks directly onto the front of the file
 * ingest threads' queue, bypassing the root, pending and streamed queues.
 * Only files accepted by shouldEnqueueFileTask() are queued. Does nothing if
 * the job has been cancelled.
 *
 * @param ingestJobPipeline The ingest job pipeline for the tasks.
 * @param files             The files to schedule.
 */
202 synchronized void fastTrackFileIngestTasks(IngestJobPipeline ingestJobPipeline, Collection<AbstractFile> files) {
203 if (!ingestJobPipeline.isCancelled()) {
212 for (AbstractFile file : files) {
213 FileIngestTask fileTask =
new FileIngestTask(ingestJobPipeline, file);
214 if (shouldEnqueueFileTask(fileTask)) {
// putFirst() places the task ahead of tasks already waiting in the queue.
216 this.fileIngestThreadsQueue.putFirst(fileTask);
217 }
catch (InterruptedException ex) {
// NOTE(review): the format string "(jobId={%d)" has an unbalanced '{';
// "(jobId=%d)" was probably intended.
218 IngestTasksScheduler.logger.log(Level.INFO, String.format(
"Ingest tasks scheduler interrupted while scheduling file level ingest tasks (jobId={%d)", ingestJobPipeline.getId()), ex);
// Restore the thread's interrupt status.
219 Thread.currentThread().interrupt();
233 synchronized void notifyTaskCompleted(DataSourceIngestTask task) {
234 this.dataSourceIngestThreadQueue.taskCompleted(task);
243 synchronized void notifyTaskCompleted(FileIngestTask task) {
244 this.fileIngestThreadsQueue.taskCompleted(task);
245 refillIngestThreadQueue();
256 synchronized boolean currentTasksAreCompleted(IngestJobPipeline ingestJobPipeline) {
257 long jobId = ingestJobPipeline.getId();
259 return !(this.dataSourceIngestThreadQueue.hasTasksForJob(jobId)
260 || hasTasksForJob(this.rootFileTaskQueue, jobId)
261 || hasTasksForJob(this.pendingFileTaskQueue, jobId)
262 || hasTasksForJob(this.streamedTasksQueue, jobId)
263 || this.fileIngestThreadsQueue.hasTasksForJob(jobId));
273 synchronized void cancelPendingTasksForIngestJob(IngestJobPipeline ingestJobPipeline) {
274 long jobId = ingestJobPipeline.getId();
275 IngestTasksScheduler.removeTasksForJob(rootFileTaskQueue, jobId);
276 IngestTasksScheduler.removeTasksForJob(pendingFileTaskQueue, jobId);
277 IngestTasksScheduler.removeTasksForJob(streamedTasksQueue, jobId);
/**
 * Gets the top-level files of a data source.
 *
 * The root directories are found with GetRootDirectoryVisitor. For each root:
 * - if it has no children, the root itself is included;
 * - otherwise, each child that is an AbstractFile is included.
 * If no roots are found and the data source is itself an AbstractFile, the
 * data source is included.
 *
 * @param dataSource The data source.
 * @return The top-level files; may be empty.
 */
289 private static List<AbstractFile> getTopLevelFiles(Content dataSource) {
290 List<AbstractFile> topLevelFiles =
new ArrayList<>();
291 Collection<AbstractFile> rootObjects = dataSource.accept(
new GetRootDirectoryVisitor());
292 if (rootObjects.isEmpty() && dataSource instanceof AbstractFile) {
// A data source that is itself a file, with no root directories.
294 topLevelFiles.add((AbstractFile) dataSource);
296 for (AbstractFile root : rootObjects) {
297 List<Content> children;
299 children = root.getChildren();
300 if (children.isEmpty()) {
// A childless root is itself treated as a top-level file.
303 topLevelFiles.add(root);
307 for (Content child : children) {
// Children that are not AbstractFiles are not included.
308 if (child instanceof AbstractFile) {
309 topLevelFiles.add((AbstractFile) child);
313 }
catch (TskCoreException ex) {
// A failure to read a root's children is logged as a warning.
314 logger.log(Level.WARNING,
"Could not get children of root to enqueue: " + root.getId() +
": " + root.getName(), ex);
318 return topLevelFiles;
325 synchronized private void refillIngestThreadQueue() {
327 takeFromStreamingTaskQueue();
328 takeFromBatchTasksQueues();
329 }
catch (InterruptedException ex) {
330 IngestTasksScheduler.logger.log(Level.INFO,
"Ingest tasks scheduler interrupted while blocked adding a task to the file level ingest task queue", ex);
331 Thread.currentThread().interrupt();
/**
 * Moves streamed file tasks onto the back of the file ingest threads' queue.
 *
 * This works only while that queue is empty. Each pass moves tasks until a
 * count reaches the number of file ingest threads, or until the streamed
 * task queue runs out (assumes taskCount counts tasks moved in the pass --
 * confirm). Tasks rejected by shouldEnqueueFileTask() are not added.
 *
 * @throws InterruptedException If interrupted while blocked adding a task to
 *                              the file ingest threads' queue.
 */
339 synchronized private void takeFromStreamingTaskQueue() throws InterruptedException {
// Keep working while the file ingest threads have nothing queued.
343 while (fileIngestThreadsQueue.isEmpty()) {
// The thread count is re-read from IngestManager on every iteration.
349 while (taskCount < IngestManager.getInstance().getNumberOfFileIngestThreads()) {
350 final FileIngestTask streamingTask = streamedTasksQueue.poll();
// poll() returns null once the streamed task queue is empty.
351 if (streamingTask == null) {
// The filter deferred by scheduleStreamedFileIngestTasks() is applied here.
355 if (shouldEnqueueFileTask(streamingTask)) {
356 fileIngestThreadsQueue.putLast(streamingTask);
/**
 * Moves batch file tasks into the file ingest threads' queue while that
 * queue is empty.
 *
 * Each iteration does the following:
 * - When the pending queue is empty, it moves the first task of the root
 *   queue into the pending queue.
 * - It takes the next pending task and, if shouldEnqueueFileTask() accepts
 *   it, appends it to the file ingest threads' queue.
 * - It expands the pending task's children into new tasks. A child that
 *   itself has children goes to the back of the pending queue. Any other
 *   child goes to the file ingest threads' queue if accepted.
 *
 * @throws InterruptedException If interrupted while blocked adding a task to
 *                              the file ingest threads' queue.
 */
393 synchronized private void takeFromBatchTasksQueues() throws InterruptedException {
395 while (this.fileIngestThreadsQueue.isEmpty()) {
400 if (this.pendingFileTaskQueue.isEmpty()) {
// pollFirst() takes the task that RootDirectoryTaskComparator orders first.
401 final FileIngestTask rootTask = this.rootFileTaskQueue.pollFirst();
402 if (rootTask != null) {
403 this.pendingFileTaskQueue.addLast(rootTask);
412 final FileIngestTask pendingTask = this.pendingFileTaskQueue.pollFirst();
// A null task means both the root and pending queues are exhausted.
413 if (pendingTask == null) {
416 if (shouldEnqueueFileTask(pendingTask)) {
421 this.fileIngestThreadsQueue.putLast(pendingTask);
431 AbstractFile file = null;
433 file = pendingTask.getFile();
434 for (Content child : file.getChildren()) {
435 if (child instanceof AbstractFile) {
436 AbstractFile childFile = (AbstractFile) child;
437 FileIngestTask childTask =
new FileIngestTask(pendingTask.getIngestJobPipeline(), childFile);
438 if (childFile.hasChildren()) {
// Deque.add() appends, so deeper directories are expanded after the ones already pending.
439 this.pendingFileTaskQueue.add(childTask);
440 }
else if (shouldEnqueueFileTask(childTask)) {
441 this.fileIngestThreadsQueue.putLast(childTask);
445 }
catch (TskCoreException ex) {
// NOTE(review): 'file' is still null if getFile() threw. This call is
// presumably guarded by a null check, with the message below as the
// alternative -- confirm.
447 logger.log(Level.SEVERE, String.format(
"Error getting the children of %s (objId=%d)", file.getName(), file.getId()), ex);
450 logger.log(Level.SEVERE,
"Error loading file with object ID {0}", pendingTask.getFileId());
/**
 * Decides whether a file ingest task should be put on the file ingest
 * threads' queue.
 *
 * Checks made here (each presumably rejects the task when it applies):
 * - the task's file cannot be loaded;
 * - the file is a "." or ".." entry;
 * - the file is not a directory, is not unallocated space to be carved
 *   (shouldBeCarved), and is not accepted by the job's file filter
 *   (fileAcceptedByFilter);
 * - on a FAT12/16/32 or NTFS file system, the file is in a root directory,
 *   has a metadata address below 32, and has a name that starts with '$'
 *   and contains ':'.
 *
 * @param task The task to check.
 * @return True if the task should be queued for the file ingest threads.
 */
465 private static boolean shouldEnqueueFileTask(
final FileIngestTask task) {
468 file = task.getFile();
469 }
catch (TskCoreException ex) {
// A file that cannot be loaded is logged as severe.
470 logger.log(Level.SEVERE,
"Error loading file with ID {0}", task.getFileId());
476 String fileName = file.getName();
// The current- and parent-directory pseudo entries.
478 if (fileName.equals(
".") || fileName.equals(
"..")) {
// Directories never fail this check, whatever the file filter says.
494 if (!file.isDir() && !shouldBeCarved(task) && !fileAcceptedByFilter(task)) {
// Default value, used when the file system type cannot be determined.
504 TskData.TSK_FS_TYPE_ENUM fsType = TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_UNSUPP;
// NOTE(review): 'f' is not declared in the visible code; presumably it
// refers to 'file' (possibly narrowed to a file system file type) -- confirm.
506 FileSystem fs = f.getFileSystem();
508 fsType = fs.getFsType();
510 }
catch (TskCoreException ex) {
511 logger.log(Level.SEVERE,
"Error querying file system for " + f, ex);
// Presumably, files not on FAT or NTFS skip the root-directory check below.
515 if ((fsType.getValue() & FAT_NTFS_FLAGS) == 0) {
520 boolean isInRootDir =
false;
522 AbstractFile parent = f.getParentDirectory();
523 if (parent == null) {
526 isInRootDir = parent.isRoot();
528 }
catch (TskCoreException ex) {
// NOTE(review): the message lacks a space between "for" and the file name.
529 logger.log(Level.WARNING,
"Error querying parent directory for" + f.getName(), ex);
// Root-directory entries with low metadata addresses and names of the
// form "$Name:..." (presumably file system metadata files -- confirm).
535 if (isInRootDir && f.getMetaAddr() < 32) {
536 String name = f.getName();
537 if (name.length() > 0 && name.charAt(0) ==
'$' && name.contains(
":")) {
/**
 * Checks whether a task's file should be processed as unallocated space.
 * This requires the job to be configured to process unallocated space and
 * the file to be of type UNALLOC_BLOCKS.
 *
 * @param task The task.
 * @return True if both conditions hold. If the file cannot be loaded, the
 *         result comes from the TskCoreException handler (presumably false
 *         -- confirm).
 */
554 private static boolean shouldBeCarved(
final FileIngestTask task) {
556 AbstractFile file = task.getFile();
557 return task.getIngestJobPipeline().shouldProcessUnallocatedSpace() && file.getType().equals(TskData.TSK_DB_FILES_TYPE_ENUM.UNALLOC_BLOCKS);
558 }
catch (TskCoreException ex) {
/**
 * Checks whether a task's file is accepted by the ingest job's file filter,
 * that is, whether the filter's fileIsMemberOf() returns a non-null result.
 *
 * @param task The task.
 * @return True if the filter accepts the file. If the file cannot be loaded,
 *         the result comes from the TskCoreException handler (presumably
 *         false -- confirm).
 */
571 private static boolean fileAcceptedByFilter(
final FileIngestTask task) {
573 AbstractFile file = task.getFile();
// Equivalent to: fileIsMemberOf(file) != null.
574 return !(task.getIngestJobPipeline().getFileIngestFilter().fileIsMemberOf(file) == null);
575 }
catch (TskCoreException ex) {
/**
 * Checks whether any task in a collection belongs to the given ingest job,
 * comparing each task's pipeline ID with the job ID.
 *
 * NOTE(review): "synchronized" on a static method locks the
 * IngestTasksScheduler Class object, not the collection passed in. The
 * visible callers already hold their own lock (the scheduler's synchronized
 * instance methods, or synchronized (this) in IngestTaskTrackingQueue).
 *
 * @param tasks The tasks to search.
 * @param jobId The ingest job ID.
 * @return True if at least one task belongs to the job.
 */
589 synchronized private static boolean hasTasksForJob(Collection<? extends IngestTask> tasks,
long jobId) {
590 for (IngestTask task : tasks) {
591 if (task.getIngestJobPipeline().getId() == jobId) {
/**
 * Removes the tasks that belong to the given ingest job from a collection.
 * The collection is walked with an explicit Iterator, presumably so that
 * matching tasks can be removed with Iterator.remove() during the walk
 * -- confirm.
 *
 * @param tasks The collection to remove tasks from.
 * @param jobId The ingest job ID.
 */
605 private static void removeTasksForJob(Collection<? extends IngestTask> tasks,
long jobId) {
606 Iterator<? extends IngestTask> iterator = tasks.iterator();
607 while (iterator.hasNext()) {
608 IngestTask task = iterator.next();
609 if (task.getIngestJobPipeline().getId() == jobId) {
/**
 * Counts the tasks in a collection whose pipeline ID equals the given ingest
 * job ID.
 *
 * @param queue The tasks to count.
 * @param jobId The ingest job ID.
 * @return The number of matching tasks.
 */
623 private static int countTasksForJob(Collection<? extends IngestTask> queue,
long jobId) {
625 for (IngestTask task : queue) {
626 if (task.getIngestJobPipeline().getId() == jobId) {
641 synchronized IngestJobTasksSnapshot getTasksSnapshotForJob(
long jobId) {
642 return new IngestJobTasksSnapshot(jobId, this.dataSourceIngestThreadQueue.countQueuedTasksForJob(jobId),
643 countTasksForJob(this.rootFileTaskQueue, jobId),
644 countTasksForJob(this.pendingFileTaskQueue, jobId),
645 this.fileIngestThreadsQueue.countQueuedTasksForJob(jobId),
646 this.dataSourceIngestThreadQueue.countRunningTasksForJob(jobId) + this.fileIngestThreadsQueue.countRunningTasksForJob(jobId),
647 countTasksForJob(this.streamedTasksQueue, jobId));
/**
 * Orders root file tasks for the root file task queue.
 *
 * That queue is a TreeSet, which treats two tasks that compare as 0 as
 * duplicates. When a task's file cannot be loaded, the comparison
 * (presumably) falls back to file object IDs, with larger IDs first.
 * Otherwise priorities (presumably AbstractFilePriority.Priority values) are
 * compared with higher ordinals first, and file IDs (larger first) are
 * presumably used when priorities are equal -- confirm.
 */
657 public int compare(FileIngestTask q1, FileIngestTask q2) {
660 AbstractFile file1 = null;
661 AbstractFile file2 = null;
// If getFile() throws, the variable keeps its initial null value unless
// the handler assigns it.
663 file1 = q1.getFile();
664 }
catch (TskCoreException ex) {
669 file2 = q2.getFile();
670 }
catch (TskCoreException ex) {
// NOTE(review): casting a long difference to int can overflow and flip the
// sign when the IDs are far apart. That violates the Comparator contract
// the TreeSet relies on. Long.compare(q2.getFileId(), q1.getFileId()) is safe.
676 return (
int) (q2.getFileId() - q1.getFileId());
680 }
else if (file2 == null) {
// NOTE(review): same int-overflow hazard; Long.compare(file2.getId(),
// file1.getId()) is safe.
687 return (
int) (file2.getId() - file1.getId());
// A higher ordinal (HIGH is declared last) sorts first.
689 return p2.ordinal() - p1.ordinal();
// Priority levels in ascending order. ordinal() is used for comparison, so
// the declaration order matters.
704 LAST, LOW, MEDIUM, HIGH
// Name patterns for each priority level. Each pattern is anchored at the
// start of the name and matched case-insensitively.
707 static final List<Pattern> LAST_PRI_PATHS =
new ArrayList<>();
709 static final List<Pattern> LOW_PRI_PATHS =
new ArrayList<>();
711 static final List<Pattern> MEDIUM_PRI_PATHS =
new ArrayList<>();
713 static final List<Pattern> HIGH_PRI_PATHS =
new ArrayList<>();
729 LAST_PRI_PATHS.add(Pattern.compile(
"^pagefile", Pattern.CASE_INSENSITIVE));
730 LAST_PRI_PATHS.add(Pattern.compile(
"^hiberfil", Pattern.CASE_INSENSITIVE));
733 LOW_PRI_PATHS.add(Pattern.compile(
"^\\$OrphanFiles", Pattern.CASE_INSENSITIVE));
734 LOW_PRI_PATHS.add(Pattern.compile(
"^Windows", Pattern.CASE_INSENSITIVE));
736 MEDIUM_PRI_PATHS.add(Pattern.compile(
"^Program Files", Pattern.CASE_INSENSITIVE));
738 HIGH_PRI_PATHS.add(Pattern.compile(
"^Users", Pattern.CASE_INSENSITIVE));
739 HIGH_PRI_PATHS.add(Pattern.compile(
"^Documents and Settings", Pattern.CASE_INSENSITIVE));
740 HIGH_PRI_PATHS.add(Pattern.compile(
"^home", Pattern.CASE_INSENSITIVE));
741 HIGH_PRI_PATHS.add(Pattern.compile(
"^ProgramData", Pattern.CASE_INSENSITIVE));
// Files whose type is not FS are handled separately (presumably given a
// fixed priority -- confirm).
752 if (!abstractFile.getType().equals(TskData.TSK_DB_FILES_TYPE_ENUM.FS)) {
// NOTE(review): despite the variable name, the patterns are matched against
// the file name, not a full path.
758 final String path = abstractFile.getName();
// The pattern lists are checked from highest to lowest priority.
762 for (Pattern p : HIGH_PRI_PATHS) {
763 Matcher m = p.matcher(path);
768 for (Pattern p : MEDIUM_PRI_PATHS) {
769 Matcher m = p.matcher(path);
774 for (Pattern p : LOW_PRI_PATHS) {
775 Matcher m = p.matcher(path);
780 for (Pattern p : LAST_PRI_PATHS) {
781 Matcher m = p.matcher(path);
// The blocking deque that ingest threads take tasks from. The no-argument
// LinkedBlockingDeque constructor gives it a capacity of Integer.MAX_VALUE.
799 private final BlockingDeque<IngestTask>
taskQueue =
new LinkedBlockingDeque<>();
/**
 * Adds a task to the front of the blocking queue. The synchronized (this)
 * section presumably records the task in queuedTasks -- confirm. On
 * interruption the handler re-enters synchronized (this), presumably to undo
 * that bookkeeping before propagating the exception.
 */
815 void putFirst(IngestTask task) throws InterruptedException {
816 synchronized (
this) {
820 this.taskQueue.putFirst(task);
821 }
catch (InterruptedException ex) {
822 synchronized (
this) {
/**
 * Adds a task to the back of the blocking queue. The bookkeeping is the same
 * as in putFirst().
 */
839 void putLast(IngestTask task)
throws InterruptedException {
840 synchronized (
this) {
844 this.taskQueue.putLast(task);
845 }
catch (InterruptedException ex) {
846 synchronized (
this) {
// The blocking take happens before the synchronized (this) section, so the
// monitor is not held while waiting for a task.
865 IngestTask task = taskQueue.takeFirst();
866 synchronized (
this) {
// Presumably the body of isEmpty(), which the scheduler's refill loops
// call -- confirm.
879 synchronized (
this) {
/**
 * Notifies the queue that a task taken from it has finished. Presumably
 * removes the task from tasksInProgress under this object's monitor
 * -- confirm.
 *
 * @param task The completed task.
 */
890 void taskCompleted(IngestTask task) {
891 synchronized (
this) {
904 boolean hasTasksForJob(
long jobId) {
905 synchronized (
this) {
906 return IngestTasksScheduler.hasTasksForJob(this.
queuedTasks, jobId) || IngestTasksScheduler.hasTasksForJob(this.
tasksInProgress, jobId);
918 int countQueuedTasksForJob(
long jobId) {
919 synchronized (
this) {
920 return IngestTasksScheduler.countTasksForJob(this.
queuedTasks, jobId);
932 int countRunningTasksForJob(
long jobId) {
933 synchronized (
this) {
934 return IngestTasksScheduler.countTasksForJob(this.
tasksInProgress, jobId);
/**
 * An immutable, serializable snapshot of one ingest job's task counts. All
 * fields are final. Instances are built by getTasksSnapshotForJob().
 */
943 static final class IngestJobTasksSnapshot
implements Serializable {
945 private static final long serialVersionUID = 1L;
946 private final long jobId;
// Data source level tasks waiting in the data source queue.
947 private final long dsQueueSize;
// Tasks in the root file task queue.
948 private final long rootQueueSize;
// Tasks in the pending (directory) file task queue.
949 private final long dirQueueSize;
// File level tasks waiting in the file ingest threads' queue.
950 private final long fileQueueSize;
// In-progress tasks, data source and file level combined.
951 private final long runningListSize;
// Tasks in the streamed file task queue.
952 private final long streamingQueueSize;
/**
 * Creates a snapshot. The parameters are stored unchanged in the fields of
 * the same names.
 */
959 IngestJobTasksSnapshot(
long jobId,
long dsQueueSize,
long rootQueueSize,
long dirQueueSize,
long fileQueueSize,
960 long runningListSize,
long streamingQueueSize) {
962 this.dsQueueSize = dsQueueSize;
963 this.rootQueueSize = rootQueueSize;
964 this.dirQueueSize = dirQueueSize;
965 this.fileQueueSize = fileQueueSize;
966 this.runningListSize = runningListSize;
967 this.streamingQueueSize = streamingQueueSize;
986 long getRootQueueSize() {
987 return rootQueueSize;
// Presumably returns dirQueueSize -- confirm.
996 long getDirectoryTasksQueueSize() {
1000 long getFileQueueSize() {
1001 return fileQueueSize;
1004 long getStreamingQueueSize() {
1005 return streamingQueueSize;
// Presumably returns dsQueueSize -- confirm.
1008 long getDsQueueSize() {
1012 long getRunningListSize() {
1013 return runningListSize;
final List< IngestTask > queuedTasks
final BlockingDeque< IngestTask > taskQueue
final List< IngestTask > tasksInProgress
int compare(FileIngestTask q1, FileIngestTask q2)