19 package org.sleuthkit.autopsy.ingest;
 
   21 import java.io.Serializable;
 
   22 import java.util.ArrayList;
 
   23 import java.util.Collection;
 
   24 import java.util.Collections;
 
   25 import java.util.Comparator;
 
   26 import java.util.Deque;
 
   27 import java.util.Iterator;
 
   28 import java.util.LinkedList;
 
   29 import java.util.List;
 
   30 import java.util.Queue;
 
   31 import java.util.TreeSet;
 
   32 import java.util.concurrent.BlockingDeque;
 
   33 import java.util.concurrent.LinkedBlockingDeque;
 
   34 import java.util.logging.Level;
 
   35 import java.util.regex.Matcher;
 
   36 import java.util.regex.Pattern;
 
   37 import javax.annotation.concurrent.GuardedBy;
 
   38 import javax.annotation.concurrent.ThreadSafe;
 
   55 final class IngestTasksScheduler {
 
   // Bit mask built by OR-ing the values of the FAT12, FAT16, FAT32 and NTFS
   // file system type constants; it is tested against a file system type
   // value elsewhere in this class.
   57     private static final int FAT_NTFS_FLAGS = TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT12.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT16.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT32.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_NTFS.getValue();
 
   // Logger named after this class.
   58     private static final Logger logger = Logger.getLogger(IngestTasksScheduler.class.getName());
 
   // NOTE(review): this field is static and getInstance() is a static
   // synchronized method (which locks the Class object), so a guard expression
   // naming the instance ("...this") looks inconsistent - confirm whether it
   // should name the class lock instead.
   59     @GuardedBy(
"IngestTasksScheduler.this")
 
   // Lazily created singleton instance (see getInstance()).
   60     private static IngestTasksScheduler instance;
 
   // Tracking queue of data source level ingest tasks.
   61     private final IngestTaskTrackingQueue dataSourceIngestTasksQueue;
 
   // Sorted set of file tasks for top level files; ordered by the comparator
   // supplied in the constructor.
   63     private final TreeSet<FileIngestTask> topLevelFileIngestTasksQueue;
 
   // Intermediate deque of file tasks that feeds fileIngestTasksQueue.
   65     private final Deque<FileIngestTask> batchedFileIngestTasksQueue;
 
   // Queue of file tasks created from file object IDs (streamed scheduling).
   67     private final Queue<FileIngestTask> streamedFileIngestTasksQueue;
 
   // Tracking queue of file tasks handed out via getFileIngestTaskQueue().
   68     private final IngestTaskTrackingQueue fileIngestTasksQueue;
 
   // Tracking queue of data artifact tasks handed out via getResultIngestTaskQueue().
   69     private final IngestTaskTrackingQueue artifactIngestTasksQueue;
 
   76     synchronized static IngestTasksScheduler getInstance() {
 
   77         if (IngestTasksScheduler.instance == null) {
 
   78             IngestTasksScheduler.instance = 
new IngestTasksScheduler();
 
   80         return IngestTasksScheduler.instance;
 
   /**
    * Constructs the scheduler and creates all of its task queues. Private so
    * that instances are only obtained through getInstance().
    */
   private IngestTasksScheduler() {
       dataSourceIngestTasksQueue = new IngestTaskTrackingQueue();
       // Top level file tasks are kept sorted by the root directory comparator.
       topLevelFileIngestTasksQueue = new TreeSet<>(new RootDirectoryTaskComparator());
       batchedFileIngestTasksQueue = new LinkedList<>();
       fileIngestTasksQueue = new IngestTaskTrackingQueue();
       streamedFileIngestTasksQueue = new LinkedList<>();
       artifactIngestTasksQueue = new IngestTaskTrackingQueue();
   }
 
  103     BlockingIngestTaskQueue getDataSourceIngestTaskQueue() {
 
  104         return dataSourceIngestTasksQueue;
 
  113     BlockingIngestTaskQueue getFileIngestTaskQueue() {
 
  114         return fileIngestTasksQueue;
 
  123     BlockingIngestTaskQueue getResultIngestTaskQueue() {
 
  124         return artifactIngestTasksQueue;
 
  141     synchronized void scheduleIngestTasks(IngestJobPipeline ingestPipeline) {
 
  142         if (!ingestPipeline.isCancelled()) {
 
  143             if (ingestPipeline.hasDataSourceIngestModules()) {
 
  144                 scheduleDataSourceIngestTask(ingestPipeline);
 
  146             if (ingestPipeline.hasFileIngestModules()) {
 
  147                 scheduleFileIngestTasks(ingestPipeline, Collections.emptyList());
 
  149             if (ingestPipeline.hasDataArtifactIngestModules()) {
 
  150                 scheduleDataArtifactIngestTasks(ingestPipeline);
 
  166     synchronized void scheduleDataSourceIngestTask(IngestJobPipeline ingestPipeline) {
 
  167         if (!ingestPipeline.isCancelled()) {
 
  168             DataSourceIngestTask task = 
new DataSourceIngestTask(ingestPipeline);
 
  170                 dataSourceIngestTasksQueue.putLast(task);
 
  171             } 
catch (InterruptedException ex) {
 
  172                 IngestTasksScheduler.logger.log(Level.INFO, String.format(
"Ingest tasks scheduler interrupted while blocked adding a task to the data source level ingest task queue (pipelineId={%d)", ingestPipeline.getId()), ex);
 
  173                 Thread.currentThread().interrupt();
 
  193     synchronized void scheduleFileIngestTasks(IngestJobPipeline ingestPipeline, Collection<AbstractFile> files) {
 
  194         if (!ingestPipeline.isCancelled()) {
 
  195             Collection<AbstractFile> candidateFiles;
 
  196             if (files.isEmpty()) {
 
  197                 candidateFiles = getTopLevelFiles(ingestPipeline.getDataSource());
 
  199                 candidateFiles = files;
 
  201             for (AbstractFile file : candidateFiles) {
 
  202                 FileIngestTask task = 
new FileIngestTask(ingestPipeline, file);
 
  203                 if (IngestTasksScheduler.shouldEnqueueFileTask(task)) {
 
  204                     topLevelFileIngestTasksQueue.add(task);
 
  207             refillFileIngestTasksQueue();
 
  /**
   * Schedules file ingest tasks for files identified by object ID. When the
   * pipeline is not cancelled, a FileIngestTask is created from each ID and
   * added to streamedFileIngestTasksQueue, and refillFileIngestTasksQueue()
   * is then called.
   *
   * NOTE(review): listing lines 226-232 and all closing braces are absent
   * from this extract; confirm nothing else happens inside the loop.
   *
   * @param ingestPipeline The ingest job pipeline the tasks belong to.
   * @param fileIds        Object IDs of the files to schedule.
   */
  223     synchronized void scheduleStreamedFileIngestTasks(IngestJobPipeline ingestPipeline, List<Long> fileIds) {
 
  224         if (!ingestPipeline.isCancelled()) {
 
  225             for (
long id : fileIds) {
 
  // The task is built from the ID alone; shouldEnqueueFileTask() is not
  // called in the visible lines of this method.
  233                 FileIngestTask task = 
new FileIngestTask(ingestPipeline, 
id);
 
  234                 streamedFileIngestTasksQueue.add(task);
 
  236             refillFileIngestTasksQueue();
 
  /**
   * Schedules file ingest tasks ahead of the tasks already queued. When the
   * pipeline is not cancelled, a task is created for each file and, if it
   * passes shouldEnqueueFileTask(), is put at the FRONT of
   * fileIngestTasksQueue via putFirst().
   *
   * NOTE(review): the "try" line and whatever follows the interrupt() call
   * in the catch block are absent from this extract, so it is not visible
   * whether the loop continues or stops after an interruption.
   *
   * @param ingestPipeline The ingest job pipeline the tasks belong to.
   * @param files          The files to schedule.
   */
  255     synchronized void fastTrackFileIngestTasks(IngestJobPipeline ingestPipeline, Collection<AbstractFile> files) {
 
  256         if (!ingestPipeline.isCancelled()) {
 
  265             for (AbstractFile file : files) {
 
  266                 FileIngestTask fileTask = 
new FileIngestTask(ingestPipeline, file);
 
  267                 if (shouldEnqueueFileTask(fileTask)) {
 
  269                         fileIngestTasksQueue.putFirst(fileTask);
 
  270                     } 
catch (InterruptedException ex) {
 
  // Log the interruption against the data source, then restore the
  // thread's interrupt flag.
  271                         DataSource dataSource = ingestPipeline.getDataSource();
 
  272                         logger.log(Level.WARNING, String.format(
"Interrupted while enqueuing file tasks for %s (data source object ID = %d)", dataSource.getName(), dataSource.getId()), ex); 
 
  273                         Thread.currentThread().interrupt();
 
  293     synchronized void scheduleDataArtifactIngestTasks(IngestJobPipeline ingestPipeline) {
 
  294         if (!ingestPipeline.isCancelled()) {
 
  295             Blackboard blackboard = Case.getCurrentCase().getSleuthkitCase().getBlackboard();
 
  297                 List<DataArtifact> artifacts = blackboard.getDataArtifacts(ingestPipeline.getDataSource().getId(), null);
 
  298                 scheduleDataArtifactIngestTasks(ingestPipeline, artifacts);
 
  299             } 
catch (TskCoreException ex) {
 
  300                 DataSource dataSource = ingestPipeline.getDataSource();
 
  301                 logger.log(Level.SEVERE, String.format(
"Failed to retrieve data artifacts for %s (data source object ID = %d)", dataSource.getName(), dataSource.getId()), ex); 
 
  /**
   * Schedules data artifact ingest tasks for the given artifacts. When the
   * pipeline is not cancelled, a DataArtifactIngestTask is created for each
   * artifact and appended to artifactIngestTasksQueue via putLast().
   *
   * NOTE(review): the "try" line and whatever follows the interrupt() call
   * in the catch block are absent from this extract, so it is not visible
   * whether the loop continues or stops after an interruption.
   *
   * @param ingestPipeline The ingest job pipeline the tasks belong to.
   * @param artifacts      The data artifacts to schedule tasks for.
   */
  321     synchronized void scheduleDataArtifactIngestTasks(IngestJobPipeline ingestPipeline, List<DataArtifact> artifacts) {
 
  322         if (!ingestPipeline.isCancelled()) {
 
  323             for (DataArtifact artifact : artifacts) {
 
  324                 DataArtifactIngestTask task = 
new DataArtifactIngestTask(ingestPipeline, artifact);
 
  326                     this.artifactIngestTasksQueue.putLast(task);
 
  327                 } 
catch (InterruptedException ex) {
 
  // Log the interruption against the data source, then restore the
  // thread's interrupt flag.
  328                     DataSource dataSource = ingestPipeline.getDataSource();
 
  329                     logger.log(Level.WARNING, String.format(
"Interrupted while enqueuing data artifact tasks for %s (data source object ID = %d)", dataSource.getName(), dataSource.getId()), ex); 
 
  330                     Thread.currentThread().interrupt();
 
  343     synchronized void notifyTaskCompleted(DataSourceIngestTask task) {
 
  344         dataSourceIngestTasksQueue.taskCompleted(task);
 
  353     synchronized void notifyTaskCompleted(FileIngestTask task) {
 
  354         fileIngestTasksQueue.taskCompleted(task);
 
  355         refillFileIngestTasksQueue();
 
  364     synchronized void notifyTaskCompleted(DataArtifactIngestTask task) {
 
  365         artifactIngestTasksQueue.taskCompleted(task);
 
  376     synchronized boolean currentTasksAreCompleted(IngestJobPipeline ingestPipeline) {
 
  377         long pipelineId = ingestPipeline.getId();
 
  378         return !(dataSourceIngestTasksQueue.hasTasksForJob(pipelineId)
 
  379                 || hasTasksForJob(topLevelFileIngestTasksQueue, pipelineId)
 
  380                 || hasTasksForJob(batchedFileIngestTasksQueue, pipelineId)
 
  381                 || hasTasksForJob(streamedFileIngestTasksQueue, pipelineId)
 
  382                 || fileIngestTasksQueue.hasTasksForJob(pipelineId)
 
  383                 || artifactIngestTasksQueue.hasTasksForJob(pipelineId));
 
  405     synchronized void cancelPendingFileTasksForIngestJob(IngestJobPipeline ingestJobPipeline) {
 
  406         long jobId = ingestJobPipeline.getId();
 
  407         removeTasksForJob(topLevelFileIngestTasksQueue, jobId);
 
  408         removeTasksForJob(batchedFileIngestTasksQueue, jobId);
 
  409         removeTasksForJob(streamedFileIngestTasksQueue, jobId);
 
  420     private static List<AbstractFile> getTopLevelFiles(Content dataSource) {
 
  421         List<AbstractFile> topLevelFiles = 
new ArrayList<>();
 
  422         Collection<AbstractFile> rootObjects = dataSource.accept(
new GetRootDirectoryVisitor());
 
  423         if (rootObjects.isEmpty() && dataSource instanceof AbstractFile) {
 
  425             topLevelFiles.add((AbstractFile) dataSource);
 
  427             for (AbstractFile root : rootObjects) {
 
  428                 List<Content> children;
 
  430                     children = root.getChildren();
 
  431                     if (children.isEmpty()) {
 
  434                         topLevelFiles.add(root);
 
  438                         for (Content child : children) {
 
  439                             if (child instanceof AbstractFile) {
 
  440                                 topLevelFiles.add((AbstractFile) child);
 
  444                 } 
catch (TskCoreException ex) {
 
  445                     logger.log(Level.SEVERE, 
"Could not get children of root to enqueue: " + root.getId() + 
": " + root.getName(), ex); 
 
  449         return topLevelFiles;
 
  459     synchronized private void refillFileIngestTasksQueue() {
 
  461             takeFromStreamingFileTasksQueue();
 
  462             takeFromBatchTasksQueues();
 
  463         } 
catch (InterruptedException ex) {
 
  464             IngestTasksScheduler.logger.log(Level.INFO, 
"Ingest tasks scheduler interrupted while blocked adding a task to the file level ingest task queue", ex);
 
  465             Thread.currentThread().interrupt();
 
  /**
   * Moves tasks from streamedFileIngestTasksQueue to fileIngestTasksQueue
   * while the latter is empty. Tasks are polled one at a time and appended
   * with putLast() if they pass shouldEnqueueFileTask().
   *
   * NOTE(review): the declaration and update of taskCount, the body of the
   * "streamingTask == null" branch and all closing braces are absent from
   * this extract; the loop exit conditions cannot be confirmed from here.
   *
   * @throws InterruptedException declared by the method; putLast() is the
   *                              only call here that is shown elsewhere in
   *                              this file to throw it.
   */
  475     synchronized private void takeFromStreamingFileTasksQueue() throws InterruptedException {
 
  476         while (fileIngestTasksQueue.isEmpty()) {
 
  // The inner loop is bounded by the number of file ingest threads reported
  // by the ingest manager.
  478             while (taskCount < IngestManager.getInstance().getNumberOfFileIngestThreads()) {
 
  479                 final FileIngestTask streamingTask = streamedFileIngestTasksQueue.poll();
 
  // poll() returned nothing, i.e. the streamed queue is empty.
  480                 if (streamingTask == null) {
 
  483                 if (shouldEnqueueFileTask(streamingTask)) {
 
  484                     fileIngestTasksQueue.putLast(streamingTask);
 
  /**
   * Moves tasks from the top level and batched file task queues into
   * fileIngestTasksQueue while the latter is empty.
   *
   * Visible steps per iteration: if the batched queue is empty, the first
   * top level task (if any) is moved to the end of the batched queue; the
   * first batched task is polled and, if it passes shouldEnqueueFileTask(),
   * appended to fileIngestTasksQueue; the children of the task's file are
   * then examined: children that have children of their own are added to
   * the batched queue, the others are appended to fileIngestTasksQueue if
   * they pass shouldEnqueueFileTask().
   *
   * NOTE(review): the body of the "nextTask == null" branch, the "try" line,
   * the condition choosing between the two log calls in the catch block and
   * all closing braces are absent from this extract.
   *
   * @throws InterruptedException declared by the method; putLast() is the
   *                              only call here that is shown elsewhere in
   *                              this file to throw it.
   */
  516     synchronized private void takeFromBatchTasksQueues() throws InterruptedException {
 
  518         while (fileIngestTasksQueue.isEmpty()) {
 
  // Replenish the batched queue from the sorted top level queue.
  523             if (batchedFileIngestTasksQueue.isEmpty()) {
 
  524                 final FileIngestTask topLevelTask = topLevelFileIngestTasksQueue.pollFirst();
 
  525                 if (topLevelTask != null) {
 
  526                     batchedFileIngestTasksQueue.addLast(topLevelTask);
 
  534             final FileIngestTask nextTask = batchedFileIngestTasksQueue.pollFirst();
 
  535             if (nextTask == null) {
 
  538             if (shouldEnqueueFileTask(nextTask)) {
 
  539                 fileIngestTasksQueue.putLast(nextTask);
 
  // file stays null if getFile() throws, which the first log call below
  // would not tolerate (it dereferences file).
  546             AbstractFile file = null;
 
  548                 file = nextTask.getFile();
 
  549                 for (Content child : file.getChildren()) {
 
  550                     if (child instanceof AbstractFile) {
 
  551                         AbstractFile childFile = (AbstractFile) child;
 
  552                         FileIngestTask childTask = 
new FileIngestTask(nextTask.getIngestJobPipeline(), childFile);
 
  // Children with children go back to the batched queue unfiltered; the
  // others go to the file ingest queue only if they pass the filter check.
  553                         if (childFile.hasChildren()) {
 
  554                             batchedFileIngestTasksQueue.add(childTask);
 
  555                         } 
else if (shouldEnqueueFileTask(childTask)) {
 
  556                             fileIngestTasksQueue.putLast(childTask);
 
  560             } 
catch (TskCoreException ex) {
 
  562                     logger.log(Level.SEVERE, String.format(
"Error getting the children of %s (object ID = %d)", file.getName(), file.getId()), ex); 
 
  // NOTE(review): this call passes only the file ID as a message parameter;
  // the caught exception is not handed to the logger here.
  564                     logger.log(Level.SEVERE, 
"Error loading file with object ID = {0}", nextTask.getFileId()); 
 
  /**
   * Decides whether a file ingest task should be enqueued for processing.
   *
   * The visible checks are: loading the task's file (failure is logged),
   * the file name being "." or "..", the file being neither a directory,
   * nor carvable, nor accepted by the ingest filter, the file system type
   * not being one of FAT/NTFS, and a file located in the root directory with
   * a meta address below 32 whose name starts with '$' and contains ':'.
   *
   * NOTE(review): every return statement, the declarations of "file" and
   * "f", several condition lines and all closing braces are absent from this
   * extract, so which outcome each check leads to cannot be confirmed here.
   *
   * @param task The file ingest task to examine.
   *
   * @return Whether the task should be enqueued (exact mapping of checks to
   *         results not visible in this extract).
   */
  580     private static boolean shouldEnqueueFileTask(
final FileIngestTask task) {
 
  583             file = task.getFile();
 
  584         } 
catch (TskCoreException ex) {
 
  // NOTE(review): only the file ID is passed as a message parameter; the
  // caught exception is not handed to the logger here.
  585             logger.log(Level.SEVERE, 
"Error loading file with ID {0}", task.getFileId());
 
  591         String fileName = file.getName();
 
  // Current-directory and parent-directory entries.
  593         if (fileName.equals(
".") || fileName.equals(
"..")) {
 
  // True for a non-directory file that is neither a carving candidate nor
  // accepted by the job's file filter.
  609         if (!file.isDir() && !shouldBeCarved(task) && !fileAcceptedByFilter(task)) {
 
  // Default file system type used if none can be determined below.
  619             TskData.TSK_FS_TYPE_ENUM fsType = TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_UNSUPP;
 
  621                 FileSystem fs = f.getFileSystem();
 
  623                     fsType = fs.getFsType();
 
  625             } 
catch (TskCoreException ex) {
 
  626                 logger.log(Level.SEVERE, 
"Error querying file system for " + f, ex); 
 
  // True when the file system type is none of FAT12/FAT16/FAT32/NTFS.
  630             if ((fsType.getValue() & FAT_NTFS_FLAGS) == 0) {
 
  635             boolean isInRootDir = 
false;
 
  637                 AbstractFile parent = f.getParentDirectory();
 
  638                 if (parent == null) {
 
  641                     isInRootDir = parent.isRoot();
 
  643             } 
catch (TskCoreException ex) {
 
  // NOTE(review): the message literal has no space before the appended file
  // name, so the log output runs the two together.
  644                 logger.log(Level.WARNING, 
"Error querying parent directory for" + f.getName(), ex); 
 
  // Root-directory files with a meta address below 32 whose name starts with
  // '$' and contains ':' are singled out here.
  650             if (isInRootDir && f.getMetaAddr() < 32) {
 
  651                 String name = f.getName();
 
  652                 if (name.length() > 0 && name.charAt(0) == 
'$' && name.contains(
":")) {
 
  /**
   * Checks whether the file for a task is a carving candidate: the task's
   * pipeline must be configured to process unallocated space AND the file's
   * type must be UNALLOC_BLOCKS.
   *
   * NOTE(review): the "try" line and the body of the catch block are absent
   * from this extract; the result when getFile() fails is not visible.
   *
   * @param task The file ingest task to examine.
   *
   * @return True if both conditions above hold.
   */
  668     private static boolean shouldBeCarved(
final FileIngestTask task) {
 
  670             AbstractFile file = task.getFile();
 
  671             return task.getIngestJobPipeline().shouldProcessUnallocatedSpace() && file.getType().equals(TskData.TSK_DB_FILES_TYPE_ENUM.UNALLOC_BLOCKS);
 
  672         } 
catch (TskCoreException ex) {
 
  /**
   * Checks whether the file for a task is accepted by the file ingest filter
   * of the task's pipeline. The file counts as accepted when the filter's
   * fileIsMemberOf() returns a non-null value for it.
   *
   * NOTE(review): the "try" line and the body of the catch block are absent
   * from this extract; the result when getFile() fails is not visible.
   *
   * @param task The file ingest task to examine.
   *
   * @return True if fileIsMemberOf() returned non-null for the file.
   */
  685     private static boolean fileAcceptedByFilter(
final FileIngestTask task) {
 
  687             AbstractFile file = task.getFile();
 
  // Equivalent to "fileIsMemberOf(file) != null".
  688             return !(task.getIngestJobPipeline().getFileIngestFilter().fileIsMemberOf(file) == null);
 
  689         } 
catch (TskCoreException ex) {
 
  /**
   * Scans a collection of ingest tasks for a task whose pipeline ID equals
   * the given ID.
   *
   * NOTE(review): the return statements and closing braces are absent from
   * this extract. Being static and synchronized, the method locks the class
   * object rather than a scheduler instance.
   *
   * @param tasks      The tasks to scan.
   * @param pipelineId The ingest job pipeline ID to look for.
   *
   * @return Presumably true when a matching task is found - confirm against
   *         the full source.
   */
  703     synchronized private static boolean hasTasksForJob(Collection<? extends IngestTask> tasks, 
long pipelineId) {
 
  704         for (IngestTask task : tasks) {
 
  705             if (task.getIngestJobPipeline().getId() == pipelineId) {
 
  /**
   * Walks a collection of ingest tasks with an explicit iterator and tests
   * each task's pipeline ID against the given ID.
   *
   * NOTE(review): the statement executed for a matching task (presumably a
   * removal through the iterator) and the closing braces are absent from
   * this extract.
   *
   * @param tasks      The tasks to walk.
   * @param pipelineId The ingest job pipeline ID to match.
   */
  719     private static void removeTasksForJob(Collection<? extends IngestTask> tasks, 
long pipelineId) {
 
  720         Iterator<? extends IngestTask> iterator = tasks.iterator();
 
  721         while (iterator.hasNext()) {
 
  722             IngestTask task = iterator.next();
 
  723             if (task.getIngestJobPipeline().getId() == pipelineId) {
 
  /**
   * Iterates over a collection of ingest tasks and tests each task's
   * pipeline ID against the given ID.
   *
   * NOTE(review): the counter declaration, its update and the return
   * statement are absent from this extract.
   *
   * @param tasks      The tasks to examine.
   * @param pipelineId The ingest job pipeline ID to match.
   *
   * @return Presumably the number of matching tasks - confirm against the
   *         full source.
   */
  738     private static int countTasksForJob(Collection<? extends IngestTask> tasks, 
long pipelineId) {
 
  740         for (IngestTask task : tasks) {
 
  741             if (task.getIngestJobPipeline().getId() == pipelineId) {
 
  756     synchronized IngestJobTasksSnapshot getTasksSnapshotForJob(
long jobId) {
 
  757         return new IngestJobTasksSnapshot(jobId, dataSourceIngestTasksQueue.countQueuedTasksForJob(jobId),
 
  758                 countTasksForJob(topLevelFileIngestTasksQueue, jobId),
 
  759                 countTasksForJob(batchedFileIngestTasksQueue, jobId),
 
  760                 fileIngestTasksQueue.countQueuedTasksForJob(jobId),
 
  761                 dataSourceIngestTasksQueue.countRunningTasksForJob(jobId) + fileIngestTasksQueue.countRunningTasksForJob(jobId) + artifactIngestTasksQueue.countRunningTasksForJob(jobId),
 
  762                 countTasksForJob(streamedFileIngestTasksQueue, jobId),
 
  763                 artifactIngestTasksQueue.countQueuedTasksForJob(jobId));
 
  /**
   * Compares two file ingest tasks for ordering. Each task's file is loaded
   * (a TskCoreException leaves the corresponding local null). When both
   * files are unavailable the tasks are ordered by file ID; otherwise the
   * result is based on the priorities p1/p2 and, in one branch, on the
   * files' object IDs.
   *
   * NOTE(review): the "try" lines, the null checks on file1, several return
   * statements and the computation of p1/p2 are absent from this extract.
   *
   * NOTE(review): both ID-based results subtract two long values and narrow
   * the difference to int. A large difference is truncated and can yield the
   * wrong sign or zero; Long.compare(a, b) avoids this - confirm and fix in
   * the full source.
   *
   * @param q1 The first task.
   * @param q2 The second task.
   *
   * @return A negative, zero or positive int per the Comparator contract.
   */
  773         public int compare(FileIngestTask q1, FileIngestTask q2) {
 
  776             AbstractFile file1 = null;
 
  777             AbstractFile file2 = null;
 
  779                 file1 = q1.getFile();
 
  780             } 
catch (TskCoreException ex) {
 
  785                 file2 = q2.getFile();
 
  786             } 
catch (TskCoreException ex) {
 
  // Operands are in (q2, q1) order, so a larger file ID sorts first.
  792                     return (
int) (q2.getFileId() - q1.getFileId());
 
  796             } 
else if (file2 == null) {
 
  // Same (2, 1) operand order on the files' object IDs.
  803                 return (
int) (file2.getId() - file1.getId());
 
  // A higher priority ordinal sorts first.
  805                 return p2.ordinal() - p1.ordinal();
 
  820                 LAST, LOW, MEDIUM, HIGH
 
  // Four lists of name patterns, one per priority level. Every pattern added
  // below is anchored at the start of the input ("^") and compiled
  // case-insensitively.
  // NOTE(review): the enclosing class header and the "static {" initializer
  // line are absent from this extract.
  823             static final List<Pattern> LAST_PRI_PATHS = 
new ArrayList<>();
 
  825             static final List<Pattern> LOW_PRI_PATHS = 
new ArrayList<>();
 
  827             static final List<Pattern> MEDIUM_PRI_PATHS = 
new ArrayList<>();
 
  829             static final List<Pattern> HIGH_PRI_PATHS = 
new ArrayList<>();
 
  // Names beginning with "pagefile" or "hiberfil".
  845                 LAST_PRI_PATHS.add(Pattern.compile(
"^pagefile", Pattern.CASE_INSENSITIVE));
 
  846                 LAST_PRI_PATHS.add(Pattern.compile(
"^hiberfil", Pattern.CASE_INSENSITIVE));
 
  // Names beginning with "$OrphanFiles" (the '$' is escaped in the regex)
  // or "Windows".
  849                 LOW_PRI_PATHS.add(Pattern.compile(
"^\\$OrphanFiles", Pattern.CASE_INSENSITIVE));
 
  850                 LOW_PRI_PATHS.add(Pattern.compile(
"^Windows", Pattern.CASE_INSENSITIVE));
 
  // Names beginning with "Program Files".
  852                 MEDIUM_PRI_PATHS.add(Pattern.compile(
"^Program Files", Pattern.CASE_INSENSITIVE));
 
  // Names beginning with "Users", "Documents and Settings", "home" or
  // "ProgramData".
  854                 HIGH_PRI_PATHS.add(Pattern.compile(
"^Users", Pattern.CASE_INSENSITIVE));
 
  855                 HIGH_PRI_PATHS.add(Pattern.compile(
"^Documents and Settings", Pattern.CASE_INSENSITIVE));
 
  856                 HIGH_PRI_PATHS.add(Pattern.compile(
"^home", Pattern.CASE_INSENSITIVE));
 
  857                 HIGH_PRI_PATHS.add(Pattern.compile(
"^ProgramData", Pattern.CASE_INSENSITIVE));
 
  868                 if (!abstractFile.getType().equals(TskData.TSK_DB_FILES_TYPE_ENUM.FS)) {
 
  874                 final String path = abstractFile.getName();
 
  878                 for (Pattern p : HIGH_PRI_PATHS) {
 
  879                     Matcher m = p.matcher(path);
 
  884                 for (Pattern p : MEDIUM_PRI_PATHS) {
 
  885                     Matcher m = p.matcher(path);
 
  890                 for (Pattern p : LOW_PRI_PATHS) {
 
  891                     Matcher m = p.matcher(path);
 
  896                 for (Pattern p : LAST_PRI_PATHS) {
 
  897                     Matcher m = p.matcher(path);
 
  915         private final BlockingDeque<IngestTask> 
taskQueue = 
new LinkedBlockingDeque<>();
 
  /**
   * Adds a task to the front of the underlying blocking deque.
   *
   * The visible structure is: a synchronized(this) section, then the
   * putFirst() call on taskQueue inside a try block, then a second
   * synchronized(this) section in the InterruptedException handler.
   *
   * NOTE(review): the bodies of both synchronized sections, the "try" line
   * and the end of the catch block are absent from this extract; presumably
   * they maintain the queued-task bookkeeping and rethrow - confirm.
   *
   * @param task The task to add.
   *
   * @throws InterruptedException declared by the method; taskQueue.putFirst()
   *                              can throw it.
   */
  931         void putFirst(IngestTask task) throws InterruptedException {
 
  932             synchronized (
this) {
 
  936                 this.taskQueue.putFirst(task);
 
  937             } 
catch (InterruptedException ex) {
 
  938                 synchronized (
this) {
 
  /**
   * Adds a task to the end of the underlying blocking deque.
   *
   * The visible structure is: a synchronized(this) section, then the
   * putLast() call on taskQueue inside a try block, then a second
   * synchronized(this) section in the InterruptedException handler.
   *
   * NOTE(review): the bodies of both synchronized sections, the "try" line
   * and the end of the catch block are absent from this extract; presumably
   * they maintain the queued-task bookkeeping and rethrow - confirm.
   *
   * @param task The task to add.
   *
   * @throws InterruptedException declared by the method; taskQueue.putLast()
   *                              can throw it.
   */
  955         void putLast(IngestTask task) 
throws InterruptedException {
 
  956             synchronized (
this) {
 
  960                 this.taskQueue.putLast(task);
 
  961             } 
catch (InterruptedException ex) {
 
  962                 synchronized (
this) {
 
  981             IngestTask task = taskQueue.takeFirst();
 
  982             synchronized (
this) {
 
  995             synchronized (
this) {
 
 /**
  * Handles notification that a task taken from this queue has been
  * completed. The work is done inside a synchronized(this) section.
  *
  * NOTE(review): the body of the synchronized section is absent from this
  * extract; presumably it drops the task from the in-progress bookkeeping -
  * confirm against the full source.
  *
  * @param task The completed task.
  */
 1006         void taskCompleted(IngestTask task) {
 
 1007             synchronized (
this) {
 
 1020         boolean hasTasksForJob(
long pipelineId) {
 
 1021             synchronized (
this) {
 
 1022                 return IngestTasksScheduler.hasTasksForJob(
queuedTasks, pipelineId) || IngestTasksScheduler.hasTasksForJob(
tasksInProgress, pipelineId);
 
 1033         int countQueuedTasksForJob(
long pipelineId) {
 
 1034             synchronized (
this) {
 
 1035                 return IngestTasksScheduler.countTasksForJob(
queuedTasks, pipelineId);
 
 1046         int countRunningTasksForJob(
long pipelineId) {
 
 1047             synchronized (
this) {
 
 1048                 return IngestTasksScheduler.countTasksForJob(
tasksInProgress, pipelineId);
 
 /**
  * A serializable, immutable record of task counts for one ingest job: one
  * long per scheduler queue plus the number of running tasks.
  *
  * NOTE(review): in this extract the assignment of jobId in the constructor,
  * any accessor for jobId, the return statement of getDsQueueSize() and all
  * closing braces are absent; confirm them against the full source.
  */
 1057     static final class IngestJobTasksSnapshot 
implements Serializable {
 
 1059         private static final long serialVersionUID = 1L;
 
 // ID of the ingest job the counts belong to.
 1060         private final long jobId;
 
 // Count for the data source level tasks queue.
 1061         private final long dsQueueSize;
 
 // Count for the top level (root) file tasks queue.
 1062         private final long rootQueueSize;
 
 // Count for the batched (directory) file tasks queue.
 1063         private final long dirQueueSize;
 
 // Count for the file ingest tasks queue.
 1064         private final long fileQueueSize;
 
 // Count of tasks currently running.
 1065         private final long runningListSize;
 
 // Count for the streamed file tasks queue.
 1066         private final long streamingQueueSize;
 
 // Count for the data artifact tasks queue.
 1067         private final long artifactsQueueSize;
 
 // Constructor: stores each argument in the field of the same name.
 1083         IngestJobTasksSnapshot(
long jobId, 
long dsQueueSize, 
long rootQueueSize, 
long dirQueueSize, 
long fileQueueSize,
 
 1084                 long runningListSize, 
long streamingQueueSize, 
long artifactsQueueSize) {
 
 1086             this.dsQueueSize = dsQueueSize;
 
 1087             this.rootQueueSize = rootQueueSize;
 
 1088             this.dirQueueSize = dirQueueSize;
 
 1089             this.fileQueueSize = fileQueueSize;
 
 1090             this.runningListSize = runningListSize;
 
 1091             this.streamingQueueSize = streamingQueueSize;
 
 1092             this.artifactsQueueSize = artifactsQueueSize;
 
 // Returns the root (top level) file tasks count.
 1111         long getRootQueueSize() {
 
 1112             return rootQueueSize;
 
 // Returns the batched (directory) file tasks count.
 1121         long getDirectoryTasksQueueSize() {
 
 1122             return dirQueueSize;
 
 // Returns the file ingest tasks count.
 1125         long getFileQueueSize() {
 
 1126             return fileQueueSize;
 
 // Returns the streamed file tasks count.
 1129         long getStreamingQueueSize() {
 
 1130             return streamingQueueSize;
 
 // NOTE(review): the body of this accessor is not visible in this extract.
 1133         long getDsQueueSize() {
 
 // Returns the running tasks count.
 1137         long getRunningListSize() {
 
 1138             return runningListSize;
 
 // Returns the data artifact tasks count.
 1141         long getArtifactsQueueSize() {
 
 1142             return artifactsQueueSize;
 
final List< IngestTask > queuedTasks
 
final BlockingDeque< IngestTask > taskQueue
 
final List< IngestTask > tasksInProgress
 
int compare(FileIngestTask q1, FileIngestTask q2)