19 package org.sleuthkit.autopsy.ingest;
21 import com.google.common.util.concurrent.ThreadFactoryBuilder;
22 import java.awt.EventQueue;
23 import java.beans.PropertyChangeEvent;
24 import java.beans.PropertyChangeListener;
25 import java.io.Serializable;
26 import java.util.ArrayList;
27 import java.util.Collection;
28 import java.util.Collections;
29 import java.util.Date;
30 import java.util.EnumSet;
31 import java.util.HashMap;
32 import java.util.HashSet;
33 import java.util.List;
36 import java.util.concurrent.Callable;
37 import java.util.concurrent.ConcurrentHashMap;
38 import java.util.concurrent.ExecutorService;
39 import java.util.concurrent.Executors;
40 import java.util.concurrent.Future;
41 import java.util.concurrent.atomic.AtomicLong;
42 import java.util.logging.Level;
43 import java.util.stream.Collectors;
44 import java.util.stream.Stream;
45 import javax.annotation.concurrent.GuardedBy;
46 import javax.annotation.concurrent.Immutable;
47 import javax.annotation.concurrent.ThreadSafe;
48 import javax.swing.JOptionPane;
49 import org.netbeans.api.progress.ProgressHandle;
50 import org.openide.util.Cancellable;
51 import org.openide.util.NbBundle;
52 import org.openide.windows.WindowManager;
119 @GuardedBy(
"IngestManager.class")
123 private final ExecutorService
startIngestJobsExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-start-ingest-jobs-%d").build());
128 private final ExecutorService
eventPublishingExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-ingest-events-%d").build());
147 if (null == instance) {
149 instance.subscribeToServiceMonitorEvents();
150 instance.subscribeToCaseEvents();
188 PropertyChangeListener propChangeListener = (PropertyChangeEvent evt) -> {
203 logger.log(Level.SEVERE,
"Service {0} is down, cancelling all running ingest jobs", serviceDisplayName);
205 EventQueue.invokeLater(
new Runnable() {
208 JOptionPane.showMessageDialog(WindowManager.getDefault().getMainWindow(),
209 NbBundle.getMessage(this.getClass(),
"IngestManager.cancellingIngest.msgDlg.text"),
210 NbBundle.getMessage(this.getClass(),
"IngestManager.serviceIsDown.msgDlg.text", serviceDisplayName),
211 JOptionPane.ERROR_MESSAGE);
225 Set<String> servicesList =
new HashSet<>();
237 if (event.getNewValue() != null) {
252 void handleCaseOpened() {
257 String channelPrefix = openedCase.
getName();
262 }
catch (NoCurrentCaseException | AutopsyEventException ex) {
263 logger.log(Level.SEVERE,
"Failed to open remote events channel", ex);
264 MessageNotifyUtil.Notify.error(NbBundle.getMessage(
IngestManager.class,
"IngestManager.OpenEventChannel.Fail.Title"),
265 NbBundle.getMessage(
IngestManager.class,
"IngestManager.OpenEventChannel.Fail.ErrMsg"));
277 void handleCaseClosed() {
308 if (job.hasIngestPipeline()) {
327 if (job.hasIngestPipeline()) {
347 if (job.hasIngestPipeline()) {
364 "IngestManager.startupErr.dlgTitle=Ingest Module Startup Failure",
365 "IngestManager.startupErr.dlgMsg=Unable to start up one or more ingest modules, ingest cancelled.",
366 "IngestManager.startupErr.dlgSolution=Please disable the failed modules or fix the errors before restarting ingest.",
367 "IngestManager.startupErr.dlgErrorList=Errors:"
370 List<IngestModuleError> errors = null;
381 EventQueue.invokeLater(
new Runnable() {
385 JOptionPane.showMessageDialog(WindowManager.getDefault().getMainWindow(),
386 NbBundle.getMessage(this.getClass(),
"IngestManager.cancellingIngest.msgDlg.text"),
387 NbBundle.getMessage(this.getClass(),
"IngestManager.serviceIsDown.msgDlg.text", serviceDisplayName),
388 JOptionPane.ERROR_MESSAGE);
406 errors = job.start();
407 if (errors.isEmpty()) {
408 this.fireIngestJobStarted(job.
getId());
415 logger.log(Level.SEVERE, String.format(
"Error starting %s ingest module for job %d", error.getModuleDisplayName(), job.
getId()), error.getThrowable());
419 final StringBuilder message =
new StringBuilder(1024);
420 message.append(Bundle.IngestManager_startupErr_dlgMsg()).append(
"\n");
421 message.append(Bundle.IngestManager_startupErr_dlgSolution()).append(
"\n\n");
422 message.append(Bundle.IngestManager_startupErr_dlgErrorList()).append(
"\n");
424 String moduleName = error.getModuleDisplayName();
425 String errorMessage = error.getThrowable().getLocalizedMessage();
426 message.append(moduleName).append(
": ").append(errorMessage).append(
"\n");
428 message.append(
"\n\n");
429 EventQueue.invokeLater(() -> {
430 JOptionPane.showMessageDialog(WindowManager.getDefault().getMainWindow(), message, Bundle.IngestManager_startupErr_dlgTitle(), JOptionPane.ERROR_MESSAGE);
445 long jobId = job.
getId();
450 IngestManager.logger.log(Level.INFO,
"Ingest job {0} completed", jobId);
451 fireIngestJobCompleted(jobId);
453 IngestManager.logger.log(Level.INFO,
"Ingest job {0} cancelled", jobId);
454 fireIngestJobCancelled(jobId);
527 void fireIngestJobStarted(
long ingestJobId) {
537 void fireIngestJobCompleted(
long ingestJobId) {
538 AutopsyEvent
event =
new AutopsyEvent(IngestJobEvent.COMPLETED.toString(), ingestJobId, null);
547 void fireIngestJobCancelled(
long ingestJobId) {
548 AutopsyEvent
event =
new AutopsyEvent(IngestJobEvent.CANCELLED.toString(), ingestJobId, null);
560 void fireDataSourceAnalysisStarted(
long ingestJobId,
long dataSourceIngestJobId, Content dataSource) {
561 AutopsyEvent
event =
new DataSourceAnalysisStartedEvent(ingestJobId, dataSourceIngestJobId, dataSource);
573 void fireDataSourceAnalysisCompleted(
long ingestJobId,
long dataSourceIngestJobId, Content dataSource) {
574 AutopsyEvent
event =
new DataSourceAnalysisCompletedEvent(ingestJobId, dataSourceIngestJobId, dataSource, DataSourceAnalysisCompletedEvent.Reason.ANALYSIS_COMPLETED);
586 void fireDataSourceAnalysisCancelled(
long ingestJobId,
long dataSourceIngestJobId, Content dataSource) {
587 AutopsyEvent
event =
new DataSourceAnalysisCompletedEvent(ingestJobId, dataSourceIngestJobId, dataSource, DataSourceAnalysisCompletedEvent.Reason.ANALYSIS_CANCELLED);
597 void fireFileIngestDone(AbstractFile file) {
598 AutopsyEvent
event =
new FileAnalyzedEvent(file);
609 void fireIngestModuleDataEvent(ModuleDataEvent moduleDataEvent) {
610 AutopsyEvent
event =
new BlackboardPostEvent(moduleDataEvent);
621 void fireIngestModuleContentEvent(ModuleContentEvent moduleContentEvent) {
622 AutopsyEvent
event =
new ContentChangedEvent(moduleContentEvent);
631 void initIngestMessageInbox() {
642 void postIngestMessage(IngestMessage message) {
645 if (message.getMessageType() != IngestMessage.MessageType.ERROR && message.getMessageType() != IngestMessage.MessageType.WARNING) {
649 if (errorPosts <= MAX_ERROR_MESSAGE_POSTS) {
651 }
else if (errorPosts == MAX_ERROR_MESSAGE_POSTS + 1) {
652 IngestMessage errorMessageLimitReachedMessage = IngestMessage.createErrorMessage(
653 NbBundle.getMessage(
this.getClass(),
"IngestManager.IngestMessage.ErrorMessageLimitReached.title"),
654 NbBundle.getMessage(
this.getClass(),
"IngestManager.IngestMessage.ErrorMessageLimitReached.subject"),
655 NbBundle.getMessage(
this.getClass(),
"IngestManager.IngestMessage.ErrorMessageLimitReached.msg",
MAX_ERROR_MESSAGE_POSTS));
686 void setIngestTaskProgress(DataSourceIngestTask task, String ingestModuleDisplayName) {
687 ingestThreadActivitySnapshots.put(task.getThreadId(),
new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJob().getId(), ingestModuleDisplayName, task.getDataSource()));
701 void setIngestTaskProgress(FileIngestTask task, String ingestModuleDisplayName) {
703 IngestThreadActivitySnapshot newSnap =
new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJob().getId(), ingestModuleDisplayName, task.getDataSource(), task.getFile());
705 incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
715 void setIngestTaskProgressCompleted(DataSourceIngestTask task) {
726 void setIngestTaskProgressCompleted(FileIngestTask task) {
728 IngestThreadActivitySnapshot newSnap =
new IngestThreadActivitySnapshot(task.getThreadId());
730 incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
740 if (moduleDisplayName.equals(
"IDLE")) {
747 if (prevTimeL != null) {
748 prevTime = prevTimeL;
750 prevTime += duration;
789 snapShots.addAll(job.getDataSourceIngestJobSnapshots());
801 long getFreeDiskSpace() {
826 if (Thread.currentThread().isInterrupted()) {
834 final String displayName = NbBundle.getMessage(this.getClass(),
"IngestManager.StartIngestJobsTask.run.displayName");
835 this.progress = ProgressHandle.createHandle(displayName,
new Cancellable() {
837 public boolean cancel() {
838 if (progress != null) {
839 progress.setDisplayName(NbBundle.getMessage(
this.getClass(),
"IngestManager.StartIngestJobsTask.run.cancelling", displayName));
853 if (null != progress) {
868 private final BlockingIngestTaskQueue
tasks;
879 IngestTask task = tasks.getNextTask();
880 task.execute(threadId);
881 }
catch (InterruptedException ex) {
884 if (Thread.currentThread().isInterrupted()) {
943 startTime =
new Date();
944 this.activity = NbBundle.getMessage(this.getClass(),
"IngestManager.IngestThreadActivitySnapshot.idleThread");
945 this.dataSourceName =
"";
963 startTime =
new Date();
965 this.dataSourceName = dataSource.getName();
981 IngestThreadActivitySnapshot(
long threadId,
long jobId, String activity, Content dataSource, AbstractFile file) {
984 startTime =
new Date();
986 this.dataSourceName = dataSource.getName();
987 this.fileName = file.getName();
995 long getIngestJobId() {
1004 long getThreadId() {
1013 Date getStartTime() {
1022 String getActivity() {
1033 String getDataSourceName() {
1042 String getFileName() {
1125 private static final long serialVersionUID = 1L;
1143 super(message, cause);
final ConcurrentHashMap< String, Long > ingestModuleRunTimes
final Map< Long, Future< Void > > startIngestJobFutures
String getServiceStatus(String service)
void removeIngestModuleEventListener(final PropertyChangeListener listener)
static final String INGEST_MODULE_EVENT_CHANNEL_NAME
List< IngestThreadActivitySnapshot > getIngestThreadActivitySnapshots()
void queueIngestJob(Content dataSource, List< AbstractFile > files, IngestJobSettings settings)
IngestManagerException(String message, Throwable cause)
static synchronized IngestManager getInstance()
static IngestManager instance
final String dataSourceName
IngestJobStartResult startIngestJob(IngestJob job)
final ExecutorService dataSourceLevelIngestJobTasksExecutor
static boolean runningWithGUI
void cancelAllIngestJobs()
void publish(AutopsyEvent event)
static void addPropertyChangeListener(final PropertyChangeListener listener)
IngestJobStartResult beginIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
static final Logger logger
final ExecutorService eventPublishingExecutor
void clearIngestMessageBox()
void addSubscriber(Set< String > eventNames, PropertyChangeListener subscriber)
void subscribeToServiceMonitorEvents()
boolean isIngestRunning()
DATA_SOURCE_ANALYSIS_COMPLETED
static void removePropertyChangeListener(final PropertyChangeListener listener)
static final Set< String > INGEST_MODULE_EVENT_NAMES
volatile IngestMessageTopComponent ingestMessageBox
void addSubscriber(PropertyChangeListener subscriber)
final AutopsyEventPublisher publisher
synchronized void closeRemoteEventChannel()
Map< String, Long > getModuleRunTimes()
final ServicesMonitor servicesMonitor
void removeIngestJobEventListener(final PropertyChangeListener listener)
final BlockingIngestTaskQueue tasks
static final String INGEST_JOB_EVENT_CHANNEL_NAME
static final Set< String > INGEST_JOB_EVENT_NAMES
final AutopsyEventPublisher moduleEventPublisher
static int numberOfFileIngestThreads()
List< DataSourceIngestJob.Snapshot > getIngestJobSnapshots()
synchronized void openRemoteEventChannel(String channelName)
void addIngestJobEventListener(final PropertyChangeListener listener)
final Object ingestMessageBoxLock
IngestManagerException(String message)
void queueIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
void removeSubscriber(Set< String > eventNames, PropertyChangeListener subscriber)
static final int MAX_ERROR_MESSAGE_POSTS
final AtomicLong ingestErrorMessagePosts
volatile boolean caseIsOpen
final AtomicLong nextIngestManagerTaskId
int getNumberOfFileIngestThreads()
void addIngestModuleEventListener(final PropertyChangeListener listener)
synchronized static Logger getLogger(String name)
DATA_SOURCE_ANALYSIS_STARTED
static Case getCurrentCaseThrows()
void incrementModuleRunTime(String moduleDisplayName, Long duration)
static void addEventTypeSubscriber(Set< Events > eventTypes, PropertyChangeListener subscriber)
synchronized IngestJob startIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
final ConcurrentHashMap< Long, IngestThreadActivitySnapshot > ingestThreadActivitySnapshots
void cancelAllIngestJobs(IngestJob.CancellationReason reason)
final IngestMonitor ingestMonitor
final ExecutorService fileLevelIngestJobTasksExecutor
final Map< Long, IngestJob > ingestJobsById
void subscribeToCaseEvents()
final ExecutorService startIngestJobsExecutor
final int numberOfFileIngestThreads
static final long serialVersionUID
final AutopsyEventPublisher jobEventPublisher