19 package org.sleuthkit.autopsy.ingest;
21 import com.google.common.util.concurrent.ThreadFactoryBuilder;
22 import java.awt.EventQueue;
23 import java.beans.PropertyChangeEvent;
24 import java.beans.PropertyChangeListener;
25 import java.util.ArrayList;
26 import java.util.Collection;
27 import java.util.Collections;
28 import java.util.Date;
29 import java.util.EnumSet;
30 import java.util.HashMap;
31 import java.util.HashSet;
32 import java.util.List;
35 import java.util.concurrent.Callable;
36 import java.util.concurrent.ConcurrentHashMap;
37 import java.util.concurrent.ExecutorService;
38 import java.util.concurrent.Executors;
39 import java.util.concurrent.Future;
40 import java.util.concurrent.atomic.AtomicLong;
41 import java.util.logging.Level;
42 import java.util.stream.Collectors;
43 import java.util.stream.Stream;
44 import javax.annotation.concurrent.GuardedBy;
45 import javax.annotation.concurrent.Immutable;
46 import javax.annotation.concurrent.ThreadSafe;
47 import javax.swing.JOptionPane;
48 import org.netbeans.api.progress.ProgressHandle;
49 import org.openide.util.Cancellable;
50 import org.openide.util.NbBundle;
116 @GuardedBy(
"IngestManager.class")
120 private final ExecutorService
startIngestJobsExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-start-ingest-jobs-%d").build());
125 private final ExecutorService
eventPublishingExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-ingest-events-%d").build());
144 if (null == instance) {
146 instance.subscribeToServiceMonitorEvents();
147 instance.subscribeToCaseEvents();
185 PropertyChangeListener propChangeListener = (PropertyChangeEvent evt) -> {
195 }
catch (IllegalStateException noCaseOpenException) {
200 LOGGER.log(Level.SEVERE,
"Service {0} is down, cancelling all running ingest jobs", serviceDisplayName);
202 EventQueue.invokeLater(
new Runnable() {
205 JOptionPane.showMessageDialog(null,
206 NbBundle.getMessage(
this.getClass(),
"IngestManager.cancellingIngest.msgDlg.text"),
207 NbBundle.getMessage(
this.getClass(),
"IngestManager.serviceIsDown.msgDlg.text", serviceDisplayName),
208 JOptionPane.ERROR_MESSAGE);
222 Set<String> servicesList =
new HashSet<>();
234 if (event.getNewValue() != null) {
249 void handleCaseOpened() {
254 String channelPrefix = openedCase.
getName();
259 }
catch (IllegalStateException | AutopsyEventException ex) {
260 LOGGER.log(Level.SEVERE,
"Failed to open remote events channel", ex);
261 MessageNotifyUtil.Notify.error(NbBundle.getMessage(
IngestManager.class,
"IngestManager.OpenEventChannel.Fail.Title"),
262 NbBundle.getMessage(
IngestManager.class,
"IngestManager.OpenEventChannel.Fail.ErrMsg"));
274 void handleCaseClosed() {
305 if (job.hasIngestPipeline()) {
325 if (job.hasIngestPipeline()) {
342 "IngestManager.startupErr.dlgTitle=Ingest Module Startup Failure",
343 "IngestManager.startupErr.dlgMsg=Unable to start up one or more ingest modules, ingest cancelled.",
344 "IngestManager.startupErr.dlgSolution=Please disable the failed modules or fix the errors before restarting ingest.",
345 "IngestManager.startupErr.dlgErrorList=Errors:"
348 List<IngestModuleError> errors = null;
354 EventQueue.invokeLater(
new Runnable() {
358 JOptionPane.showMessageDialog(null,
359 NbBundle.getMessage(
this.getClass(),
"IngestManager.cancellingIngest.msgDlg.text"),
360 NbBundle.getMessage(
this.getClass(),
"IngestManager.serviceIsDown.msgDlg.text", serviceDisplayName),
361 JOptionPane.ERROR_MESSAGE);
377 errors = job.start();
378 if (errors.isEmpty()) {
379 this.fireIngestJobStarted(job.
getId());
384 LOGGER.log(Level.SEVERE, String.format(
"Error starting %s ingest module for job %d", error.getModuleDisplayName(), job.
getId()), error.getThrowable());
388 final StringBuilder message =
new StringBuilder(1024);
389 message.append(Bundle.IngestManager_startupErr_dlgMsg()).append(
"\n");
390 message.append(Bundle.IngestManager_startupErr_dlgSolution()).append(
"\n\n");
391 message.append(Bundle.IngestManager_startupErr_dlgErrorList()).append(
"\n");
393 String moduleName = error.getModuleDisplayName();
394 String errorMessage = error.getThrowable().getLocalizedMessage();
395 message.append(moduleName).append(
": ").append(errorMessage).append(
"\n");
397 message.append(
"\n\n");
398 EventQueue.invokeLater(() -> {
399 JOptionPane.showMessageDialog(null, message, Bundle.IngestManager_startupErr_dlgTitle(), JOptionPane.ERROR_MESSAGE);
415 long jobId = job.
getId();
419 fireIngestJobCompleted(jobId);
421 IngestManager.LOGGER.log(Level.INFO,
"Ingest job {0} cancelled", jobId);
422 fireIngestJobCancelled(jobId);
491 void fireIngestJobStarted(
long ingestJobId) {
501 void fireIngestJobCompleted(
long ingestJobId) {
502 AutopsyEvent
event =
new AutopsyEvent(IngestJobEvent.COMPLETED.toString(), ingestJobId, null);
511 void fireIngestJobCancelled(
long ingestJobId) {
512 AutopsyEvent
event =
new AutopsyEvent(IngestJobEvent.CANCELLED.toString(), ingestJobId, null);
524 void fireDataSourceAnalysisStarted(
long ingestJobId,
long dataSourceIngestJobId, Content dataSource) {
525 AutopsyEvent
event =
new DataSourceAnalysisStartedEvent(ingestJobId, dataSourceIngestJobId, dataSource);
537 void fireDataSourceAnalysisCompleted(
long ingestJobId,
long dataSourceIngestJobId, Content dataSource) {
538 AutopsyEvent
event =
new DataSourceAnalysisCompletedEvent(ingestJobId, dataSourceIngestJobId, dataSource, DataSourceAnalysisCompletedEvent.Reason.ANALYSIS_COMPLETED);
550 void fireDataSourceAnalysisCancelled(
long ingestJobId,
long dataSourceIngestJobId, Content dataSource) {
551 AutopsyEvent
event =
new DataSourceAnalysisCompletedEvent(ingestJobId, dataSourceIngestJobId, dataSource, DataSourceAnalysisCompletedEvent.Reason.ANALYSIS_CANCELLED);
561 void fireFileIngestDone(AbstractFile file) {
562 AutopsyEvent
event =
new FileAnalyzedEvent(file);
573 void fireIngestModuleDataEvent(ModuleDataEvent moduleDataEvent) {
574 AutopsyEvent
event =
new BlackboardPostEvent(moduleDataEvent);
585 void fireIngestModuleContentEvent(ModuleContentEvent moduleContentEvent) {
586 AutopsyEvent
event =
new ContentChangedEvent(moduleContentEvent);
595 void initIngestMessageInbox() {
606 void postIngestMessage(IngestMessage message) {
609 if (message.getMessageType() != IngestMessage.MessageType.ERROR && message.getMessageType() != IngestMessage.MessageType.WARNING) {
613 if (errorPosts <= MAX_ERROR_MESSAGE_POSTS) {
615 }
else if (errorPosts == MAX_ERROR_MESSAGE_POSTS + 1) {
616 IngestMessage errorMessageLimitReachedMessage = IngestMessage.createErrorMessage(
617 NbBundle.getMessage(
this.getClass(),
"IngestManager.IngestMessage.ErrorMessageLimitReached.title"),
618 NbBundle.getMessage(
this.getClass(),
"IngestManager.IngestMessage.ErrorMessageLimitReached.subject"),
619 NbBundle.getMessage(
this.getClass(),
"IngestManager.IngestMessage.ErrorMessageLimitReached.msg",
MAX_ERROR_MESSAGE_POSTS));
650 void setIngestTaskProgress(DataSourceIngestTask task, String ingestModuleDisplayName) {
651 ingestThreadActivitySnapshots.put(task.getThreadId(),
new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJob().getId(), ingestModuleDisplayName, task.getDataSource()));
665 void setIngestTaskProgress(FileIngestTask task, String ingestModuleDisplayName) {
667 IngestThreadActivitySnapshot newSnap =
new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJob().getId(), ingestModuleDisplayName, task.getDataSource(), task.getFile());
669 incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
679 void setIngestTaskProgressCompleted(DataSourceIngestTask task) {
690 void setIngestTaskProgressCompleted(FileIngestTask task) {
692 IngestThreadActivitySnapshot newSnap =
new IngestThreadActivitySnapshot(task.getThreadId());
694 incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
704 if (moduleDisplayName.equals(
"IDLE")) {
711 if (prevTimeL != null) {
712 prevTime = prevTimeL;
714 prevTime += duration;
724 Map<String, Long> getModuleRunTimes() {
737 List<IngestThreadActivitySnapshot> getIngestThreadActivitySnapshots() {
746 List<DataSourceIngestJob.Snapshot> getIngestJobSnapshots() {
747 List<DataSourceIngestJob.Snapshot> snapShots =
new ArrayList<>();
749 snapShots.addAll(job.getDataSourceIngestJobSnapshots());
760 long getFreeDiskSpace() {
785 if (Thread.currentThread().isInterrupted()) {
791 final String displayName = NbBundle.getMessage(this.getClass(),
"IngestManager.StartIngestJobsTask.run.displayName");
792 this.progress = ProgressHandle.createHandle(displayName,
new Cancellable() {
794 public boolean cancel() {
795 if (progress != null) {
796 progress.setDisplayName(NbBundle.getMessage(
this.getClass(),
"IngestManager.StartIngestJobsTask.run.cancelling", displayName));
810 if (null != progress) {
825 private final IngestTaskQueue
tasks;
836 IngestTask task = tasks.getNextTask();
837 task.execute(threadId);
838 }
catch (InterruptedException ex) {
841 if (Thread.currentThread().isInterrupted()) {
880 static final class IngestThreadActivitySnapshot {
882 private final long threadId;
883 private final Date startTime;
884 private final String activity;
885 private final String dataSourceName;
886 private final String fileName;
887 private final long jobId;
896 IngestThreadActivitySnapshot(
long threadId) {
897 this.threadId = threadId;
898 startTime =
new Date();
899 this.activity = NbBundle.getMessage(this.getClass(),
"IngestManager.IngestThreadActivitySnapshot.idleThread");
900 this.dataSourceName =
"";
915 IngestThreadActivitySnapshot(
long threadId,
long jobId, String activity, Content dataSource) {
916 this.threadId = threadId;
918 startTime =
new Date();
919 this.activity = activity;
920 this.dataSourceName = dataSource.getName();
936 IngestThreadActivitySnapshot(
long threadId,
long jobId, String activity, Content dataSource, AbstractFile file) {
937 this.threadId = threadId;
939 startTime =
new Date();
940 this.activity = activity;
941 this.dataSourceName = dataSource.getName();
942 this.fileName = file.getName();
950 long getIngestJobId() {
968 Date getStartTime() {
977 String getActivity() {
988 String getDataSourceName() {
989 return dataSourceName;
997 String getFileName() {
1098 super(message, cause);
final ConcurrentHashMap< String, Long > ingestModuleRunTimes
final Map< Long, Future< Void > > startIngestJobFutures
String getServiceStatus(String service)
void removeIngestModuleEventListener(final PropertyChangeListener listener)
static final String INGEST_MODULE_EVENT_CHANNEL_NAME
IngestManagerException(String message, Throwable cause)
static synchronized IngestManager getInstance()
static IngestManager instance
IngestJobStartResult startIngestJob(IngestJob job)
final ExecutorService dataSourceLevelIngestJobTasksExecutor
static boolean runningWithGUI
void cancelAllIngestJobs()
void publish(AutopsyEvent event)
static void addPropertyChangeListener(final PropertyChangeListener listener)
IngestJobStartResult beginIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
final ExecutorService eventPublishingExecutor
void clearIngestMessageBox()
void addSubscriber(Set< String > eventNames, PropertyChangeListener subscriber)
static final Logger LOGGER
void subscribeToServiceMonitorEvents()
boolean isIngestRunning()
DATA_SOURCE_ANALYSIS_COMPLETED
static void removePropertyChangeListener(final PropertyChangeListener listener)
static final Set< String > INGEST_MODULE_EVENT_NAMES
volatile IngestMessageTopComponent ingestMessageBox
void addSubscriber(PropertyChangeListener subscriber)
final AutopsyEventPublisher publisher
synchronized void closeRemoteEventChannel()
final ServicesMonitor servicesMonitor
void removeIngestJobEventListener(final PropertyChangeListener listener)
static final String INGEST_JOB_EVENT_CHANNEL_NAME
static final Set< String > INGEST_JOB_EVENT_NAMES
final AutopsyEventPublisher moduleEventPublisher
static int numberOfFileIngestThreads()
synchronized void openRemoteEventChannel(String channelName)
void addIngestJobEventListener(final PropertyChangeListener listener)
final Object ingestMessageBoxLock
IngestManagerException(String message)
final IngestTaskQueue tasks
void queueIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
void removeSubscriber(Set< String > eventNames, PropertyChangeListener subscriber)
static final int MAX_ERROR_MESSAGE_POSTS
final AtomicLong ingestErrorMessagePosts
volatile boolean caseIsOpen
final AtomicLong nextIngestManagerTaskId
int getNumberOfFileIngestThreads()
void addIngestModuleEventListener(final PropertyChangeListener listener)
static Case getCurrentCase()
synchronized static Logger getLogger(String name)
DATA_SOURCE_ANALYSIS_STARTED
void incrementModuleRunTime(String moduleDisplayName, Long duration)
static void addEventTypeSubscriber(Set< Events > eventTypes, PropertyChangeListener subscriber)
synchronized IngestJob startIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
final ConcurrentHashMap< Long, IngestThreadActivitySnapshot > ingestThreadActivitySnapshots
void cancelAllIngestJobs(IngestJob.CancellationReason reason)
final IngestMonitor ingestMonitor
final ExecutorService fileLevelIngestJobTasksExecutor
static final long serialVersionUID
final Map< Long, IngestJob > ingestJobsById
void subscribeToCaseEvents()
final ExecutorService startIngestJobsExecutor
final int numberOfFileIngestThreads
final AutopsyEventPublisher jobEventPublisher