19 package org.sleuthkit.autopsy.ingest;
 
   21 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
   22 import java.awt.EventQueue;
 
   23 import java.beans.PropertyChangeEvent;
 
   24 import java.beans.PropertyChangeListener;
 
   25 import java.util.ArrayList;
 
   26 import java.util.Collection;
 
   27 import java.util.Collections;
 
   28 import java.util.Date;
 
   29 import java.util.HashMap;
 
   30 import java.util.HashSet;
 
   31 import java.util.List;
 
   34 import java.util.concurrent.Callable;
 
   35 import java.util.concurrent.ConcurrentHashMap;
 
   36 import java.util.concurrent.ExecutorService;
 
   37 import java.util.concurrent.Executors;
 
   38 import java.util.concurrent.Future;
 
   39 import java.util.concurrent.atomic.AtomicLong;
 
   40 import java.util.logging.Level;
 
   41 import java.util.stream.Collectors;
 
   42 import java.util.stream.Stream;
 
   43 import javax.swing.JOptionPane;
 
   44 import org.netbeans.api.progress.ProgressHandle;
 
   45 import org.openide.util.Cancellable;
 
   46 import org.openide.util.NbBundle;
 
  111             .map(IngestJobEvent::toString)
 
  112             .collect(Collectors.toSet());
 
  114             .map(IngestModuleEvent::toString)
 
  115             .collect(Collectors.toSet());
 
  235         if (instance == null) {
 
  256         this.ingestModuleRunTimes = 
new ConcurrentHashMap<>();
 
  257         this.ingestThreadActivitySnapshots = 
new ConcurrentHashMap<>();
 
  258         this.ingestErrorMessagePosts = 
new AtomicLong(0L);
 
  260         this.eventPublishingExecutor = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat(
"IM-ingest-events-%d").build()); 
 
  263         this.dataSourceIngestThreadPool = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat(
"IM-data-source-ingest-%d").build()); 
 
  264         this.startIngestJobsThreadPool = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat(
"IM-start-ingest-jobs-%d").build()); 
 
  265         this.nextThreadId = 
new AtomicLong(0L);
 
  266         this.jobsById = 
new HashMap<>();
 
  267         this.startIngestJobTasks = 
new ConcurrentHashMap<>();
 
  275         if ((numberOfFileIngestThreads < MIN_NUMBER_OF_FILE_INGEST_THREADS) || (numberOfFileIngestThreads > MAX_NUMBER_OF_FILE_INGEST_THREADS)) {
 
  279         fileIngestThreadPool = Executors.newFixedThreadPool(numberOfFileIngestThreads, 
new ThreadFactoryBuilder().setNameFormat(
"IM-file-ingest-%d").build()); 
 
  290         long threadId = nextThreadId.incrementAndGet();
 
  291         dataSourceIngestThreadPool.submit(
new ExecuteIngestJobsTask(threadId, IngestTasksScheduler.getInstance().getDataSourceIngestTaskQueue()));
 
  292         ingestThreadActivitySnapshots.put(threadId, 
new IngestThreadActivitySnapshot(threadId));
 
  300         long threadId = nextThreadId.incrementAndGet();
 
  301         fileIngestThreadPool.submit(
new ExecuteIngestJobsTask(threadId, IngestTasksScheduler.getInstance().getFileIngestTaskQueue()));
 
  302         ingestThreadActivitySnapshots.put(threadId, 
new IngestThreadActivitySnapshot(threadId));
 
  311             public void propertyChange(PropertyChangeEvent event) {
 
  312                 if (event.getNewValue() != null) {
 
  326         PropertyChangeListener propChangeListener = 
new PropertyChangeListener() {
 
  328             public void propertyChange(PropertyChangeEvent evt) {
 
  336                     } 
catch (IllegalStateException ignore) {
 
  343                     logger.log(Level.SEVERE, 
"Service {0} is down! Cancelling all running ingest jobs", serviceDisplayName); 
 
  347                         EventQueue.invokeLater(
new Runnable() {
 
  350                                 JOptionPane.showMessageDialog(null,
 
  351                                         NbBundle.getMessage(
this.getClass(), 
"IngestManager.cancellingIngest.msgDlg.text"),
 
  352                                         NbBundle.getMessage(
this.getClass(), 
"IngestManager.serviceIsDown.msgDlg.text", serviceDisplayName),
 
  353                                         JOptionPane.ERROR_MESSAGE);
 
  365         Set<String> servicesList = 
new HashSet<>();
 
  368         this.servicesMonitor.
addSubscriber(servicesList, propChangeListener);
 
  371     synchronized void handleCaseOpened() {
 
  372         this.jobCreationIsEnabled = 
true;
 
  382             String channelPrefix = openedCase.
getName();
 
  387         } 
catch (IllegalStateException | AutopsyEventException ex) {
 
  388             logger.log(Level.SEVERE, 
"Failed to open remote events channel", ex); 
 
  389             MessageNotifyUtil.Notify.error(NbBundle.getMessage(
IngestManager.class, 
"IngestManager.OpenEventChannel.Fail.Title"),
 
  390                     NbBundle.getMessage(
IngestManager.class, 
"IngestManager.OpenEventChannel.Fail.ErrMsg"));
 
  394     synchronized void handleCaseClosed() {
 
  402         this.jobCreationIsEnabled = 
false;
 
  411     void initIngestMessageInbox() {
 
  413             ingestMessageBox = IngestMessageTopComponent.findInstance();
 
  422     void postIngestMessage(IngestMessage message) {
 
  424             if (ingestMessageBox != null && RuntimeProperties.runningWithGUI()) {
 
  425                 if (message.getMessageType() != IngestMessage.MessageType.ERROR && message.getMessageType() != IngestMessage.MessageType.WARNING) {
 
  426                     ingestMessageBox.displayMessage(message);
 
  428                     long errorPosts = ingestErrorMessagePosts.incrementAndGet();
 
  429                     if (errorPosts <= MAX_ERROR_MESSAGE_POSTS) {
 
  430                         ingestMessageBox.displayMessage(message);
 
  431                     } 
else if (errorPosts == MAX_ERROR_MESSAGE_POSTS + 1) {
 
  432                         IngestMessage errorMessageLimitReachedMessage = IngestMessage.createErrorMessage(
 
  433                                 NbBundle.getMessage(
this.getClass(), 
"IngestManager.IngestMessage.ErrorMessageLimitReached.title"),
 
  434                                 NbBundle.getMessage(
this.getClass(), 
"IngestManager.IngestMessage.ErrorMessageLimitReached.subject"),
 
  435                                 NbBundle.getMessage(
this.getClass(), 
"IngestManager.IngestMessage.ErrorMessageLimitReached.msg", 
MAX_ERROR_MESSAGE_POSTS));
 
  436                         ingestMessageBox.displayMessage(errorMessageLimitReachedMessage);
 
  445             if (ingestMessageBox != null) {
 
  446                 ingestMessageBox.clearMessages();
 
  448             ingestErrorMessagePosts.set(0);
 
  470         if (jobCreationIsEnabled) {
 
  472             if (job.hasIngestPipeline()) {
 
  473                 long taskId = nextThreadId.incrementAndGet();
 
  474                 Future<Void> task = startIngestJobsThreadPool.submit(
new StartIngestJobTask(taskId, job));
 
  475                 startIngestJobTasks.put(taskId, task);
 
  492         if (this.jobCreationIsEnabled) {
 
  494             if (job.hasIngestPipeline()) {
 
  526         "IngestManager.startupErr.dlgTitle=Ingest Module Startup Failure",
 
  527         "IngestManager.startupErr.dlgMsg=Unable to start up one or more ingest modules, ingest cancelled.",
 
  528         "IngestManager.startupErr.dlgSolution=Please disable the failed modules or fix the errors before restarting ingest.",
 
  529         "IngestManager.startupErr.dlgErrorList=Errors:" 
  532         List<IngestModuleError> errors = null;
 
  533         if (this.jobCreationIsEnabled) {
 
  540                             EventQueue.invokeLater(
new Runnable() {
 
  544                                     JOptionPane.showMessageDialog(null,
 
  545                                             NbBundle.getMessage(
this.getClass(), 
"IngestManager.cancellingIngest.msgDlg.text"),
 
  546                                             NbBundle.getMessage(
this.getClass(), 
"IngestManager.serviceIsDown.msgDlg.text", serviceDisplayName),
 
  547                                             JOptionPane.ERROR_MESSAGE);
 
  559             if (!ingestMonitor.isRunning()) {
 
  560                 ingestMonitor.start();
 
  564                 jobsById.put(job.
getId(), job);
 
  566             errors = job.start();
 
  567             if (errors.isEmpty()) {
 
  568                 this.fireIngestJobStarted(job.
getId());
 
  572                     this.jobsById.remove(job.
getId());
 
  575                     logger.log(Level.SEVERE, String.format(
"Error starting %s ingest module for job %d", error.getModuleDisplayName(), job.
getId()), error.getThrowable()); 
 
  579                     final StringBuilder message = 
new StringBuilder();
 
  580                     message.append(Bundle.IngestManager_startupErr_dlgMsg()).append(
"\n");
 
  581                     message.append(Bundle.IngestManager_startupErr_dlgSolution()).append(
"\n\n");
 
  582                     message.append(Bundle.IngestManager_startupErr_dlgErrorList()).append(
"\n");
 
  584                         String moduleName = error.getModuleDisplayName();
 
  585                         String errorMessage = error.getThrowable().getLocalizedMessage();
 
  586                         message.append(moduleName).append(
": ").append(errorMessage).append(
"\n");
 
  588                     message.append(
"\n\n");
 
  589                     EventQueue.invokeLater(() -> {
 
  590                         JOptionPane.showMessageDialog(null, message, Bundle.IngestManager_startupErr_dlgTitle(), JOptionPane.ERROR_MESSAGE);
 
  600     synchronized void finishIngestJob(
IngestJob job) {
 
  601         long jobId = job.
getId();
 
  603             jobsById.remove(jobId);
 
  606             IngestManager.logger.log(Level.INFO, 
"Ingest job {0} completed", jobId); 
 
  607             fireIngestJobCompleted(jobId);
 
  609             IngestManager.logger.log(Level.INFO, 
"Ingest job {0} cancelled", jobId); 
 
  610             fireIngestJobCancelled(jobId);
 
  621             return !jobsById.isEmpty();
 
  645         for (Future<Void> handle : startIngestJobTasks.values()) {
 
  653             for (
IngestJob job : this.jobsById.values()) {
 
  683         moduleEventPublisher.
addSubscriber(moduleEventNames, listener);
 
  728     void fireIngestJobStarted(
long ingestJobId) {
 
  730         eventPublishingExecutor.submit(
new PublishEventTask(event, jobEventPublisher));
 
  738     void fireIngestJobCompleted(
long ingestJobId) {
 
  739         AutopsyEvent 
event = 
new AutopsyEvent(IngestJobEvent.COMPLETED.toString(), ingestJobId, null);
 
  740         eventPublishingExecutor.submit(
new PublishEventTask(event, jobEventPublisher));
 
  748     void fireIngestJobCancelled(
long ingestJobId) {
 
  749         AutopsyEvent 
event = 
new AutopsyEvent(IngestJobEvent.CANCELLED.toString(), ingestJobId, null);
 
  750         eventPublishingExecutor.submit(
new PublishEventTask(event, jobEventPublisher));
 
  760     void fireDataSourceAnalysisStarted(
long ingestJobId, 
long dataSourceIngestJobId, Content dataSource) {
 
  761         AutopsyEvent 
event = 
new DataSourceAnalysisStartedEvent(ingestJobId, dataSourceIngestJobId, dataSource);
 
  762         eventPublishingExecutor.submit(
new PublishEventTask(event, jobEventPublisher));
 
  772     void fireDataSourceAnalysisCompleted(
long ingestJobId, 
long dataSourceIngestJobId, Content dataSource) {
 
  773         AutopsyEvent 
event = 
new DataSourceAnalysisCompletedEvent(ingestJobId, dataSourceIngestJobId, dataSource, DataSourceAnalysisCompletedEvent.Reason.ANALYSIS_COMPLETED);
 
  774         eventPublishingExecutor.submit(
new PublishEventTask(event, jobEventPublisher));
 
  784     void fireDataSourceAnalysisCancelled(
long ingestJobId, 
long dataSourceIngestJobId, Content dataSource) {
 
  785         AutopsyEvent 
event = 
new DataSourceAnalysisCompletedEvent(ingestJobId, dataSourceIngestJobId, dataSource, DataSourceAnalysisCompletedEvent.Reason.ANALYSIS_CANCELLED);
 
  786         eventPublishingExecutor.submit(
new PublishEventTask(event, jobEventPublisher));
 
  794     void fireFileIngestDone(AbstractFile file) {
 
  795         AutopsyEvent 
event = 
new FileAnalyzedEvent(file);
 
  796         eventPublishingExecutor.submit(
new PublishEventTask(event, moduleEventPublisher));
 
  804     void fireIngestModuleDataEvent(ModuleDataEvent moduleDataEvent) {
 
  805         AutopsyEvent 
event = 
new BlackboardPostEvent(moduleDataEvent);
 
  806         eventPublishingExecutor.submit(
new PublishEventTask(event, moduleEventPublisher));
 
  816     void fireIngestModuleContentEvent(ModuleContentEvent moduleContentEvent) {
 
  817         AutopsyEvent 
event = 
new ContentChangedEvent(moduleContentEvent);
 
  818         eventPublishingExecutor.submit(
new PublishEventTask(event, moduleEventPublisher));
 
  827     void setIngestTaskProgress(DataSourceIngestTask task, String ingestModuleDisplayName) {
 
  828         ingestThreadActivitySnapshots.put(task.getThreadId(), 
new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJob().getId(), ingestModuleDisplayName, task.getDataSource()));
 
  837     void setIngestTaskProgress(FileIngestTask task, String ingestModuleDisplayName) {
 
  838         IngestThreadActivitySnapshot prevSnap = ingestThreadActivitySnapshots.get(task.getThreadId());
 
  839         IngestThreadActivitySnapshot newSnap = 
new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJob().getId(), ingestModuleDisplayName, task.getDataSource(), task.getFile());
 
  840         ingestThreadActivitySnapshots.put(task.getThreadId(), newSnap);
 
  842         incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
 
  850     void setIngestTaskProgressCompleted(DataSourceIngestTask task) {
 
  851         ingestThreadActivitySnapshots.put(task.getThreadId(), 
new IngestThreadActivitySnapshot(task.getThreadId()));
 
  859     void setIngestTaskProgressCompleted(FileIngestTask task) {
 
  860         IngestThreadActivitySnapshot prevSnap = ingestThreadActivitySnapshots.get(task.getThreadId());
 
  861         IngestThreadActivitySnapshot newSnap = 
new IngestThreadActivitySnapshot(task.getThreadId());
 
  862         ingestThreadActivitySnapshots.put(task.getThreadId(), newSnap);
 
  863         incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
 
  873         if (moduleName.equals(
"IDLE")) { 
 
  878             Long prevTimeL = ingestModuleRunTimes.get(moduleName);
 
  880             if (prevTimeL != null) {
 
  881                 prevTime = prevTimeL;
 
  883             prevTime += duration;
 
  884             ingestModuleRunTimes.put(moduleName, prevTime);
 
  893     Map<String, Long> getModuleRunTimes() {
 
  905     List<IngestThreadActivitySnapshot> getIngestThreadActivitySnapshots() {
 
  906         return new ArrayList<>(ingestThreadActivitySnapshots.values());
 
  914     List<DataSourceIngestJob.Snapshot> getIngestJobSnapshots() {
 
  915         List<DataSourceIngestJob.Snapshot> snapShots = 
new ArrayList<>();
 
  917             for (IngestJob job : jobsById.values()) {
 
  918                 snapShots.addAll(job.getDataSourceIngestJobSnapshots());
 
  930     long getFreeDiskSpace() {
 
  931         if (ingestMonitor != null) {
 
  932             return ingestMonitor.getFreeSpace();
 
  955                 if (Thread.currentThread().isInterrupted()) {
 
  957                         jobsById.remove(job.
getId());
 
  963                     final String displayName = NbBundle.getMessage(this.getClass(), 
"IngestManager.StartIngestJobsTask.run.displayName");
 
  964                     this.progress = ProgressHandle.createHandle(displayName, 
new Cancellable() {
 
  966                         public boolean cancel() {
 
  967                             if (progress != null) {
 
  968                                 progress.setDisplayName(NbBundle.getMessage(
this.getClass(), 
"IngestManager.StartIngestJobsTask.run.cancelling", displayName));
 
  970                             Future<?> handle = startIngestJobTasks.remove(threadId);
 
  982                 if (null != progress) {
 
  985                 startIngestJobTasks.remove(threadId);
 
  997         private final IngestTaskQueue 
tasks;
 
 1008                     IngestTask task = tasks.getNextTask(); 
 
 1009                     task.execute(threadId);
 
 1010                 } 
catch (InterruptedException ex) {
 
 1013                 if (Thread.currentThread().isInterrupted()) {
 
 1047     static final class IngestThreadActivitySnapshot {
 
 1049         private final long threadId;
 
 1050         private final Date startTime;
 
 1051         private final String activity;
 
 1052         private final String dataSourceName;
 
 1053         private final String fileName;
 
 1054         private final long jobId;
 
 1057         IngestThreadActivitySnapshot(
long threadId) {
 
 1058             this.threadId = threadId;
 
 1059             startTime = 
new Date();
 
 1060             this.activity = NbBundle.getMessage(this.getClass(), 
"IngestManager.IngestThreadActivitySnapshot.idleThread");
 
 1061             this.dataSourceName = 
"";
 
 1067         IngestThreadActivitySnapshot(
long threadId, 
long jobId, String activity, Content dataSource) {
 
 1068             this.threadId = threadId;
 
 1070             startTime = 
new Date();
 
 1071             this.activity = activity;
 
 1072             this.dataSourceName = dataSource.getName();
 
 1077         IngestThreadActivitySnapshot(
long threadId, 
long jobId, String activity, Content dataSource, AbstractFile file) {
 
 1078             this.threadId = threadId;
 
 1080             startTime = 
new Date();
 
 1081             this.activity = activity;
 
 1082             this.dataSourceName = dataSource.getName();
 
 1083             this.fileName = file.getName();
 
 1090         long getThreadId() {
 
 1094         Date getStartTime() {
 
 1098         String getActivity() {
 
 1102         String getDataSourceName() {
 
 1103             return dataSourceName;
 
 1106         String getFileName() {
 
 1135             super(message, cause);
 
final ConcurrentHashMap< String, Long > ingestModuleRunTimes
 
String getServiceStatus(String service)
 
void removeIngestModuleEventListener(final PropertyChangeListener listener)
 
static final int MIN_NUMBER_OF_FILE_INGEST_THREADS
 
IngestManagerException(String message, Throwable cause)
 
static synchronized IngestManager getInstance()
 
static IngestManager instance
 
IngestJobStartResult startIngestJob(IngestJob job)
 
static boolean runningWithGUI
 
final ExecutorService startIngestJobsThreadPool
 
void cancelAllIngestJobs()
 
final Map< Long, Future< Void > > startIngestJobTasks
 
void publish(AutopsyEvent event)
 
static void addPropertyChangeListener(final PropertyChangeListener listener)
 
static void setNumberOfFileIngestThreads(int value)
 
static final Logger logger
 
final ExecutorService eventPublishingExecutor
 
void clearIngestMessageBox()
 
void addSubscriber(Set< String > eventNames, PropertyChangeListener subscriber)
 
void subscribeToServiceMonitorEvents()
 
boolean isIngestRunning()
 
DATA_SOURCE_ANALYSIS_COMPLETED
 
static void removePropertyChangeListener(final PropertyChangeListener listener)
 
volatile IngestMessageTopComponent ingestMessageBox
 
final IngestTaskQueue tasks
 
void addSubscriber(PropertyChangeListener subscriber)
 
static synchronized ServicesMonitor getInstance()
 
final AutopsyEventPublisher publisher
 
int numberOfFileIngestThreads
 
final ServicesMonitor servicesMonitor
 
void removeIngestJobEventListener(final PropertyChangeListener listener)
 
final ExecutorService fileIngestThreadPool
 
static final int DEFAULT_NUMBER_OF_FILE_INGEST_THREADS
 
static int numberOfFileIngestThreads()
 
void incrementModuleRunTime(String moduleName, Long duration)
 
void addIngestJobEventListener(final PropertyChangeListener listener)
 
final Object ingestMessageBoxLock
 
void startFileIngestThread()
 
void openRemoteEventChannel(String channelName)
 
final Map< Long, IngestJob > jobsById
 
AutopsyEventPublisher jobEventPublisher
 
IngestManagerException(String message)
 
void closeRemoteEventChannel()
 
void queueIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
 
final ExecutorService dataSourceIngestThreadPool
 
void removeSubscriber(Set< String > eventNames, PropertyChangeListener subscriber)
 
static final String MODULE_EVENT_CHANNEL_NAME
 
volatile boolean jobCreationIsEnabled
 
static final int MAX_ERROR_MESSAGE_POSTS
 
final AtomicLong ingestErrorMessagePosts
 
void startDataSourceIngestThread()
 
static final Set< String > jobEventNames
 
int getNumberOfFileIngestThreads()
 
static final Set< String > moduleEventNames
 
void addIngestModuleEventListener(final PropertyChangeListener listener)
 
static final String JOB_EVENT_CHANNEL_NAME
 
static Case getCurrentCase()
 
synchronized static Logger getLogger(String name)
 
DATA_SOURCE_ANALYSIS_STARTED
 
static final int MAX_NUMBER_OF_FILE_INGEST_THREADS
 
synchronized IngestJob startIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
 
final ConcurrentHashMap< Long, IngestThreadActivitySnapshot > ingestThreadActivitySnapshots
 
void cancelAllIngestJobs(IngestJob.CancellationReason reason)
 
AutopsyEventPublisher moduleEventPublisher
 
final IngestMonitor ingestMonitor
 
static final long serialVersionUID
 
synchronized IngestJobStartResult beginIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
 
final AtomicLong nextThreadId
 
void subscribeToCaseEvents()
 
static void addEventSubscriber(Set< String > eventNames, PropertyChangeListener subscriber)