19 package org.sleuthkit.autopsy.ingest;
 
   21 import java.io.Serializable;
 
   22 import java.util.ArrayList;
 
   23 import java.util.Collection;
 
   24 import java.util.Collections;
 
   25 import java.util.Comparator;
 
   26 import java.util.Deque;
 
   27 import java.util.Iterator;
 
   28 import java.util.LinkedList;
 
   29 import java.util.List;
 
   30 import java.util.Queue;
 
   31 import java.util.TreeSet;
 
   32 import java.util.concurrent.BlockingDeque;
 
   33 import java.util.concurrent.LinkedBlockingDeque;
 
   34 import java.util.logging.Level;
 
   35 import java.util.regex.Matcher;
 
   36 import java.util.regex.Pattern;
 
   37 import javax.annotation.concurrent.GuardedBy;
 
   38 import javax.annotation.concurrent.ThreadSafe;
 
   // Singleton that schedules data source level and file level ingest tasks
   // for ingest job pipelines. Ready-to-run tasks are handed out through the
   // two IngestTaskTrackingQueue fields below. File tasks are staged in the
   // root, pending and streamed queues before they are moved there.
   51 final class IngestTasksScheduler {
 
   // Bitwise OR of the FAT12, FAT16, FAT32 and NTFS file system type values.
   // shouldEnqueueFileTask() masks a file system type with it to detect
   // FAT/NTFS file systems.
   53     private static final int FAT_NTFS_FLAGS = TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT12.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT16.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT32.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_NTFS.getValue();
 
   54     private static final Logger logger = Logger.getLogger(IngestTasksScheduler.class.getName());
 
   // NOTE(review): getInstance() is "synchronized static", so it locks
   // IngestTasksScheduler.class, not an instance. The lock named here
   // should probably be "IngestTasksScheduler.class" -- confirm.
   55     @GuardedBy(
"IngestTasksScheduler.this")
 
   // The singleton instance, created lazily by getInstance().
   56     private static IngestTasksScheduler instance;
 
   // Data source ingest tasks; exposed via getDataSourceIngestTaskQueue().
   57     private final IngestTaskTrackingQueue dataSourceIngestThreadQueue;
 
   // Tasks for top-level files, kept sorted by RootDirectoryTaskComparator.
   59     private final TreeSet<FileIngestTask> rootFileTaskQueue;
 
   // Tasks whose files' children still have to be expanded into tasks
   // (filled and drained by takeFromBatchTasksQueues()).
   61     private final Deque<FileIngestTask> pendingFileTaskQueue;
 
   // Tasks created from streamed file object IDs. They are not filtered
   // until takeFromStreamingTaskQueue() moves them.
   63     private final Queue<FileIngestTask> streamedTasksQueue;
 
   // File ingest tasks that are ready to run; exposed via getFileIngestTaskQueue().
   64     private final IngestTaskTrackingQueue fileIngestThreadsQueue;
 
   // Returns the singleton scheduler and creates it on the first call.
   // Because the method is "synchronized static", creation is serialized on
   // the IngestTasksScheduler.class lock.
   71     synchronized static IngestTasksScheduler getInstance() {
 
   72         if (IngestTasksScheduler.instance == null) {
 
   73             IngestTasksScheduler.instance = 
new IngestTasksScheduler();
 
   75         return IngestTasksScheduler.instance;
 
   // Private constructor (singleton; see getInstance()). Creates all task
   // queues empty.
   83     private IngestTasksScheduler() {
 
   84         this.dataSourceIngestThreadQueue = 
new IngestTaskTrackingQueue();
 
   // Root file tasks are ordered by RootDirectoryTaskComparator, not by
   // insertion order.
   85         this.rootFileTaskQueue = 
new TreeSet<>(
new RootDirectoryTaskComparator());
 
   86         this.pendingFileTaskQueue = 
new LinkedList<>();
 
   87         this.fileIngestThreadsQueue = 
new IngestTaskTrackingQueue();
 
   88         this.streamedTasksQueue = 
new LinkedList<>();
 
   // Returns the queue that holds scheduled data source ingest tasks,
   // typed as the BlockingIngestTaskQueue interface.
   97     BlockingIngestTaskQueue getDataSourceIngestTaskQueue() {
 
   98         return this.dataSourceIngestThreadQueue;
 
  // Returns the queue that holds file ingest tasks ready to run,
  // typed as the BlockingIngestTaskQueue interface.
  107     BlockingIngestTaskQueue getFileIngestTaskQueue() {
 
  108         return this.fileIngestThreadsQueue;
 
  // Schedules a data source ingest task and file ingest tasks for a job.
  // Does nothing if the pipeline is already cancelled.
  // @param ingestJobPipeline the pipeline to schedule tasks for
  117     synchronized void scheduleIngestTasks(IngestJobPipeline ingestJobPipeline) {
 
  118         if (!ingestJobPipeline.isCancelled()) {
 
  127             this.scheduleDataSourceIngestTask(ingestJobPipeline);
 
  // An empty collection makes scheduleFileIngestTasks() fall back to the
  // data source's top-level files.
  128             this.scheduleFileIngestTasks(ingestJobPipeline, Collections.emptyList());
 
  // Appends a DataSourceIngestTask for the pipeline to the data source ingest
  // queue, unless the pipeline is cancelled. If the blocking put is
  // interrupted, the interruption is logged and the thread's interrupt flag
  // is restored.
  // @param ingestJobPipeline the pipeline to schedule the task for
  137     synchronized void scheduleDataSourceIngestTask(IngestJobPipeline ingestJobPipeline) {
 
  138         if (!ingestJobPipeline.isCancelled()) {
 
  139             DataSourceIngestTask task = 
new DataSourceIngestTask(ingestJobPipeline);
 
  141                 this.dataSourceIngestThreadQueue.putLast(task);
 
  142             } 
catch (InterruptedException ex) {
 
  // NOTE(review): the format string has a stray '{' ("jobId={%d)"), so the
  // logged text reads e.g. "(jobId={42)".
  143                 IngestTasksScheduler.logger.log(Level.INFO, String.format(
"Ingest tasks scheduler interrupted while blocked adding a task to the data source level ingest task queue (jobId={%d)", ingestJobPipeline.getId()), ex);
 
  144                 Thread.currentThread().interrupt();
 
  // Schedules file ingest tasks for a pipeline, unless it is cancelled.
  // If files is empty, the top-level files of the pipeline's data source are
  // used. Each candidate that passes shouldEnqueueFileTask() is added to
  // rootFileTaskQueue, and then the ingest thread queue is refilled.
  // @param ingestJobPipeline the pipeline the tasks belong to
  // @param files the files to schedule, or an empty collection for the
  //        data source's top-level files
  157     synchronized void scheduleFileIngestTasks(IngestJobPipeline ingestJobPipeline, Collection<AbstractFile> files) {
 
  158         if (!ingestJobPipeline.isCancelled()) {
 
  159             Collection<AbstractFile> candidateFiles;
 
  160             if (files.isEmpty()) {
 
  161                 candidateFiles = getTopLevelFiles(ingestJobPipeline.getDataSource());
 
  163                 candidateFiles = files;
 
  165             for (AbstractFile file : candidateFiles) {
 
  166                 FileIngestTask task = 
new FileIngestTask(ingestJobPipeline, file);
 
  167                 if (IngestTasksScheduler.shouldEnqueueFileTask(task)) {
 
  168                     this.rootFileTaskQueue.add(task);
 
  171             refillIngestThreadQueue();
 
  // Queues a file ingest task for each streamed file object ID, unless the
  // pipeline is cancelled, and then refills the ingest thread queue. No
  // filtering happens here; takeFromStreamingTaskQueue() applies
  // shouldEnqueueFileTask() when it moves the tasks.
  // @param ingestJobPipeline the pipeline the tasks belong to
  // @param fileIds object IDs of the streamed files
  182     synchronized void scheduleStreamedFileIngestTasks(IngestJobPipeline ingestJobPipeline, List<Long> fileIds) {
 
  183         if (!ingestJobPipeline.isCancelled()) {
 
  184             for (
long id : fileIds) {
 
  187                 FileIngestTask task = 
new FileIngestTask(ingestJobPipeline, 
id);
 
  188                 this.streamedTasksQueue.add(task);
 
  190             refillIngestThreadQueue();
 
  // Schedules file ingest tasks at the FRONT of the file ingest thread queue,
  // bypassing the root, pending and streamed staging queues, unless the
  // pipeline is cancelled. Only tasks that pass shouldEnqueueFileTask() are
  // queued. If a blocking put is interrupted, the interruption is logged
  // and the thread's interrupt flag is restored.
  // @param ingestJobPipeline the pipeline the tasks belong to
  // @param files the files to fast-track
  202     synchronized void fastTrackFileIngestTasks(IngestJobPipeline ingestJobPipeline, Collection<AbstractFile> files) {
 
  203         if (!ingestJobPipeline.isCancelled()) {
 
  212             for (AbstractFile file : files) {
 
  213                 FileIngestTask fileTask = 
new FileIngestTask(ingestJobPipeline, file);
 
  214                 if (shouldEnqueueFileTask(fileTask)) {
 
  216                         this.fileIngestThreadsQueue.putFirst(fileTask);
 
  217                     } 
catch (InterruptedException ex) {
 
  // NOTE(review): the format string has a stray '{' ("jobId={%d)"), so the
  // logged text reads e.g. "(jobId={42)".
  218                         IngestTasksScheduler.logger.log(Level.INFO, String.format(
"Ingest tasks scheduler interrupted while scheduling file level ingest tasks (jobId={%d)", ingestJobPipeline.getId()), ex);
 
  219                         Thread.currentThread().interrupt();
 
  // Tells the data source ingest queue that the given task has finished.
  // @param task the completed data source ingest task
  233     synchronized void notifyTaskCompleted(DataSourceIngestTask task) {
 
  234         this.dataSourceIngestThreadQueue.taskCompleted(task);
 
  // Tells the file ingest queue that the given task has finished, then
  // refills the file ingest thread queue from the staging queues.
  // @param task the completed file ingest task
  243     synchronized void notifyTaskCompleted(FileIngestTask task) {
 
  244         this.fileIngestThreadsQueue.taskCompleted(task);
 
  245         refillIngestThreadQueue();
 
  // Returns true when no queue holds a task for the pipeline's job. The
  // IngestTaskTrackingQueue.hasTasksForJob() calls check both queued and
  // in-progress tasks; the staging queues are checked directly.
  // @param ingestJobPipeline the pipeline whose job ID is checked
  // @return true if the job has no queued, staged or in-progress tasks
  256     synchronized boolean currentTasksAreCompleted(IngestJobPipeline ingestJobPipeline) {
 
  257         long jobId = ingestJobPipeline.getId();
 
  259         return !(this.dataSourceIngestThreadQueue.hasTasksForJob(jobId)
 
  260                 || hasTasksForJob(this.rootFileTaskQueue, jobId)
 
  261                 || hasTasksForJob(this.pendingFileTaskQueue, jobId)
 
  262                 || hasTasksForJob(this.streamedTasksQueue, jobId)
 
  263                 || this.fileIngestThreadsQueue.hasTasksForJob(jobId));
 
  // Removes the job's tasks from the root, pending and streamed staging
  // queues. In the visible code, tasks already in the data source or file
  // ingest thread queues are not removed.
  // @param ingestJobPipeline the pipeline whose job ID identifies the tasks
  273     synchronized void cancelPendingTasksForIngestJob(IngestJobPipeline ingestJobPipeline) {
 
  274         long jobId = ingestJobPipeline.getId();
 
  275         IngestTasksScheduler.removeTasksForJob(rootFileTaskQueue, jobId);
 
  276         IngestTasksScheduler.removeTasksForJob(pendingFileTaskQueue, jobId);
 
  277         IngestTasksScheduler.removeTasksForJob(streamedTasksQueue, jobId);
 
  // Collects the top-level files of a data source.
  // If GetRootDirectoryVisitor finds no root objects and the data source is
  // itself an AbstractFile, the data source is returned alone. Otherwise, for
  // each root object: the root itself if it has no children, else its
  // AbstractFile children. Roots whose children cannot be read are logged
  // (WARNING) and contribute nothing.
  // @param dataSource the data source to examine
  // @return the top-level files; empty if none were found
  289     private static List<AbstractFile> getTopLevelFiles(Content dataSource) {
 
  290         List<AbstractFile> topLevelFiles = 
new ArrayList<>();
 
  291         Collection<AbstractFile> rootObjects = dataSource.accept(
new GetRootDirectoryVisitor());
 
  292         if (rootObjects.isEmpty() && dataSource instanceof AbstractFile) {
 
  294             topLevelFiles.add((AbstractFile) dataSource);
 
  296             for (AbstractFile root : rootObjects) {
 
  297                 List<Content> children;
 
  299                     children = root.getChildren();
 
  300                     if (children.isEmpty()) {
 
  303                         topLevelFiles.add(root);
 
  307                         for (Content child : children) {
 
  // Non-file children are skipped.
  308                             if (child instanceof AbstractFile) {
 
  309                                 topLevelFiles.add((AbstractFile) child);
 
  313                 } 
catch (TskCoreException ex) {
 
  314                     logger.log(Level.WARNING, 
"Could not get children of root to enqueue: " + root.getId() + 
": " + root.getName(), ex); 
 
  318         return topLevelFiles;
 
  // Moves tasks into the file ingest thread queue. Streamed tasks are taken
  // first, then batch (root/pending) tasks. If either step is interrupted,
  // the interruption is logged and the thread's interrupt flag is restored.
  325     synchronized private void refillIngestThreadQueue() {
 
  327             takeFromStreamingTaskQueue();
 
  328             takeFromBatchTasksQueues();
 
  329         } 
catch (InterruptedException ex) {
 
  330             IngestTasksScheduler.logger.log(Level.INFO, 
"Ingest tasks scheduler interrupted while blocked adding a task to the file level ingest task queue", ex);
 
  331             Thread.currentThread().interrupt();
 
  // While the file ingest thread queue is empty, polls streamed tasks and
  // appends those that pass shouldEnqueueFileTask(). The inner loop is bounded
  // by taskCount against the number of file ingest threads.
  // NOTE(review): taskCount's declaration/increment and the exit taken when
  // the streamed queue runs dry are not visible in this listing -- confirm.
  // @throws InterruptedException if a blocking put is interrupted
  339     synchronized private void takeFromStreamingTaskQueue() throws InterruptedException {
 
  343         while (fileIngestThreadsQueue.isEmpty()) {
 
  349             while (taskCount < IngestManager.getInstance().getNumberOfFileIngestThreads()) {
 
  350                 final FileIngestTask streamingTask = streamedTasksQueue.poll();
 
  351                 if (streamingTask == null) {
 
  355                 if (shouldEnqueueFileTask(streamingTask)) {
 
  356                     fileIngestThreadsQueue.putLast(streamingTask);
 
  // While the file ingest thread queue is empty, expands batch-scheduled
  // tasks:
  //  1. If pendingFileTaskQueue is empty, the highest-ordered root task (first
  //     in rootFileTaskQueue) is moved into it.
  //  2. One pending task is polled. If it passes shouldEnqueueFileTask(), it is
  //     appended to the file ingest thread queue.
  //  3. For each AbstractFile child of its file: children that have children go
  //     back to pendingFileTaskQueue; leaf children that pass
  //     shouldEnqueueFileTask() go straight to the file ingest thread queue.
  // The exits taken when the root/pending queues are exhausted are not
  // visible in this listing.
  // @throws InterruptedException if a blocking put is interrupted
  393     synchronized private void takeFromBatchTasksQueues() throws InterruptedException {
 
  395         while (this.fileIngestThreadsQueue.isEmpty()) {     
 
  400             if (this.pendingFileTaskQueue.isEmpty()) {
 
  401                 final FileIngestTask rootTask = this.rootFileTaskQueue.pollFirst();
 
  402                 if (rootTask != null) {
 
  403                     this.pendingFileTaskQueue.addLast(rootTask);
 
  412             final FileIngestTask pendingTask = this.pendingFileTaskQueue.pollFirst();
 
  413             if (pendingTask == null) {
 
  416             if (shouldEnqueueFileTask(pendingTask)) {
 
  421                 this.fileIngestThreadsQueue.putLast(pendingTask);
 
  // Stays null if getFile() throws; the catch block below uses it to choose
  // which message to log.
  431             AbstractFile file = null;
 
  433                 file = pendingTask.getFile();
 
  434                 for (Content child : file.getChildren()) {
 
  435                     if (child instanceof AbstractFile) {
 
  436                         AbstractFile childFile = (AbstractFile) child;
 
  437                         FileIngestTask childTask = 
new FileIngestTask(pendingTask.getIngestJobPipeline(), childFile);
 
  438                         if (childFile.hasChildren()) {
 
  439                             this.pendingFileTaskQueue.add(childTask);
 
  440                         } 
else if (shouldEnqueueFileTask(childTask)) {
 
  441                             this.fileIngestThreadsQueue.putLast(childTask);
 
  445             } 
catch (TskCoreException ex) {
 
  // NOTE(review): file.getName() here presumes file != null; the null check
  // choosing between these two log calls is not visible -- confirm.
  447                     logger.log(Level.SEVERE, String.format(
"Error getting the children of %s (objId=%d)", file.getName(), file.getId()), ex);  
 
  450                     logger.log(Level.SEVERE, 
"Error loading file with object ID {0}", pendingTask.getFileId());
 
  // Decides whether a file ingest task should be queued. The visible checks:
  //  - the task's file cannot be loaded (logged SEVERE);
  //  - the file name is "." or "..";
  //  - the file is not a directory, is not unallocated space to carve
  //    (shouldBeCarved), and is not accepted by the file filter;
  //  - on FAT/NTFS file systems (FAT_NTFS_FLAGS), a file in the root directory
  //    with metadata address < 32 whose name starts with '$' and contains ':'.
  // The return statements for each case are not visible in this listing.
  // @param task the candidate task
  465     private static boolean shouldEnqueueFileTask(
final FileIngestTask task) {
 
  468             file = task.getFile();
 
  469         } 
catch (TskCoreException ex) {
 
  470             logger.log(Level.SEVERE, 
"Error loading file with ID {0}", task.getFileId());
 
  476         String fileName = file.getName();
 
  478         if (fileName.equals(
".") || fileName.equals(
"..")) {
 
  494         if (!file.isDir() && !shouldBeCarved(task) && !fileAcceptedByFilter(task)) {
 
  // Defaults to "unsupported" if the file system cannot be determined.
  504             TskData.TSK_FS_TYPE_ENUM fsType = TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_UNSUPP;
 
  506                 FileSystem fs = f.getFileSystem();
 
  508                     fsType = fs.getFsType();
 
  510             } 
catch (TskCoreException ex) {
 
  511                 logger.log(Level.SEVERE, 
"Error querying file system for " + f, ex); 
 
  // No FAT/NTFS bit set: the NTFS metadata-file check below does not apply.
  515             if ((fsType.getValue() & FAT_NTFS_FLAGS) == 0) {
 
  520             boolean isInRootDir = 
false;
 
  522                 AbstractFile parent = f.getParentDirectory();
 
  523                 if (parent == null) {
 
  526                     isInRootDir = parent.isRoot();
 
  528             } 
catch (TskCoreException ex) {
 
  // NOTE(review): message lacks a space before the file name
  // ("...directory for" + name).
  529                 logger.log(Level.WARNING, 
"Error querying parent directory for" + f.getName(), ex); 
 
  535             if (isInRootDir && f.getMetaAddr() < 32) {
 
  536                 String name = f.getName();
 
  537                 if (name.length() > 0 && name.charAt(0) == 
'$' && name.contains(
":")) {
 
  // Returns true when the task's pipeline processes unallocated space and
  // the task's file is of type UNALLOC_BLOCKS. The TskCoreException handler's
  // body is not visible in this listing.
  // @param task the candidate task
  554     private static boolean shouldBeCarved(
final FileIngestTask task) {
 
  556             AbstractFile file = task.getFile();
 
  557             return task.getIngestJobPipeline().shouldProcessUnallocatedSpace() && file.getType().equals(TskData.TSK_DB_FILES_TYPE_ENUM.UNALLOC_BLOCKS);
 
  558         } 
catch (TskCoreException ex) {
 
  // Returns true when the pipeline's file ingest filter reports a non-null
  // result from fileIsMemberOf() for the task's file. The TskCoreException
  // handler's body is not visible in this listing.
  // @param task the candidate task
  571     private static boolean fileAcceptedByFilter(
final FileIngestTask task) {
 
  573             AbstractFile file = task.getFile();
 
  574             return !(task.getIngestJobPipeline().getFileIngestFilter().fileIsMemberOf(file) == null);
 
  575         } 
catch (TskCoreException ex) {
 
  // Reports whether any task in the collection belongs to the given job
  // (matched on the pipeline ID).
  // NOTE(review): "static synchronized" locks IngestTasksScheduler.class,
  // which does not guard the instance collections passed in. The visible
  // callers already hold their own locks (currentTasksAreCompleted and
  // IngestTaskTrackingQueue.hasTasksForJob), so this lock looks redundant.
  // @param tasks the tasks to search
  // @param jobId the ingest job ID
  589     synchronized private static boolean hasTasksForJob(Collection<? extends IngestTask> tasks, 
long jobId) {
 
  590         for (IngestTask task : tasks) {
 
  591             if (task.getIngestJobPipeline().getId() == jobId) {
 
  // Removes the tasks belonging to the given job (matched on the pipeline ID)
  // from the collection. An explicit iterator is used so that elements can
  // be removed during traversal; the removal call itself is not visible in
  // this listing.
  // @param tasks the collection to prune
  // @param jobId the ingest job ID
  605     private static void removeTasksForJob(Collection<? extends IngestTask> tasks, 
long jobId) {
 
  606         Iterator<? extends IngestTask> iterator = tasks.iterator();
 
  607         while (iterator.hasNext()) {
 
  608             IngestTask task = iterator.next();
 
  609             if (task.getIngestJobPipeline().getId() == jobId) {
 
  // Counts the tasks in the collection that belong to the given job
  // (matched on the pipeline ID).
  // @param queue the tasks to examine
  // @param jobId the ingest job ID
  623     private static int countTasksForJob(Collection<? extends IngestTask> queue, 
long jobId) {
 
  625         for (IngestTask task : queue) {
 
  626             if (task.getIngestJobPipeline().getId() == jobId) {
 
  // Builds a snapshot of the job's task counts. The arguments map, in order,
  // to the IngestJobTasksSnapshot constructor parameters:
  // dsQueueSize, rootQueueSize, dirQueueSize (pending queue),
  // fileQueueSize, runningListSize (running tasks in both tracking queues),
  // and streamingQueueSize.
  // @param jobId the ingest job ID
  641     synchronized IngestJobTasksSnapshot getTasksSnapshotForJob(
long jobId) {
 
  642         return new IngestJobTasksSnapshot(jobId, this.dataSourceIngestThreadQueue.countQueuedTasksForJob(jobId),
 
  643                 countTasksForJob(this.rootFileTaskQueue, jobId),
 
  644                 countTasksForJob(this.pendingFileTaskQueue, jobId),
 
  645                 this.fileIngestThreadsQueue.countQueuedTasksForJob(jobId),
 
  646                 this.dataSourceIngestThreadQueue.countRunningTasksForJob(jobId) + this.fileIngestThreadsQueue.countRunningTasksForJob(jobId),
 
  647                 countTasksForJob(this.streamedTasksQueue, jobId));
 
  // Orders file ingest tasks. This is presumably
  // RootDirectoryTaskComparator.compare, the comparator the constructor gives
  // to rootFileTaskQueue -- the class header is not visible here. Tasks whose
  // files cannot be loaded are ordered by file ID. Otherwise the tasks are
  // ordered by priority: "p2.ordinal() - p1.ordinal()" places higher
  // ordinals first, and when priorities tie they are ordered by file ID.
  // Where p1/p2 are computed is not visible.
  // NOTE(review): "(int) (a - b)" on long IDs can overflow or truncate, which
  // flips the sign or yields 0. A TreeSet treats 0 as a duplicate and drops
  // the task. Long.compare(b, a) would keep the intended order safely.
  657         public int compare(FileIngestTask q1, FileIngestTask q2) {
 
  660             AbstractFile file1 = null;
 
  661             AbstractFile file2 = null;
 
  663                 file1 = q1.getFile();
 
  664             } 
catch (TskCoreException ex) {
 
  669                 file2 = q2.getFile();
 
  670             } 
catch (TskCoreException ex) {
 
  676                     return (
int) (q2.getFileId() - q1.getFileId());
 
  680             } 
else  if (file2 == null) {
 
  687                 return (
int) (file2.getId() - file1.getId());
 
  689                 return p2.ordinal() - p1.ordinal();
 
  // Priority levels in ascending ordinal order. The comparator above uses the
  // ordinal, so HIGH sorts before LAST.
  704                 LAST, LOW, MEDIUM, HIGH
 
  // Name-prefix patterns for each priority level (filled in below).
  707             static final List<Pattern> LAST_PRI_PATHS = 
new ArrayList<>();
 
  709             static final List<Pattern> LOW_PRI_PATHS = 
new ArrayList<>();
 
  711             static final List<Pattern> MEDIUM_PRI_PATHS = 
new ArrayList<>();
 
  713             static final List<Pattern> HIGH_PRI_PATHS = 
new ArrayList<>();
 
  // All patterns are '^'-anchored (prefix) and case-insensitive.
  // Lowest priority: pagefile and hibernation file.
  729                 LAST_PRI_PATHS.add(Pattern.compile(
"^pagefile", Pattern.CASE_INSENSITIVE));
 
  730                 LAST_PRI_PATHS.add(Pattern.compile(
"^hiberfil", Pattern.CASE_INSENSITIVE));
 
  733                 LOW_PRI_PATHS.add(Pattern.compile(
"^\\$OrphanFiles", Pattern.CASE_INSENSITIVE));
 
  734                 LOW_PRI_PATHS.add(Pattern.compile(
"^Windows", Pattern.CASE_INSENSITIVE));
 
  736                 MEDIUM_PRI_PATHS.add(Pattern.compile(
"^Program Files", Pattern.CASE_INSENSITIVE));
 
  // Highest priority: user profile and user data locations.
  738                 HIGH_PRI_PATHS.add(Pattern.compile(
"^Users", Pattern.CASE_INSENSITIVE));
 
  739                 HIGH_PRI_PATHS.add(Pattern.compile(
"^Documents and Settings", Pattern.CASE_INSENSITIVE));
 
  740                 HIGH_PRI_PATHS.add(Pattern.compile(
"^home", Pattern.CASE_INSENSITIVE));
 
  741                 HIGH_PRI_PATHS.add(Pattern.compile(
"^ProgramData", Pattern.CASE_INSENSITIVE));
 
  // Files whose type is not FS take a separate branch; its result is not
  // visible in this listing.
  752                 if (!abstractFile.getType().equals(TskData.TSK_DB_FILES_TYPE_ENUM.FS)) {
 
  // Despite the variable's name, matching is done on the file NAME, not the
  // full path.
  758                 final String path = abstractFile.getName();
 
  // Lists are checked from highest to lowest priority.
  762                 for (Pattern p : HIGH_PRI_PATHS) {
 
  763                     Matcher m = p.matcher(path);
 
  768                 for (Pattern p : MEDIUM_PRI_PATHS) {
 
  769                     Matcher m = p.matcher(path);
 
  774                 for (Pattern p : LOW_PRI_PATHS) {
 
  775                     Matcher m = p.matcher(path);
 
  780                 for (Pattern p : LAST_PRI_PATHS) {
 
  781                     Matcher m = p.matcher(path);
 
  // Blocking deque that ingest threads draw tasks from. It is created without
  // a capacity argument, so per the LinkedBlockingDeque docs its capacity is
  // Integer.MAX_VALUE. Bookkeeping lists (queuedTasks, tasksInProgress) are
  // accessed under synchronized (this).
  799         private final BlockingDeque<IngestTask> 
taskQueue = 
new LinkedBlockingDeque<>();
 
  // putFirst/putLast: enter synchronized (this) before inserting (the
  // statements there are not visible), then do the blocking insert outside
  // that section. If interrupted, they re-enter synchronized (this) --
  // presumably to undo the bookkeeping; TODO confirm. Both declare
  // InterruptedException.
  815         void putFirst(IngestTask task) throws InterruptedException {
 
  816             synchronized (
this) {
 
  820                 this.taskQueue.putFirst(task);
 
  821             } 
catch (InterruptedException ex) {
 
  822                 synchronized (
this) {
 
  839         void putLast(IngestTask task) 
throws InterruptedException {
 
  840             synchronized (
this) {
 
  844                 this.taskQueue.putLast(task);
 
  845             } 
catch (InterruptedException ex) {
 
  846                 synchronized (
this) {
 
  // Blocking take done outside the lock, so waiting consumers do not hold
  // the monitor. The enclosing method's header is not visible.
  865             IngestTask task = taskQueue.takeFirst();
 
  866             synchronized (
this) {
 
  879             synchronized (
this) {
 
  // Called via the scheduler's notifyTaskCompleted() methods; updates the
  // bookkeeping under the lock (body not visible).
  890         void taskCompleted(IngestTask task) {
 
  891             synchronized (
this) {
 
  // True if the job has a task that is either queued or in progress.
  904         boolean hasTasksForJob(
long jobId) {
 
  905             synchronized (
this) {
 
  906                 return IngestTasksScheduler.hasTasksForJob(this.
queuedTasks, jobId) || IngestTasksScheduler.hasTasksForJob(this.
tasksInProgress, jobId);
 
  // Number of the job's tasks that are queued but not yet in progress.
  918         int countQueuedTasksForJob(
long jobId) {
 
  919             synchronized (
this) {
 
  920                 return IngestTasksScheduler.countTasksForJob(this.
queuedTasks, jobId);
 
  // Number of the job's tasks that are in progress.
  932         int countRunningTasksForJob(
long jobId) {
 
  933             synchronized (
this) {
 
  934                 return IngestTasksScheduler.countTasksForJob(this.
tasksInProgress, jobId);
 
  // Immutable, serializable set of task counts for one ingest job, as
  // produced by getTasksSnapshotForJob().
  943     static final class IngestJobTasksSnapshot 
implements Serializable {
 
  945         private static final long serialVersionUID = 1L;
 
  946         private final long jobId;
 
  // Queued data source ingest tasks.
  947         private final long dsQueueSize;
 
  // Tasks in the root file task queue.
  948         private final long rootQueueSize;
 
  // Tasks in the pending (directory expansion) queue.
  949         private final long dirQueueSize;
 
  // Queued file ingest tasks ready to run.
  950         private final long fileQueueSize;
 
  // Running tasks in the data source and file tracking queues combined.
  951         private final long runningListSize;
 
  // Tasks in the streamed file task queue.
  952         private final long streamingQueueSize;
 
  // NOTE(review): the jobId assignment is not visible in this listing
  // (presumably at the omitted line 961) -- confirm it is set.
  959         IngestJobTasksSnapshot(
long jobId, 
long dsQueueSize, 
long rootQueueSize, 
long dirQueueSize, 
long fileQueueSize, 
 
  960                 long runningListSize, 
long streamingQueueSize) {
 
  962             this.dsQueueSize = dsQueueSize;
 
  963             this.rootQueueSize = rootQueueSize;
 
  964             this.dirQueueSize = dirQueueSize;
 
  965             this.fileQueueSize = fileQueueSize;
 
  966             this.runningListSize = runningListSize;
 
  967             this.streamingQueueSize = streamingQueueSize;
 
  986         long getRootQueueSize() {
 
  987             return rootQueueSize;
 
  // Body not visible; presumably returns dirQueueSize.
  996         long getDirectoryTasksQueueSize() {
 
 1000         long getFileQueueSize() {
 
 1001             return fileQueueSize;
 
 1004         long getStreamingQueueSize() {
 
 1005             return streamingQueueSize;
 
  // Body not visible; presumably returns dsQueueSize.
 1008         long getDsQueueSize() {
 
 1012         long getRunningListSize() {
 
 1013             return runningListSize;
 
final List< IngestTask > queuedTasks
 
final BlockingDeque< IngestTask > taskQueue
 
final List< IngestTask > tasksInProgress
 
int compare(FileIngestTask q1, FileIngestTask q2)