19 package org.sleuthkit.autopsy.ingest;
21 import com.google.common.util.concurrent.ThreadFactoryBuilder;
22 import java.awt.EventQueue;
23 import java.beans.PropertyChangeEvent;
24 import java.beans.PropertyChangeListener;
25 import java.io.Serializable;
26 import java.lang.reflect.InvocationTargetException;
27 import java.util.ArrayList;
28 import java.util.Collection;
29 import java.util.Collections;
30 import java.util.Date;
31 import java.util.EnumSet;
32 import java.util.HashMap;
33 import java.util.HashSet;
34 import java.util.List;
37 import java.util.concurrent.Callable;
38 import java.util.concurrent.ConcurrentHashMap;
39 import java.util.concurrent.ExecutorService;
40 import java.util.concurrent.Executors;
41 import java.util.concurrent.Future;
42 import java.util.concurrent.atomic.AtomicLong;
43 import java.util.logging.Level;
44 import java.util.stream.Collectors;
45 import java.util.stream.Stream;
46 import javax.annotation.concurrent.GuardedBy;
47 import javax.annotation.concurrent.Immutable;
48 import javax.annotation.concurrent.ThreadSafe;
49 import javax.swing.JOptionPane;
50 import javax.swing.SwingUtilities;
51 import org.netbeans.api.progress.ProgressHandle;
52 import org.openide.util.Cancellable;
53 import org.openide.util.NbBundle;
54 import org.openide.windows.WindowManager;
123 @GuardedBy(
"IngestManager.class")
127 private final ExecutorService
startIngestJobsExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-start-ingest-jobs-%d").build());
134 private final ExecutorService
eventPublishingExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-ingest-events-%d").build());
153 if (null == instance) {
155 instance.subscribeToServiceMonitorEvents();
156 instance.subscribeToCaseEvents();
194 PropertyChangeListener propChangeListener = (PropertyChangeEvent evt) -> {
209 logger.log(Level.SEVERE,
"Service {0} is down, cancelling all running ingest jobs", serviceDisplayName);
211 EventQueue.invokeLater(
new Runnable() {
214 JOptionPane.showMessageDialog(WindowManager.getDefault().getMainWindow(),
215 NbBundle.getMessage(this.getClass(),
"IngestManager.cancellingIngest.msgDlg.text"),
216 NbBundle.getMessage(this.getClass(),
"IngestManager.serviceIsDown.msgDlg.text", serviceDisplayName),
217 JOptionPane.ERROR_MESSAGE);
231 Set<String> servicesList =
new HashSet<>();
243 if (event.getNewValue() != null) {
258 void handleCaseOpened() {
263 String channelPrefix = openedCase.
getName();
268 }
catch (NoCurrentCaseException | AutopsyEventException ex) {
269 logger.log(Level.SEVERE,
"Failed to open remote events channel", ex);
270 MessageNotifyUtil.Notify.error(NbBundle.getMessage(
IngestManager.class,
"IngestManager.OpenEventChannel.Fail.Title"),
271 NbBundle.getMessage(
IngestManager.class,
"IngestManager.OpenEventChannel.Fail.ErrMsg"));
283 void handleCaseClosed() {
308 IngestJobInputStream stream =
new IngestJobInputStream(job);
// A non-null job in the start result is the success case.
309 if (stream.getIngestJobStartResult().getJob() != null) {
311 }
// The job did not start. If the start result carries module startup errors,
// log each one and fail with a generic message. The check must be "not empty":
// guarding this loop with isEmpty() made it iterate an empty list, so the
// errors were never logged and the startup exception below was dropped as the
// cause in the no-module-error case.
else if (!stream.getIngestJobStartResult().getModuleErrors().isEmpty()) {
312 for (
IngestModuleError error : stream.getIngestJobStartResult().getModuleErrors()) {
313 logger.log(Level.SEVERE, String.format(
"%s ingest module startup error for %s", error.getModuleDisplayName(), dataSource.getName()), error.getThrowable());
315 throw new TskCoreException(
"Error starting ingest modules");
// No module errors were reported: surface the startup exception as the cause.
317 throw new TskCoreException(
"Error starting ingest modules", stream.getIngestJobStartResult().getStartupException());
340 if (job.hasIngestPipeline()) {
361 if (job.hasIngestPipeline()) {
383 if (job.hasIngestPipeline()) {
384 return startIngestJob(job);
400 "IngestManager.startupErr.dlgTitle=Ingest Module Startup Failure",
401 "IngestManager.startupErr.dlgMsg=Unable to start up one or more ingest modules, ingest cancelled.",
402 "IngestManager.startupErr.dlgSolution=Please disable the failed modules or fix the errors before restarting ingest.",
403 "IngestManager.startupErr.dlgErrorList=Errors:"
405 IngestJobStartResult startIngestJob(IngestJob job) {
409 if (SwingUtilities.isEventDispatchThread()) {
410 initIngestMessageInbox();
413 SwingUtilities.invokeAndWait(() -> initIngestMessageInbox());
414 }
catch (InterruptedException ex) {
416 }
catch (InvocationTargetException ex) {
417 logger.log(Level.WARNING,
"There was an error starting ingest message inbox", ex);
421 List<IngestModuleError> errors = null;
424 openCase = Case.getCurrentCaseThrows();
425 }
catch (NoCurrentCaseException ex) {
426 return new IngestJobStartResult(null,
new IngestManagerException(
"Exception while getting open case.", ex), Collections.<IngestModuleError>emptyList());
428 if (openCase.getCaseType() == Case.CaseType.MULTI_USER_CASE) {
431 if (RuntimeProperties.runningWithGUI()) {
432 EventQueue.invokeLater(
new Runnable() {
435 String serviceDisplayName = ServicesMonitor.Service.REMOTE_CASE_DATABASE.getDisplayName();
436 JOptionPane.showMessageDialog(WindowManager.getDefault().getMainWindow(),
437 NbBundle.getMessage(this.getClass(),
"IngestManager.cancellingIngest.msgDlg.text"),
438 NbBundle.getMessage(this.getClass(),
"IngestManager.serviceIsDown.msgDlg.text", serviceDisplayName),
439 JOptionPane.ERROR_MESSAGE);
443 return new IngestJobStartResult(null,
new IngestManagerException(
"Ingest aborted. Remote database is down"), Collections.<IngestModuleError>emptyList());
445 }
catch (ServicesMonitor.ServicesMonitorException ex) {
446 return new IngestJobStartResult(null,
new IngestManagerException(
"Database server is down", ex), Collections.<IngestModuleError>emptyList());
457 IngestManager.logger.log(Level.INFO,
"Starting ingest job {0}", job.getId());
458 errors = job.start();
459 if (errors.isEmpty()) {
460 this.fireIngestJobStarted(job.getId());
465 for (IngestModuleError error : errors) {
466 logger.log(Level.SEVERE, String.format(
"Error starting %s ingest module for job %d", error.getModuleDisplayName(), job.getId()), error.getThrowable());
468 IngestManager.logger.log(Level.SEVERE,
"Ingest job {0} could not be started", job.getId());
469 if (RuntimeProperties.runningWithGUI()) {
470 final StringBuilder message =
new StringBuilder(1024);
471 message.append(Bundle.IngestManager_startupErr_dlgMsg()).append(
"\n");
472 message.append(Bundle.IngestManager_startupErr_dlgSolution()).append(
"\n\n");
473 message.append(Bundle.IngestManager_startupErr_dlgErrorList()).append(
"\n");
474 for (IngestModuleError error : errors) {
475 String moduleName = error.getModuleDisplayName();
476 String errorMessage = error.getThrowable().getLocalizedMessage();
477 message.append(moduleName).append(
": ").append(errorMessage).append(
"\n");
479 message.append(
"\n\n");
480 EventQueue.invokeLater(() -> {
481 JOptionPane.showMessageDialog(WindowManager.getDefault().getMainWindow(), message, Bundle.IngestManager_startupErr_dlgTitle(), JOptionPane.ERROR_MESSAGE);
484 return new IngestJobStartResult(null,
new IngestManagerException(
"Errors occurred while starting ingest"), errors);
487 return new IngestJobStartResult(job, null, errors);
495 void finishIngestJob(IngestJob job) {
496 long jobId = job.getId();
500 if (!job.isCancelled()) {
501 IngestManager.logger.log(Level.INFO,
"Ingest job {0} completed", jobId);
502 fireIngestJobCompleted(jobId);
504 IngestManager.logger.log(Level.INFO,
"Ingest job {0} cancelled", jobId);
505 fireIngestJobCancelled(jobId);
628 void fireIngestJobStarted(
long ingestJobId) {
638 void fireIngestJobCompleted(
long ingestJobId) {
639 AutopsyEvent
event =
new AutopsyEvent(IngestJobEvent.COMPLETED.toString(), ingestJobId, null);
648 void fireIngestJobCancelled(
long ingestJobId) {
649 AutopsyEvent
event =
new AutopsyEvent(IngestJobEvent.CANCELLED.toString(), ingestJobId, null);
661 void fireDataSourceAnalysisStarted(
long ingestJobId,
long dataSourceIngestJobId, Content dataSource) {
662 AutopsyEvent
event =
new DataSourceAnalysisStartedEvent(ingestJobId, dataSourceIngestJobId, dataSource);
674 void fireDataSourceAnalysisCompleted(
long ingestJobId,
long dataSourceIngestJobId, Content dataSource) {
675 AutopsyEvent
event =
new DataSourceAnalysisCompletedEvent(ingestJobId, dataSourceIngestJobId, dataSource, DataSourceAnalysisCompletedEvent.Reason.ANALYSIS_COMPLETED);
687 void fireDataSourceAnalysisCancelled(
long ingestJobId,
long dataSourceIngestJobId, Content dataSource) {
688 AutopsyEvent
event =
new DataSourceAnalysisCompletedEvent(ingestJobId, dataSourceIngestJobId, dataSource, DataSourceAnalysisCompletedEvent.Reason.ANALYSIS_CANCELLED);
698 void fireFileIngestDone(AbstractFile file) {
699 AutopsyEvent
event =
new FileAnalyzedEvent(file);
710 void fireIngestModuleDataEvent(ModuleDataEvent moduleDataEvent) {
711 AutopsyEvent
event =
new BlackboardPostEvent(moduleDataEvent);
722 void fireIngestModuleContentEvent(ModuleContentEvent moduleContentEvent) {
723 AutopsyEvent
event =
new ContentChangedEvent(moduleContentEvent);
735 void initIngestMessageInbox() {
746 void postIngestMessage(IngestMessage message) {
749 if (message.getMessageType() != IngestMessage.MessageType.ERROR && message.getMessageType() != IngestMessage.MessageType.WARNING) {
753 if (errorPosts <= MAX_ERROR_MESSAGE_POSTS) {
755 }
else if (errorPosts == MAX_ERROR_MESSAGE_POSTS + 1) {
756 IngestMessage errorMessageLimitReachedMessage = IngestMessage.createErrorMessage(
757 NbBundle.getMessage(
this.getClass(),
"IngestManager.IngestMessage.ErrorMessageLimitReached.title"),
758 NbBundle.getMessage(
this.getClass(),
"IngestManager.IngestMessage.ErrorMessageLimitReached.subject"),
759 NbBundle.getMessage(
this.getClass(),
"IngestManager.IngestMessage.ErrorMessageLimitReached.msg",
MAX_ERROR_MESSAGE_POSTS));
787 void setIngestTaskProgress(DataSourceIngestTask task, String currentModuleName) {
789 IngestThreadActivitySnapshot newSnap =
new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJobPipeline().getId(), currentModuleName, task.getDataSource());
796 incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
807 void setIngestTaskProgress(FileIngestTask task, String currentModuleName) {
809 IngestThreadActivitySnapshot newSnap;
811 newSnap =
new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJobPipeline().getId(), currentModuleName, task.getDataSource(), task.getFile());
812 }
catch (TskCoreException ex) {
813 logger.log(Level.SEVERE,
"Error getting file from file ingest task", ex);
814 newSnap =
new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJobPipeline().getId(), currentModuleName, task.getDataSource());
822 incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
830 void setIngestTaskProgressCompleted(IngestTask task) {
832 IngestThreadActivitySnapshot newSnap =
new IngestThreadActivitySnapshot(task.getThreadId());
839 incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
848 void incrementModuleRunTime(String moduleDisplayName, Long duration) {
849 if (moduleDisplayName.equals(
"IDLE")) {
856 if (prevTimeL != null) {
857 prevTime = prevTimeL;
859 prevTime += duration;
895 List<Snapshot> snapShots =
new ArrayList<>();
898 snapShots.addAll(job.getDataSourceIngestJobSnapshots());
910 long getFreeDiskSpace() {
935 if (Thread.currentThread().isInterrupted()) {
943 final String displayName = NbBundle.getMessage(this.getClass(),
"IngestManager.StartIngestJobsTask.run.displayName");
944 this.progress = ProgressHandle.createHandle(displayName,
new Cancellable() {
946 public boolean cancel() {
947 if (progress != null) {
948 progress.setDisplayName(NbBundle.getMessage(
this.getClass(),
"IngestManager.StartIngestJobsTask.run.cancelling", displayName));
964 if (null != progress) {
981 private final BlockingIngestTaskQueue
tasks;
992 IngestTask task = tasks.getNextTask();
993 task.execute(threadId);
994 }
catch (InterruptedException ex) {
997 if (Thread.currentThread().isInterrupted()) {
1056 startTime =
new Date();
1057 this.activity = NbBundle.getMessage(this.getClass(),
"IngestManager.IngestThreadActivitySnapshot.idleThread");
1058 this.dataSourceName =
"";
1076 startTime =
new Date();
1078 this.dataSourceName = dataSource.getName();
1094 IngestThreadActivitySnapshot(
long threadId,
long jobId, String activity, Content dataSource, AbstractFile file) {
1097 startTime =
new Date();
1099 this.dataSourceName = dataSource.getName();
1100 this.fileName = file.getName();
1108 long getIngestJobId() {
1117 long getThreadId() {
1126 Date getStartTime() {
1135 String getActivity() {
1146 String getDataSourceName() {
1155 String getFileName() {
1237 private static final long serialVersionUID = 1L;
1255 super(message, cause);
final ConcurrentHashMap< String, Long > ingestModuleRunTimes
void addIngestModuleEventListener(Set< IngestModuleEvent > eventTypes, final PropertyChangeListener listener)
final Map< Long, Future< Void > > startIngestJobFutures
String getServiceStatus(String service)
void removeIngestModuleEventListener(final PropertyChangeListener listener)
IngestStream openIngestStream(DataSource dataSource, IngestJobSettings settings)
static final String INGEST_MODULE_EVENT_CHANNEL_NAME
List< IngestThreadActivitySnapshot > getIngestThreadActivitySnapshots()
void queueIngestJob(Content dataSource, List< AbstractFile > files, IngestJobSettings settings)
IngestManagerException(String message, Throwable cause)
static synchronized IngestManager getInstance()
static IngestManager instance
final String dataSourceName
void removeIngestModuleEventListener(Set< IngestModuleEvent > eventTypes, final PropertyChangeListener listener)
final ExecutorService dataSourceLevelIngestJobTasksExecutor
static boolean runningWithGUI
void cancelAllIngestJobs()
void publish(AutopsyEvent event)
static void addPropertyChangeListener(final PropertyChangeListener listener)
IngestJobStartResult beginIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
static final Logger logger
final ExecutorService eventPublishingExecutor
void clearIngestMessageBox()
void addSubscriber(Set< String > eventNames, PropertyChangeListener subscriber)
void subscribeToServiceMonitorEvents()
boolean isIngestRunning()
DATA_SOURCE_ANALYSIS_COMPLETED
static void removePropertyChangeListener(final PropertyChangeListener listener)
static final Set< String > INGEST_MODULE_EVENT_NAMES
volatile IngestMessageTopComponent ingestMessageBox
void addSubscriber(PropertyChangeListener subscriber)
List< Snapshot > getIngestJobSnapshots()
final AutopsyEventPublisher publisher
synchronized void closeRemoteEventChannel()
Map< String, Long > getModuleRunTimes()
final ServicesMonitor servicesMonitor
void removeIngestJobEventListener(final PropertyChangeListener listener)
void addIngestJobEventListener(Set< IngestJobEvent > eventTypes, final PropertyChangeListener listener)
final BlockingIngestTaskQueue tasks
static final String INGEST_JOB_EVENT_CHANNEL_NAME
static final Set< String > INGEST_JOB_EVENT_NAMES
final AutopsyEventPublisher moduleEventPublisher
static int numberOfFileIngestThreads()
synchronized void openRemoteEventChannel(String channelName)
void addIngestJobEventListener(final PropertyChangeListener listener)
final Object ingestMessageBoxLock
IngestManagerException(String message)
void queueIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
void removeSubscriber(Set< String > eventNames, PropertyChangeListener subscriber)
static final int MAX_ERROR_MESSAGE_POSTS
final AtomicLong ingestErrorMessagePosts
volatile boolean caseIsOpen
final AtomicLong nextIngestManagerTaskId
int getNumberOfFileIngestThreads()
void addIngestModuleEventListener(final PropertyChangeListener listener)
synchronized static Logger getLogger(String name)
DATA_SOURCE_ANALYSIS_STARTED
static Case getCurrentCaseThrows()
static void addEventTypeSubscriber(Set< Events > eventTypes, PropertyChangeListener subscriber)
synchronized IngestJob startIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
final ConcurrentHashMap< Long, IngestThreadActivitySnapshot > ingestThreadActivitySnapshots
void cancelAllIngestJobs(IngestJob.CancellationReason reason)
final IngestMonitor ingestMonitor
final ExecutorService fileLevelIngestJobTasksExecutor
final Map< Long, IngestJob > ingestJobsById
void subscribeToCaseEvents()
final ExecutorService startIngestJobsExecutor
void removeIngestJobEventListener(Set< IngestJobEvent > eventTypes, final PropertyChangeListener listener)
final int numberOfFileIngestThreads
static final long serialVersionUID
final AutopsyEventPublisher jobEventPublisher