19 package org.sleuthkit.autopsy.ingest;
21 import java.io.Serializable;
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.Collections;
25 import java.util.Comparator;
26 import java.util.Deque;
27 import java.util.Iterator;
28 import java.util.LinkedList;
29 import java.util.List;
30 import java.util.TreeSet;
31 import java.util.concurrent.BlockingDeque;
32 import java.util.concurrent.LinkedBlockingDeque;
33 import java.util.logging.Level;
34 import java.util.regex.Matcher;
35 import java.util.regex.Pattern;
36 import javax.annotation.concurrent.GuardedBy;
37 import javax.annotation.concurrent.ThreadSafe;
50 final class IngestTasksScheduler {
// Bitwise OR of the FAT12, FAT16, FAT32 and NTFS file system type values.
// Used as a mask against a file system's type value when deciding whether
// a file lives on a FAT or NTFS file system.
52 private static final int FAT_NTFS_FLAGS = TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT12.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT16.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT32.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_NTFS.getValue();
// Logger named after this class.
53 private static final Logger logger = Logger.getLogger(IngestTasksScheduler.class.getName());
// The shared scheduler instance, created lazily by getInstance().
// NOTE(review): getInstance() is a static synchronized method, so the lock
// actually held when this field is read or written is the class object,
// not an instance as the annotation value suggests - confirm the intent.
54 @GuardedBy(
"IngestTasksScheduler.this")
55 private static IngestTasksScheduler instance;
// Data source level tasks; this is the queue returned by
// getDataSourceIngestTaskQueue().
56 private final IngestTaskTrackingQueue dataSourceIngestThreadQueue;
// File tasks for top level files, kept sorted by the
// RootDirectoryTaskComparator supplied in the constructor.
58 private final TreeSet<FileIngestTask> rootFileTaskQueue;
// File tasks waiting to be expanded: fed from rootFileTaskQueue and with
// child files that themselves have children (see shuffleFileTaskQueues()).
60 private final Deque<FileIngestTask> pendingFileTaskQueue;
// File level tasks; this is the queue returned by getFileIngestTaskQueue().
61 private final IngestTaskTrackingQueue fileIngestThreadsQueue;
68 synchronized static IngestTasksScheduler getInstance() {
69 if (IngestTasksScheduler.instance == null) {
70 IngestTasksScheduler.instance =
new IngestTasksScheduler();
72 return IngestTasksScheduler.instance;
/**
 * Constructs the scheduler with empty task queues. Private because the
 * scheduler is only obtained through getInstance().
 */
private IngestTasksScheduler() {
    // Queues consumed by the ingest threads.
    dataSourceIngestThreadQueue = new IngestTaskTrackingQueue();
    fileIngestThreadsQueue = new IngestTaskTrackingQueue();
    // Staging queues for file tasks; root tasks are kept in comparator order.
    rootFileTaskQueue = new TreeSet<>(new RootDirectoryTaskComparator());
    pendingFileTaskQueue = new LinkedList<>();
}
93 BlockingIngestTaskQueue getDataSourceIngestTaskQueue() {
94 return this.dataSourceIngestThreadQueue;
103 BlockingIngestTaskQueue getFileIngestTaskQueue() {
104 return this.fileIngestThreadsQueue;
113 synchronized void scheduleIngestTasks(DataSourceIngestJob job) {
114 if (!job.isCancelled()) {
123 this.scheduleDataSourceIngestTask(job);
124 this.scheduleFileIngestTasks(job, Collections.emptyList());
133 synchronized void scheduleDataSourceIngestTask(DataSourceIngestJob job) {
134 if (!job.isCancelled()) {
135 DataSourceIngestTask task =
new DataSourceIngestTask(job);
137 this.dataSourceIngestThreadQueue.putLast(task);
138 }
catch (InterruptedException ex) {
139 IngestTasksScheduler.logger.log(Level.INFO, String.format(
"Ingest tasks scheduler interrupted while blocked adding a task to the data source level ingest task queue (jobId={%d)", job.getId()), ex);
140 Thread.currentThread().interrupt();
153 synchronized void scheduleFileIngestTasks(DataSourceIngestJob job, Collection<AbstractFile> files) {
154 if (!job.isCancelled()) {
155 Collection<AbstractFile> candidateFiles;
156 if (files.isEmpty()) {
157 candidateFiles = getTopLevelFiles(job.getDataSource());
159 candidateFiles = files;
161 for (AbstractFile file : candidateFiles) {
162 FileIngestTask task =
new FileIngestTask(job, file);
163 if (IngestTasksScheduler.shouldEnqueueFileTask(task)) {
164 this.rootFileTaskQueue.add(task);
167 shuffleFileTaskQueues();
179 synchronized void fastTrackFileIngestTasks(DataSourceIngestJob job, Collection<AbstractFile> files) {
180 if (!job.isCancelled()) {
189 for (AbstractFile file : files) {
190 FileIngestTask fileTask =
new FileIngestTask(job, file);
191 if (shouldEnqueueFileTask(fileTask)) {
193 this.fileIngestThreadsQueue.putFirst(fileTask);
194 }
catch (InterruptedException ex) {
195 IngestTasksScheduler.logger.log(Level.INFO, String.format(
"Ingest tasks scheduler interrupted while scheduling file level ingest tasks (jobId={%d)", job.getId()), ex);
196 Thread.currentThread().interrupt();
210 synchronized void notifyTaskCompleted(DataSourceIngestTask task) {
211 this.dataSourceIngestThreadQueue.taskCompleted(task);
220 synchronized void notifyTaskCompleted(FileIngestTask task) {
221 this.fileIngestThreadsQueue.taskCompleted(task);
222 shuffleFileTaskQueues();
233 synchronized boolean tasksForJobAreCompleted(DataSourceIngestJob job) {
234 long jobId = job.getId();
235 return !(this.dataSourceIngestThreadQueue.hasTasksForJob(jobId)
236 || hasTasksForJob(this.rootFileTaskQueue, jobId)
237 || hasTasksForJob(this.pendingFileTaskQueue, jobId)
238 || this.fileIngestThreadsQueue.hasTasksForJob(jobId));
248 synchronized void cancelPendingTasksForIngestJob(DataSourceIngestJob job) {
249 long jobId = job.getId();
250 IngestTasksScheduler.removeTasksForJob(this.rootFileTaskQueue, jobId);
251 IngestTasksScheduler.removeTasksForJob(this.pendingFileTaskQueue, jobId);
263 private static List<AbstractFile> getTopLevelFiles(Content dataSource) {
264 List<AbstractFile> topLevelFiles =
new ArrayList<>();
265 Collection<AbstractFile> rootObjects = dataSource.accept(
new GetRootDirectoryVisitor());
266 if (rootObjects.isEmpty() && dataSource instanceof AbstractFile) {
268 topLevelFiles.add((AbstractFile) dataSource);
270 for (AbstractFile root : rootObjects) {
271 List<Content> children;
273 children = root.getChildren();
274 if (children.isEmpty()) {
277 topLevelFiles.add(root);
281 for (Content child : children) {
282 if (child instanceof AbstractFile) {
283 topLevelFiles.add((AbstractFile) child);
287 }
catch (TskCoreException ex) {
288 logger.log(Level.WARNING,
"Could not get children of root to enqueue: " + root.getId() +
": " + root.getName(), ex);
292 return topLevelFiles;
/**
 * Moves file ingest tasks from the root queue, through the pending queue,
 * into the file ingest threads' queue. Work is only done while the file
 * ingest threads' queue is empty.
 *
 * NOTE(review): several control-flow lines of this method (the statements
 * that leave or restart the loop, and the try keywords) are not visible
 * here; the comments below describe only the visible statements.
 */
325 synchronized private void shuffleFileTaskQueues() {
// Keep going only as long as the ingest threads have nothing queued.
326 while (this.fileIngestThreadsQueue.isEmpty()) {
// When nothing is pending, promote the first task of the sorted root queue
// to the end of the pending queue (if the root queue has one).
331 if (this.pendingFileTaskQueue.isEmpty()) {
332 final FileIngestTask rootTask = this.rootFileTaskQueue.pollFirst();
333 if (rootTask != null) {
334 this.pendingFileTaskQueue.addLast(rootTask);
// Take the next pending task; pollFirst() yields null when the pending
// queue is empty (handling of that case is not visible here).
343 final FileIngestTask pendingTask = this.pendingFileTaskQueue.pollFirst();
344 if (pendingTask == null) {
// Hand the task itself to the ingest threads if it passes the filter checks.
347 if (shouldEnqueueFileTask(pendingTask)) {
353 this.fileIngestThreadsQueue.putLast(pendingTask);
// On interruption: log at INFO and restore the thread's interrupt status.
354 }
catch (InterruptedException ex) {
355 IngestTasksScheduler.logger.log(Level.INFO,
"Ingest tasks scheduler interrupted while blocked adding a task to the file level ingest task queue", ex);
356 Thread.currentThread().interrupt();
// Expand the task's file: children that have children of their own go to
// the pending queue; other children go straight to the ingest threads'
// queue when they pass shouldEnqueueFileTask(). Non-AbstractFile children
// are ignored.
368 final AbstractFile file = pendingTask.getFile();
370 for (Content child : file.getChildren()) {
371 if (child instanceof AbstractFile) {
372 AbstractFile childFile = (AbstractFile) child;
373 FileIngestTask childTask =
new FileIngestTask(pendingTask.getIngestJob(), childFile);
374 if (childFile.hasChildren()) {
375 this.pendingFileTaskQueue.add(childTask);
376 }
else if (shouldEnqueueFileTask(childTask)) {
378 this.fileIngestThreadsQueue.putLast(childTask);
// Same interruption handling as above.
379 }
catch (InterruptedException ex) {
380 IngestTasksScheduler.logger.log(Level.INFO,
"Ingest tasks scheduler interrupted while blocked adding a task to the file level ingest task queue", ex);
381 Thread.currentThread().interrupt();
// A failure to list the file's children is logged at SEVERE.
387 }
catch (TskCoreException ex) {
388 logger.log(Level.SEVERE, String.format(
"Error getting the children of %s (objId=%d)", file.getName(), file.getId()), ex);
/**
 * Examines the file of a file ingest task and decides whether the task
 * should be enqueued for the file ingest threads.
 *
 * NOTE(review): the return statements and the declaration of the local
 * variable f are not visible here; the comments below describe only the
 * conditions that are visible.
 *
 * @param task The file ingest task to examine.
 *
 * @return Whether the task should be enqueued.
 */
402 private static boolean shouldEnqueueFileTask(
final FileIngestTask task) {
403 final AbstractFile file = task.getFile();
// Check for the "." and ".." directory entries by name.
407 String fileName = file.getName();
409 if (fileName.equals(
".") || fileName.equals(
"..")) {
// Check for a file that is not a directory, is not to be carved, and is
// not accepted by the ingest job's file filter.
425 if (!file.isDir() && !shouldBeCarved(task) && !fileAcceptedByFilter(task)) {
// Start from "unsupported" and replace it with the type reported by the
// file's file system; a failed query is logged at SEVERE.
435 TskData.TSK_FS_TYPE_ENUM fsType = TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_UNSUPP;
437 FileSystem fs = f.getFileSystem();
439 fsType = fs.getFsType();
441 }
catch (TskCoreException ex) {
442 logger.log(Level.SEVERE,
"Error querying file system for " + f, ex);
// True when the file system type has none of the FAT/NTFS bits set.
446 if ((fsType.getValue() & FAT_NTFS_FLAGS) == 0) {
// Determine whether the file sits in a root directory, based on its
// parent directory; a failed query is logged at WARNING and leaves the
// flag false. What happens when the parent is null is not visible here.
451 boolean isInRootDir =
false;
453 AbstractFile parent = f.getParentDirectory();
454 if (parent == null) {
457 isInRootDir = parent.isRoot();
459 }
catch (TskCoreException ex) {
460 logger.log(Level.WARNING,
"Error querying parent directory for" + f.getName(), ex);
// Root directory files with a meta address below 32 are checked for a
// name that starts with '$' and contains ':'.
466 if (isInRootDir && f.getMetaAddr() < 32) {
467 String name = f.getName();
468 if (name.length() > 0 && name.charAt(0) ==
'$' && name.contains(
":")) {
485 private static boolean shouldBeCarved(
final FileIngestTask task) {
486 return task.getIngestJob().shouldProcessUnallocatedSpace() && task.getFile().getType().equals(TskData.TSK_DB_FILES_TYPE_ENUM.UNALLOC_BLOCKS);
497 private static boolean fileAcceptedByFilter(
final FileIngestTask task) {
498 return !(task.getIngestJob().getFileIngestFilter().fileIsMemberOf(task.getFile()) == null);
510 synchronized private static boolean hasTasksForJob(Collection<? extends IngestTask> tasks,
long jobId) {
511 for (IngestTask task : tasks) {
512 if (task.getIngestJob().getId() == jobId) {
526 private static void removeTasksForJob(Collection<? extends IngestTask> tasks,
long jobId) {
527 Iterator<? extends IngestTask> iterator = tasks.iterator();
528 while (iterator.hasNext()) {
529 IngestTask task = iterator.next();
530 if (task.getIngestJob().getId() == jobId) {
544 private static int countTasksForJob(Collection<? extends IngestTask> queue,
long jobId) {
546 for (IngestTask task : queue) {
547 if (task.getIngestJob().getId() == jobId) {
562 synchronized IngestJobTasksSnapshot getTasksSnapshotForJob(
long jobId) {
563 return new IngestJobTasksSnapshot(jobId, this.dataSourceIngestThreadQueue.countQueuedTasksForJob(jobId),
564 countTasksForJob(this.rootFileTaskQueue, jobId),
565 countTasksForJob(this.pendingFileTaskQueue, jobId),
566 this.fileIngestThreadsQueue.countQueuedTasksForJob(jobId),
567 this.dataSourceIngestThreadQueue.countRunningTasksForJob(jobId) + this.fileIngestThreadsQueue.countRunningTasksForJob(jobId));
// Orders two file ingest tasks. Two return paths are visible: one based on
// the object ids of the tasks' files (second argument minus first), the
// other based on the ordinals of p2 and p1.
// NOTE(review): the declarations of p1 and p2 and the condition selecting
// between the two returns are not visible here.
577 public int compare(FileIngestTask q1, FileIngestTask q2) {
// NOTE(review): the id difference is narrowed with an (int) cast; if
// getId() returns a long, a difference outside the int range would be
// truncated and could report the wrong sign. Comparing the ids with
// Long.compare(q2 id, q1 id) would avoid that - confirm and fix.
581 return (
int) (q2.getFile().getId() - q1.getFile().getId());
583 return p2.ordinal() - p1.ordinal();
// Priority levels, declared in the order LAST, LOW, MEDIUM, HIGH.
// NOTE(review): the comparator fragment in this file subtracts ordinals,
// so the declaration order presumably matters - confirm before reordering.
598 LAST, LOW, MEDIUM, HIGH
// One list of name patterns per priority level; filled by the add() calls
// below.
601 static final List<Pattern> LAST_PRI_PATHS =
new ArrayList<>();
603 static final List<Pattern> LOW_PRI_PATHS =
new ArrayList<>();
605 static final List<Pattern> MEDIUM_PRI_PATHS =
new ArrayList<>();
607 static final List<Pattern> HIGH_PRI_PATHS =
new ArrayList<>();
// All patterns are anchored at the start of the input ('^') and compiled
// case-insensitively.
// LAST: names starting with "pagefile" or "hiberfil".
623 LAST_PRI_PATHS.add(Pattern.compile(
"^pagefile", Pattern.CASE_INSENSITIVE));
624 LAST_PRI_PATHS.add(Pattern.compile(
"^hiberfil", Pattern.CASE_INSENSITIVE));
// LOW: names starting with "$OrphanFiles" or "Windows".
627 LOW_PRI_PATHS.add(Pattern.compile(
"^\\$OrphanFiles", Pattern.CASE_INSENSITIVE));
628 LOW_PRI_PATHS.add(Pattern.compile(
"^Windows", Pattern.CASE_INSENSITIVE));
// MEDIUM: names starting with "Program Files".
630 MEDIUM_PRI_PATHS.add(Pattern.compile(
"^Program Files", Pattern.CASE_INSENSITIVE));
// HIGH: names starting with "Users", "Documents and Settings", "home" or
// "ProgramData".
632 HIGH_PRI_PATHS.add(Pattern.compile(
"^Users", Pattern.CASE_INSENSITIVE));
633 HIGH_PRI_PATHS.add(Pattern.compile(
"^Documents and Settings", Pattern.CASE_INSENSITIVE));
634 HIGH_PRI_PATHS.add(Pattern.compile(
"^home", Pattern.CASE_INSENSITIVE));
635 HIGH_PRI_PATHS.add(Pattern.compile(
"^ProgramData", Pattern.CASE_INSENSITIVE));
// Priority lookup fragment (method header and return statements are not
// visible here). Files whose type is not FS are handled separately first.
646 if (!abstractFile.getType().equals(TskData.TSK_DB_FILES_TYPE_ENUM.FS)) {
// Despite the variable name, the text matched is the file's name as
// returned by getName().
652 final String path = abstractFile.getName();
// The pattern lists are consulted in the order HIGH, MEDIUM, LOW, LAST.
656 for (Pattern p : HIGH_PRI_PATHS) {
657 Matcher m = p.matcher(path);
662 for (Pattern p : MEDIUM_PRI_PATHS) {
663 Matcher m = p.matcher(path);
668 for (Pattern p : LOW_PRI_PATHS) {
669 Matcher m = p.matcher(path);
674 for (Pattern p : LAST_PRI_PATHS) {
675 Matcher m = p.matcher(path);
// The blocking deque that actually holds the tasks handed to consumers.
693 private final BlockingDeque<IngestTask>
taskQueue =
new LinkedBlockingDeque<>();
// Adds a task at the front of the blocking deque. A section synchronized
// on this object runs before the put, and another one runs when the put
// is interrupted.
// NOTE(review): the statements inside both synchronized sections, and
// whatever follows the catch, are not visible here.
709 void putFirst(IngestTask task) throws InterruptedException {
710 synchronized (
this) {
714 this.taskQueue.putFirst(task);
715 }
catch (InterruptedException ex) {
716 synchronized (
this) {
// Adds a task at the end of the blocking deque; same visible structure as
// putFirst() above.
// NOTE(review): the statements inside both synchronized sections, and
// whatever follows the catch, are not visible here.
733 void putLast(IngestTask task)
throws InterruptedException {
734 synchronized (
this) {
738 this.taskQueue.putLast(task);
739 }
catch (InterruptedException ex) {
740 synchronized (
this) {
// Fragment of a method whose header is not visible: takes the first task
// from the blocking deque (waiting if necessary), then enters a section
// synchronized on this object.
759 IngestTask task = taskQueue.takeFirst();
760 synchronized (
this) {
// Fragment of another member whose header and body are not visible.
773 synchronized (
this) {
// Called when a task has been completed; the work is done in a section
// synchronized on this object (its statements are not visible here).
784 void taskCompleted(IngestTask task) {
785 synchronized (
this) {
798 boolean hasTasksForJob(
long jobId) {
799 synchronized (
this) {
800 return IngestTasksScheduler.hasTasksForJob(this.
queuedTasks, jobId) || IngestTasksScheduler.hasTasksForJob(this.
tasksInProgress, jobId);
812 int countQueuedTasksForJob(
long jobId) {
813 synchronized (
this) {
814 return IngestTasksScheduler.countTasksForJob(this.
queuedTasks, jobId);
826 int countRunningTasksForJob(
long jobId) {
827 synchronized (
this) {
828 return IngestTasksScheduler.countTasksForJob(this.
tasksInProgress, jobId);
/**
 * A serializable snapshot of the task counts for a single ingest job: the
 * sizes of the data source, root, directory and file queues plus the
 * number of running tasks.
 */
837 static final class IngestJobTasksSnapshot
implements Serializable {
839 private static final long serialVersionUID = 1L;
// Id of the ingest job the counts belong to.
840 private final long jobId;
// Count of queued data source level tasks.
841 private final long dsQueueSize;
// Count of tasks in the root queue.
842 private final long rootQueueSize;
// Count of tasks in the directory queue.
843 private final long dirQueueSize;
// Count of queued file level tasks.
844 private final long fileQueueSize;
// Count of running tasks.
845 private final long runningListSize;
// Constructs a snapshot from the given counts.
// NOTE(review): the assignment of the final jobId field is not among the
// visible constructor lines - confirm it is present.
852 IngestJobTasksSnapshot(
long jobId,
long dsQueueSize,
long rootQueueSize,
long dirQueueSize,
long fileQueueSize,
long runningListSize) {
854 this.dsQueueSize = dsQueueSize;
855 this.rootQueueSize = rootQueueSize;
856 this.dirQueueSize = dirQueueSize;
857 this.fileQueueSize = fileQueueSize;
858 this.runningListSize = runningListSize;
// Returns the root queue size recorded in this snapshot.
877 long getRootQueueSize() {
878 return rootQueueSize;
// Getter for the directory tasks queue size (body not visible here).
887 long getDirectoryTasksQueueSize() {
// Returns the file queue size recorded in this snapshot.
891 long getFileQueueSize() {
892 return fileQueueSize;
// Getter for the data source queue size (body not visible here).
895 long getDsQueueSize() {
// Returns the running task count recorded in this snapshot.
899 long getRunningListSize() {
900 return runningListSize;
final List< IngestTask > queuedTasks
final BlockingDeque< IngestTask > taskQueue
final List< IngestTask > tasksInProgress
int compare(FileIngestTask q1, FileIngestTask q2)