19 package org.sleuthkit.autopsy.ingest;
 
   21 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
   22 import java.awt.EventQueue;
 
   23 import java.beans.PropertyChangeEvent;
 
   24 import java.beans.PropertyChangeListener;
 
   25 import java.util.ArrayList;
 
   26 import java.util.Collection;
 
   27 import java.util.Collections;
 
   28 import java.util.Date;
 
   29 import java.util.HashMap;
 
   30 import java.util.HashSet;
 
   31 import java.util.List;
 
   34 import java.util.concurrent.Callable;
 
   35 import java.util.concurrent.ConcurrentHashMap;
 
   36 import java.util.concurrent.ExecutorService;
 
   37 import java.util.concurrent.Executors;
 
   38 import java.util.concurrent.Future;
 
   39 import java.util.concurrent.atomic.AtomicLong;
 
   40 import java.util.logging.Level;
 
   41 import java.util.stream.Collectors;
 
   42 import java.util.stream.Stream;
 
   43 import javax.swing.JOptionPane;
 
   44 import org.netbeans.api.progress.ProgressHandle;
 
   45 import org.openide.util.Cancellable;
 
   46 import org.openide.util.NbBundle;
 
  111             .map(IngestJobEvent::toString)
 
  112             .collect(Collectors.toSet());
 
  114             .map(IngestModuleEvent::toString)
 
  115             .collect(Collectors.toSet());
 
  235         if (instance == null) {
 
  256         this.ingestModuleRunTimes = 
new ConcurrentHashMap<>();
 
  257         this.ingestThreadActivitySnapshots = 
new ConcurrentHashMap<>();
 
  258         this.ingestErrorMessagePosts = 
new AtomicLong(0L);
 
  260         this.eventPublishingExecutor = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat(
"IM-ingest-events-%d").build()); 
 
  263         this.dataSourceIngestThreadPool = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat(
"IM-data-source-ingest-%d").build()); 
 
  264         this.startIngestJobsThreadPool = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat(
"IM-start-ingest-jobs-%d").build()); 
 
  265         this.nextThreadId = 
new AtomicLong(0L);
 
  266         this.jobsById = 
new HashMap<>();
 
  267         this.startIngestJobTasks = 
new ConcurrentHashMap<>();
 
  275         if ((numberOfFileIngestThreads < MIN_NUMBER_OF_FILE_INGEST_THREADS) || (numberOfFileIngestThreads > MAX_NUMBER_OF_FILE_INGEST_THREADS)) {
 
  279         fileIngestThreadPool = Executors.newFixedThreadPool(numberOfFileIngestThreads, 
new ThreadFactoryBuilder().setNameFormat(
"IM-file-ingest-%d").build()); 
 
  290         long threadId = nextThreadId.incrementAndGet();
 
  291         dataSourceIngestThreadPool.submit(
new ExecuteIngestJobsTask(threadId, IngestTasksScheduler.getInstance().getDataSourceIngestTaskQueue()));
 
  292         ingestThreadActivitySnapshots.put(threadId, 
new IngestThreadActivitySnapshot(threadId));
 
  300         long threadId = nextThreadId.incrementAndGet();
 
  301         fileIngestThreadPool.submit(
new ExecuteIngestJobsTask(threadId, IngestTasksScheduler.getInstance().getFileIngestTaskQueue()));
 
  302         ingestThreadActivitySnapshots.put(threadId, 
new IngestThreadActivitySnapshot(threadId));
 
  311             public void propertyChange(PropertyChangeEvent event) {
 
  312                 if (event.getNewValue() != null) {
 
  326         PropertyChangeListener propChangeListener = 
new PropertyChangeListener() {
 
  328             public void propertyChange(PropertyChangeEvent evt) {
 
  336                     } 
catch (IllegalStateException ignore) {
 
  343                     logger.log(Level.SEVERE, 
"Service {0} is down! Cancelling all running ingest jobs", serviceDisplayName); 
 
  347                         EventQueue.invokeLater(
new Runnable() {
 
  350                                 JOptionPane.showMessageDialog(null,
 
  351                                         NbBundle.getMessage(
this.getClass(), 
"IngestManager.cancellingIngest.msgDlg.text"),
 
  352                                         NbBundle.getMessage(
this.getClass(), 
"IngestManager.serviceIsDown.msgDlg.text", serviceDisplayName),
 
  353                                         JOptionPane.ERROR_MESSAGE);
 
  365         Set<String> servicesList = 
new HashSet<>();
 
  368         this.servicesMonitor.
addSubscriber(servicesList, propChangeListener);
 
  371     synchronized void handleCaseOpened() {
 
  372         this.jobCreationIsEnabled = 
true;
 
  387         } 
catch (IllegalStateException | AutopsyEventException ex) {
 
  388             logger.log(Level.SEVERE, 
"Failed to open remote events channel", ex); 
 
  389             MessageNotifyUtil.Notify.error(NbBundle.getMessage(
IngestManager.class, 
"IngestManager.OpenEventChannel.Fail.Title"),
 
  390                     NbBundle.getMessage(
IngestManager.class, 
"IngestManager.OpenEventChannel.Fail.ErrMsg"));
 
  394     synchronized void handleCaseClosed() {
 
  397         this.jobCreationIsEnabled = 
false;
 
  418     void initIngestMessageInbox() {
 
  420             ingestMessageBox = IngestMessageTopComponent.findInstance();
 
  429     void postIngestMessage(IngestMessage message) {
 
  431             if (ingestMessageBox != null && RuntimeProperties.coreComponentsAreActive()) {
 
  432                 if (message.getMessageType() != IngestMessage.MessageType.ERROR && message.getMessageType() != IngestMessage.MessageType.WARNING) {
 
  433                     ingestMessageBox.displayMessage(message);
 
  435                     long errorPosts = ingestErrorMessagePosts.incrementAndGet();
 
  436                     if (errorPosts <= MAX_ERROR_MESSAGE_POSTS) {
 
  437                         ingestMessageBox.displayMessage(message);
 
  438                     } 
else if (errorPosts == MAX_ERROR_MESSAGE_POSTS + 1) {
 
  439                         IngestMessage errorMessageLimitReachedMessage = IngestMessage.createErrorMessage(
 
  440                                 NbBundle.getMessage(
this.getClass(), 
"IngestManager.IngestMessage.ErrorMessageLimitReached.title"),
 
  441                                 NbBundle.getMessage(
this.getClass(), 
"IngestManager.IngestMessage.ErrorMessageLimitReached.subject"),
 
  442                                 NbBundle.getMessage(
this.getClass(), 
"IngestManager.IngestMessage.ErrorMessageLimitReached.msg", 
MAX_ERROR_MESSAGE_POSTS));
 
  443                         ingestMessageBox.displayMessage(errorMessageLimitReachedMessage);
 
  452             if (ingestMessageBox != null) {
 
  453                 ingestMessageBox.clearMessages();
 
  455             ingestErrorMessagePosts.set(0);
 
  477         if (jobCreationIsEnabled) {
 
  479             if (job.hasIngestPipeline()) {
 
  480                 long taskId = nextThreadId.incrementAndGet();
 
  481                 Future<Void> task = startIngestJobsThreadPool.submit(
new StartIngestJobTask(taskId, job));
 
  482                 startIngestJobTasks.put(taskId, task);
 
  499         if (this.jobCreationIsEnabled) {
 
  501             if (job.hasIngestPipeline()) {
 
  533         "IngestManager.startupErr.dlgTitle=Ingest Module Startup Failure",
 
  534         "IngestManager.startupErr.dlgMsg=Unable to start up one or more ingest modules, ingest cancelled.",
 
  535         "IngestManager.startupErr.dlgSolution=Please disable the failed modules or fix the errors before restarting ingest.",
 
  536         "IngestManager.startupErr.dlgErrorList=Errors:" 
  539         List<IngestModuleError> errors = null;
 
  540         if (this.jobCreationIsEnabled) {
 
  547                             EventQueue.invokeLater(
new Runnable() {
 
  551                                     JOptionPane.showMessageDialog(null,
 
  552                                             NbBundle.getMessage(
this.getClass(), 
"IngestManager.cancellingIngest.msgDlg.text"),
 
  553                                             NbBundle.getMessage(
this.getClass(), 
"IngestManager.serviceIsDown.msgDlg.text", serviceDisplayName),
 
  554                                             JOptionPane.ERROR_MESSAGE);
 
  566             if (!ingestMonitor.isRunning()) {
 
  567                 ingestMonitor.start();
 
  571                 jobsById.put(job.
getId(), job);
 
  573             errors = job.start();
 
  574             if (errors.isEmpty()) {
 
  575                 this.fireIngestJobStarted(job.
getId());
 
  579                     this.jobsById.remove(job.
getId());
 
  582                     logger.log(Level.SEVERE, String.format(
"Error starting %s ingest module for job %d", error.getModuleDisplayName(), job.
getId()), error.getThrowable()); 
 
  586                     final StringBuilder message = 
new StringBuilder();
 
  587                     message.append(Bundle.IngestManager_startupErr_dlgMsg()).append(
"\n");
 
  588                     message.append(Bundle.IngestManager_startupErr_dlgSolution()).append(
"\n\n");
 
  589                     message.append(Bundle.IngestManager_startupErr_dlgErrorList()).append(
"\n");
 
  591                         String moduleName = error.getModuleDisplayName();
 
  592                         String errorMessage = error.getThrowable().getLocalizedMessage();
 
  593                         message.append(moduleName).append(
": ").append(errorMessage).append(
"\n");
 
  595                     message.append(
"\n\n");
 
  596                     EventQueue.invokeLater(() -> {
 
  597                         JOptionPane.showMessageDialog(null, message, Bundle.IngestManager_startupErr_dlgTitle(), JOptionPane.ERROR_MESSAGE);
 
  607     synchronized void finishIngestJob(
IngestJob job) {
 
  608         long jobId = job.
getId();
 
  610             jobsById.remove(jobId);
 
  613             IngestManager.logger.log(Level.INFO, 
"Ingest job {0} completed", jobId); 
 
  614             fireIngestJobCompleted(jobId);
 
  616             IngestManager.logger.log(Level.INFO, 
"Ingest job {0} cancelled", jobId); 
 
  617             fireIngestJobCancelled(jobId);
 
  628             return !jobsById.isEmpty();
 
  652         for (Future<Void> handle : startIngestJobTasks.values()) {
 
  660             for (
IngestJob job : this.jobsById.values()) {
 
  690         moduleEventPublisher.
addSubscriber(moduleEventNames, listener);
 
  735     void fireIngestJobStarted(
long ingestJobId) {
 
  737         eventPublishingExecutor.submit(
new PublishEventTask(event, jobEventPublisher));
 
  745     void fireIngestJobCompleted(
long ingestJobId) {
 
  746         AutopsyEvent 
event = 
new AutopsyEvent(IngestJobEvent.COMPLETED.toString(), ingestJobId, null);
 
  747         eventPublishingExecutor.submit(
new PublishEventTask(event, jobEventPublisher));
 
  755     void fireIngestJobCancelled(
long ingestJobId) {
 
  756         AutopsyEvent 
event = 
new AutopsyEvent(IngestJobEvent.CANCELLED.toString(), ingestJobId, null);
 
  757         eventPublishingExecutor.submit(
new PublishEventTask(event, jobEventPublisher));
 
  767     void fireDataSourceAnalysisStarted(
long ingestJobId, 
long dataSourceIngestJobId, Content dataSource) {
 
  768         AutopsyEvent 
event = 
new DataSourceAnalysisStartedEvent(ingestJobId, dataSourceIngestJobId, dataSource);
 
  769         eventPublishingExecutor.submit(
new PublishEventTask(event, jobEventPublisher));
 
  779     void fireDataSourceAnalysisCompleted(
long ingestJobId, 
long dataSourceIngestJobId, Content dataSource) {
 
  780         AutopsyEvent 
event = 
new DataSourceAnalysisCompletedEvent(ingestJobId, dataSourceIngestJobId, dataSource, DataSourceAnalysisCompletedEvent.Reason.ANALYSIS_COMPLETED);
 
  781         eventPublishingExecutor.submit(
new PublishEventTask(event, jobEventPublisher));
 
  791     void fireDataSourceAnalysisCancelled(
long ingestJobId, 
long dataSourceIngestJobId, Content dataSource) {
 
  792         AutopsyEvent 
event = 
new DataSourceAnalysisCompletedEvent(ingestJobId, dataSourceIngestJobId, dataSource, DataSourceAnalysisCompletedEvent.Reason.ANALYSIS_CANCELLED);
 
  793         eventPublishingExecutor.submit(
new PublishEventTask(event, jobEventPublisher));
 
  801     void fireFileIngestDone(AbstractFile file) {
 
  802         AutopsyEvent 
event = 
new FileAnalyzedEvent(file);
 
  803         eventPublishingExecutor.submit(
new PublishEventTask(event, moduleEventPublisher));
 
  811     void fireIngestModuleDataEvent(ModuleDataEvent moduleDataEvent) {
 
  812         AutopsyEvent 
event = 
new BlackboardPostEvent(moduleDataEvent);
 
  813         eventPublishingExecutor.submit(
new PublishEventTask(event, moduleEventPublisher));
 
  823     void fireIngestModuleContentEvent(ModuleContentEvent moduleContentEvent) {
 
  824         AutopsyEvent 
event = 
new ContentChangedEvent(moduleContentEvent);
 
  825         eventPublishingExecutor.submit(
new PublishEventTask(event, moduleEventPublisher));
 
  834     void setIngestTaskProgress(DataSourceIngestTask task, String ingestModuleDisplayName) {
 
  835         ingestThreadActivitySnapshots.put(task.getThreadId(), 
new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJob().getId(), ingestModuleDisplayName, task.getDataSource()));
 
  844     void setIngestTaskProgress(FileIngestTask task, String ingestModuleDisplayName) {
 
  845         IngestThreadActivitySnapshot prevSnap = ingestThreadActivitySnapshots.get(task.getThreadId());
 
  846         IngestThreadActivitySnapshot newSnap = 
new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJob().getId(), ingestModuleDisplayName, task.getDataSource(), task.getFile());
 
  847         ingestThreadActivitySnapshots.put(task.getThreadId(), newSnap);
 
  849         incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
 
  857     void setIngestTaskProgressCompleted(DataSourceIngestTask task) {
 
  858         ingestThreadActivitySnapshots.put(task.getThreadId(), 
new IngestThreadActivitySnapshot(task.getThreadId()));
 
  866     void setIngestTaskProgressCompleted(FileIngestTask task) {
 
  867         IngestThreadActivitySnapshot prevSnap = ingestThreadActivitySnapshots.get(task.getThreadId());
 
  868         IngestThreadActivitySnapshot newSnap = 
new IngestThreadActivitySnapshot(task.getThreadId());
 
  869         ingestThreadActivitySnapshots.put(task.getThreadId(), newSnap);
 
  870         incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
 
  880         if (moduleName.equals(
"IDLE")) { 
 
  885             Long prevTimeL = ingestModuleRunTimes.get(moduleName);
 
  887             if (prevTimeL != null) {
 
  888                 prevTime = prevTimeL;
 
  890             prevTime += duration;
 
  891             ingestModuleRunTimes.put(moduleName, prevTime);
 
  900     Map<String, Long> getModuleRunTimes() {
 
  912     List<IngestThreadActivitySnapshot> getIngestThreadActivitySnapshots() {
 
  913         return new ArrayList<>(ingestThreadActivitySnapshots.values());
 
  921     List<DataSourceIngestJob.Snapshot> getIngestJobSnapshots() {
 
  922         List<DataSourceIngestJob.Snapshot> snapShots = 
new ArrayList<>();
 
  924             for (IngestJob job : jobsById.values()) {
 
  925                 snapShots.addAll(job.getDataSourceIngestJobSnapshots());
 
  937     long getFreeDiskSpace() {
 
  938         if (ingestMonitor != null) {
 
  939             return ingestMonitor.getFreeSpace();
 
  962                 if (Thread.currentThread().isInterrupted()) {
 
  964                         jobsById.remove(job.
getId());
 
  970                     final String displayName = NbBundle.getMessage(this.getClass(), 
"IngestManager.StartIngestJobsTask.run.displayName");
 
  971                     this.progress = ProgressHandle.createHandle(displayName, 
new Cancellable() {
 
  973                         public boolean cancel() {
 
  974                             if (progress != null) {
 
  975                                 progress.setDisplayName(NbBundle.getMessage(
this.getClass(), 
"IngestManager.StartIngestJobsTask.run.cancelling", displayName));
 
  977                             Future<?> handle = startIngestJobTasks.remove(threadId);
 
  989                 if (null != progress) {
 
  992                 startIngestJobTasks.remove(threadId);
 
 1015                     IngestTask task = tasks.getNextTask(); 
 
 1016                     task.execute(threadId);
 
 1017                 } 
catch (InterruptedException ex) {
 
 1020                 if (Thread.currentThread().isInterrupted()) {
 
 1054     static final class IngestThreadActivitySnapshot {
 
 1056         private final long threadId;
 
 1057         private final Date startTime;
 
 1058         private final String activity;
 
 1059         private final String dataSourceName;
 
 1060         private final String fileName;
 
 1061         private final long jobId;
 
 1064         IngestThreadActivitySnapshot(
long threadId) {
 
 1065             this.threadId = threadId;
 
 1066             startTime = 
new Date();
 
 1067             this.activity = NbBundle.getMessage(this.getClass(), 
"IngestManager.IngestThreadActivitySnapshot.idleThread");
 
 1068             this.dataSourceName = 
"";
 
 1074         IngestThreadActivitySnapshot(
long threadId, 
long jobId, String activity, Content dataSource) {
 
 1075             this.threadId = threadId;
 
 1077             startTime = 
new Date();
 
 1078             this.activity = activity;
 
 1079             this.dataSourceName = dataSource.getName();
 
 1084         IngestThreadActivitySnapshot(
long threadId, 
long jobId, String activity, Content dataSource, AbstractFile file) {
 
 1085             this.threadId = threadId;
 
 1087             startTime = 
new Date();
 
 1088             this.activity = activity;
 
 1089             this.dataSourceName = dataSource.getName();
 
 1090             this.fileName = file.getName();
 
 1097         long getThreadId() {
 
 1101         Date getStartTime() {
 
 1105         String getActivity() {
 
 1109         String getDataSourceName() {
 
 1110             return dataSourceName;
 
 1113         String getFileName() {
 
 1142             super(message, cause);
 
final ConcurrentHashMap< String, Long > ingestModuleRunTimes
String getServiceStatus(String service)
void removeIngestModuleEventListener(final PropertyChangeListener listener)
static final int MIN_NUMBER_OF_FILE_INGEST_THREADS
IngestManagerException(String message, Throwable cause)
static synchronized IngestManager getInstance()
static IngestManager instance
IngestJobStartResult startIngestJob(IngestJob job)
final ExecutorService startIngestJobsThreadPool
void cancelAllIngestJobs()
final Map< Long, Future< Void > > startIngestJobTasks
void publish(AutopsyEvent event)
static void addPropertyChangeListener(final PropertyChangeListener listener)
static void setNumberOfFileIngestThreads(int value)
static final Logger logger
final ExecutorService eventPublishingExecutor
void clearIngestMessageBox()
void addSubscriber(Set< String > eventNames, PropertyChangeListener subscriber)
void subscribeToServiceMonitorEvents()
boolean isIngestRunning()
static void setCoreComponentsActive(boolean coreComponentsActive)
DATA_SOURCE_ANALYSIS_COMPLETED
static void removePropertyChangeListener(final PropertyChangeListener listener)
volatile IngestMessageTopComponent ingestMessageBox
final IngestTaskQueue tasks
void addSubscriber(PropertyChangeListener subscriber)
static synchronized ServicesMonitor getInstance()
final AutopsyEventPublisher publisher
int numberOfFileIngestThreads
final ServicesMonitor servicesMonitor
void removeIngestJobEventListener(final PropertyChangeListener listener)
final ExecutorService fileIngestThreadPool
static final int DEFAULT_NUMBER_OF_FILE_INGEST_THREADS
static int numberOfFileIngestThreads()
void incrementModuleRunTime(String moduleName, Long duration)
void addIngestJobEventListener(final PropertyChangeListener listener)
final Object ingestMessageBoxLock
void startFileIngestThread()
void openRemoteEventChannel(String channelName)
static boolean coreComponentsAreActive()
final Map< Long, IngestJob > jobsById
AutopsyEventPublisher jobEventPublisher
IngestManagerException(String message)
void closeRemoteEventChannel()
void queueIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
final ExecutorService dataSourceIngestThreadPool
void removeSubscriber(Set< String > eventNames, PropertyChangeListener subscriber)
static final String MODULE_EVENT_CHANNEL_NAME
volatile boolean jobCreationIsEnabled
static final int MAX_ERROR_MESSAGE_POSTS
final AtomicLong ingestErrorMessagePosts
void startDataSourceIngestThread()
static final Set< String > jobEventNames
int getNumberOfFileIngestThreads()
static final Set< String > moduleEventNames
void addIngestModuleEventListener(final PropertyChangeListener listener)
static final String JOB_EVENT_CHANNEL_NAME
static Case getCurrentCase()
synchronized static Logger getLogger(String name)
DATA_SOURCE_ANALYSIS_STARTED
static final int MAX_NUMBER_OF_FILE_INGEST_THREADS
synchronized IngestJob startIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
final ConcurrentHashMap< Long, IngestThreadActivitySnapshot > ingestThreadActivitySnapshots
void cancelAllIngestJobs(IngestJob.CancellationReason reason)
AutopsyEventPublisher moduleEventPublisher
final IngestMonitor ingestMonitor
static final long serialVersionUID
synchronized IngestJobStartResult beginIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
final AtomicLong nextThreadId
static boolean isCaseOpen()
String getTextIndexName()
void subscribeToCaseEvents()
synchronized void setRunInteractively(boolean runInteractively)
static void addEventSubscriber(Set< String > eventNames, PropertyChangeListener subscriber)