19 package org.sleuthkit.autopsy.ingest;
21 import com.google.common.util.concurrent.ThreadFactoryBuilder;
22 import java.awt.EventQueue;
23 import java.beans.PropertyChangeEvent;
24 import java.beans.PropertyChangeListener;
25 import java.util.ArrayList;
26 import java.util.Collection;
27 import java.util.Collections;
28 import java.util.Date;
29 import java.util.HashMap;
30 import java.util.HashSet;
31 import java.util.List;
34 import java.util.concurrent.Callable;
35 import java.util.concurrent.ConcurrentHashMap;
36 import java.util.concurrent.ExecutorService;
37 import java.util.concurrent.Executors;
38 import java.util.concurrent.Future;
39 import java.util.concurrent.atomic.AtomicLong;
40 import java.util.logging.Level;
41 import java.util.stream.Collectors;
42 import java.util.stream.Stream;
43 import javax.swing.JOptionPane;
44 import org.netbeans.api.progress.ProgressHandle;
45 import org.openide.util.Cancellable;
46 import org.openide.util.NbBundle;
111 .map(IngestJobEvent::toString)
112 .collect(Collectors.toSet());
114 .map(IngestModuleEvent::toString)
115 .collect(Collectors.toSet());
235 if (instance == null) {
256 this.ingestModuleRunTimes =
new ConcurrentHashMap<>();
257 this.ingestThreadActivitySnapshots =
new ConcurrentHashMap<>();
258 this.ingestErrorMessagePosts =
new AtomicLong(0L);
260 this.eventPublishingExecutor = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat(
"IM-ingest-events-%d").build());
263 this.dataSourceIngestThreadPool = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat(
"IM-data-source-ingest-%d").build());
264 this.startIngestJobsThreadPool = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat(
"IM-start-ingest-jobs-%d").build());
265 this.nextThreadId =
new AtomicLong(0L);
266 this.jobsById =
new HashMap<>();
267 this.startIngestJobTasks =
new ConcurrentHashMap<>();
275 if ((numberOfFileIngestThreads < MIN_NUMBER_OF_FILE_INGEST_THREADS) || (numberOfFileIngestThreads > MAX_NUMBER_OF_FILE_INGEST_THREADS)) {
279 fileIngestThreadPool = Executors.newFixedThreadPool(numberOfFileIngestThreads,
new ThreadFactoryBuilder().setNameFormat(
"IM-file-ingest-%d").build());
290 long threadId = nextThreadId.incrementAndGet();
291 dataSourceIngestThreadPool.submit(
new ExecuteIngestJobsTask(threadId, IngestTasksScheduler.getInstance().getDataSourceIngestTaskQueue()));
292 ingestThreadActivitySnapshots.put(threadId,
new IngestThreadActivitySnapshot(threadId));
300 long threadId = nextThreadId.incrementAndGet();
301 fileIngestThreadPool.submit(
new ExecuteIngestJobsTask(threadId, IngestTasksScheduler.getInstance().getFileIngestTaskQueue()));
302 ingestThreadActivitySnapshots.put(threadId,
new IngestThreadActivitySnapshot(threadId));
311 public void propertyChange(PropertyChangeEvent event) {
312 if (event.getNewValue() != null) {
326 PropertyChangeListener propChangeListener =
new PropertyChangeListener() {
328 public void propertyChange(PropertyChangeEvent evt) {
336 }
catch (IllegalStateException ignore) {
343 logger.log(Level.SEVERE,
"Service {0} is down! Cancelling all running ingest jobs", serviceDisplayName);
347 EventQueue.invokeLater(
new Runnable() {
350 JOptionPane.showMessageDialog(null,
351 NbBundle.getMessage(
this.getClass(),
"IngestManager.cancellingIngest.msgDlg.text"),
352 NbBundle.getMessage(
this.getClass(),
"IngestManager.serviceIsDown.msgDlg.text", serviceDisplayName),
353 JOptionPane.ERROR_MESSAGE);
365 Set<String> servicesList =
new HashSet<>();
368 this.servicesMonitor.
addSubscriber(servicesList, propChangeListener);
371 synchronized void handleCaseOpened() {
372 this.jobCreationIsEnabled =
true;
382 String channelPrefix = openedCase.
getName();
387 }
catch (IllegalStateException | AutopsyEventException ex) {
388 logger.log(Level.SEVERE,
"Failed to open remote events channel", ex);
389 MessageNotifyUtil.Notify.error(NbBundle.getMessage(
IngestManager.class,
"IngestManager.OpenEventChannel.Fail.Title"),
390 NbBundle.getMessage(
IngestManager.class,
"IngestManager.OpenEventChannel.Fail.ErrMsg"));
394 synchronized void handleCaseClosed() {
402 this.jobCreationIsEnabled =
false;
411 void initIngestMessageInbox() {
413 ingestMessageBox = IngestMessageTopComponent.findInstance();
422 void postIngestMessage(IngestMessage message) {
424 if (ingestMessageBox != null && RuntimeProperties.runningWithGUI()) {
425 if (message.getMessageType() != IngestMessage.MessageType.ERROR && message.getMessageType() != IngestMessage.MessageType.WARNING) {
426 ingestMessageBox.displayMessage(message);
428 long errorPosts = ingestErrorMessagePosts.incrementAndGet();
429 if (errorPosts <= MAX_ERROR_MESSAGE_POSTS) {
430 ingestMessageBox.displayMessage(message);
431 }
else if (errorPosts == MAX_ERROR_MESSAGE_POSTS + 1) {
432 IngestMessage errorMessageLimitReachedMessage = IngestMessage.createErrorMessage(
433 NbBundle.getMessage(
this.getClass(),
"IngestManager.IngestMessage.ErrorMessageLimitReached.title"),
434 NbBundle.getMessage(
this.getClass(),
"IngestManager.IngestMessage.ErrorMessageLimitReached.subject"),
435 NbBundle.getMessage(
this.getClass(),
"IngestManager.IngestMessage.ErrorMessageLimitReached.msg",
MAX_ERROR_MESSAGE_POSTS));
436 ingestMessageBox.displayMessage(errorMessageLimitReachedMessage);
445 if (ingestMessageBox != null) {
446 ingestMessageBox.clearMessages();
448 ingestErrorMessagePosts.set(0);
470 if (jobCreationIsEnabled) {
472 if (job.hasIngestPipeline()) {
473 long taskId = nextThreadId.incrementAndGet();
474 Future<Void> task = startIngestJobsThreadPool.submit(
new StartIngestJobTask(taskId, job));
475 startIngestJobTasks.put(taskId, task);
492 if (this.jobCreationIsEnabled) {
494 if (job.hasIngestPipeline()) {
526 "IngestManager.startupErr.dlgTitle=Ingest Module Startup Failure",
527 "IngestManager.startupErr.dlgMsg=Unable to start up one or more ingest modules, ingest cancelled.",
528 "IngestManager.startupErr.dlgSolution=Please disable the failed modules or fix the errors before restarting ingest.",
529 "IngestManager.startupErr.dlgErrorList=Errors:"
532 List<IngestModuleError> errors = null;
533 if (this.jobCreationIsEnabled) {
540 EventQueue.invokeLater(
new Runnable() {
544 JOptionPane.showMessageDialog(null,
545 NbBundle.getMessage(
this.getClass(),
"IngestManager.cancellingIngest.msgDlg.text"),
546 NbBundle.getMessage(
this.getClass(),
"IngestManager.serviceIsDown.msgDlg.text", serviceDisplayName),
547 JOptionPane.ERROR_MESSAGE);
559 if (!ingestMonitor.isRunning()) {
560 ingestMonitor.start();
564 jobsById.put(job.
getId(), job);
566 errors = job.start();
567 if (errors.isEmpty()) {
568 this.fireIngestJobStarted(job.
getId());
572 this.jobsById.remove(job.
getId());
575 logger.log(Level.SEVERE, String.format(
"Error starting %s ingest module for job %d", error.getModuleDisplayName(), job.
getId()), error.getThrowable());
579 final StringBuilder message =
new StringBuilder();
580 message.append(Bundle.IngestManager_startupErr_dlgMsg()).append(
"\n");
581 message.append(Bundle.IngestManager_startupErr_dlgSolution()).append(
"\n\n");
582 message.append(Bundle.IngestManager_startupErr_dlgErrorList()).append(
"\n");
584 String moduleName = error.getModuleDisplayName();
585 String errorMessage = error.getThrowable().getLocalizedMessage();
586 message.append(moduleName).append(
": ").append(errorMessage).append(
"\n");
588 message.append(
"\n\n");
589 EventQueue.invokeLater(() -> {
590 JOptionPane.showMessageDialog(null, message, Bundle.IngestManager_startupErr_dlgTitle(), JOptionPane.ERROR_MESSAGE);
600 synchronized void finishIngestJob(
IngestJob job) {
601 long jobId = job.
getId();
603 jobsById.remove(jobId);
606 IngestManager.logger.log(Level.INFO,
"Ingest job {0} completed", jobId);
607 fireIngestJobCompleted(jobId);
609 IngestManager.logger.log(Level.INFO,
"Ingest job {0} cancelled", jobId);
610 fireIngestJobCancelled(jobId);
621 return !jobsById.isEmpty();
645 for (Future<Void> handle : startIngestJobTasks.values()) {
653 for (
IngestJob job : this.jobsById.values()) {
683 moduleEventPublisher.
addSubscriber(moduleEventNames, listener);
728 void fireIngestJobStarted(
long ingestJobId) {
730 eventPublishingExecutor.submit(
new PublishEventTask(event, jobEventPublisher));
738 void fireIngestJobCompleted(
long ingestJobId) {
739 AutopsyEvent
event =
new AutopsyEvent(IngestJobEvent.COMPLETED.toString(), ingestJobId, null);
740 eventPublishingExecutor.submit(
new PublishEventTask(event, jobEventPublisher));
748 void fireIngestJobCancelled(
long ingestJobId) {
749 AutopsyEvent
event =
new AutopsyEvent(IngestJobEvent.CANCELLED.toString(), ingestJobId, null);
750 eventPublishingExecutor.submit(
new PublishEventTask(event, jobEventPublisher));
760 void fireDataSourceAnalysisStarted(
long ingestJobId,
long dataSourceIngestJobId, Content dataSource) {
761 AutopsyEvent
event =
new DataSourceAnalysisStartedEvent(ingestJobId, dataSourceIngestJobId, dataSource);
762 eventPublishingExecutor.submit(
new PublishEventTask(event, jobEventPublisher));
772 void fireDataSourceAnalysisCompleted(
long ingestJobId,
long dataSourceIngestJobId, Content dataSource) {
773 AutopsyEvent
event =
new DataSourceAnalysisCompletedEvent(ingestJobId, dataSourceIngestJobId, dataSource, DataSourceAnalysisCompletedEvent.Reason.ANALYSIS_COMPLETED);
774 eventPublishingExecutor.submit(
new PublishEventTask(event, jobEventPublisher));
784 void fireDataSourceAnalysisCancelled(
long ingestJobId,
long dataSourceIngestJobId, Content dataSource) {
785 AutopsyEvent
event =
new DataSourceAnalysisCompletedEvent(ingestJobId, dataSourceIngestJobId, dataSource, DataSourceAnalysisCompletedEvent.Reason.ANALYSIS_CANCELLED);
786 eventPublishingExecutor.submit(
new PublishEventTask(event, jobEventPublisher));
794 void fireFileIngestDone(AbstractFile file) {
795 AutopsyEvent
event =
new FileAnalyzedEvent(file);
796 eventPublishingExecutor.submit(
new PublishEventTask(event, moduleEventPublisher));
804 void fireIngestModuleDataEvent(ModuleDataEvent moduleDataEvent) {
805 AutopsyEvent
event =
new BlackboardPostEvent(moduleDataEvent);
806 eventPublishingExecutor.submit(
new PublishEventTask(event, moduleEventPublisher));
816 void fireIngestModuleContentEvent(ModuleContentEvent moduleContentEvent) {
817 AutopsyEvent
event =
new ContentChangedEvent(moduleContentEvent);
818 eventPublishingExecutor.submit(
new PublishEventTask(event, moduleEventPublisher));
827 void setIngestTaskProgress(DataSourceIngestTask task, String ingestModuleDisplayName) {
828 ingestThreadActivitySnapshots.put(task.getThreadId(),
new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJob().getId(), ingestModuleDisplayName, task.getDataSource()));
837 void setIngestTaskProgress(FileIngestTask task, String ingestModuleDisplayName) {
838 IngestThreadActivitySnapshot prevSnap = ingestThreadActivitySnapshots.get(task.getThreadId());
839 IngestThreadActivitySnapshot newSnap =
new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJob().getId(), ingestModuleDisplayName, task.getDataSource(), task.getFile());
840 ingestThreadActivitySnapshots.put(task.getThreadId(), newSnap);
842 incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
850 void setIngestTaskProgressCompleted(DataSourceIngestTask task) {
851 ingestThreadActivitySnapshots.put(task.getThreadId(),
new IngestThreadActivitySnapshot(task.getThreadId()));
859 void setIngestTaskProgressCompleted(FileIngestTask task) {
860 IngestThreadActivitySnapshot prevSnap = ingestThreadActivitySnapshots.get(task.getThreadId());
861 IngestThreadActivitySnapshot newSnap =
new IngestThreadActivitySnapshot(task.getThreadId());
862 ingestThreadActivitySnapshots.put(task.getThreadId(), newSnap);
863 incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
873 if (moduleName.equals(
"IDLE")) {
878 Long prevTimeL = ingestModuleRunTimes.get(moduleName);
880 if (prevTimeL != null) {
881 prevTime = prevTimeL;
883 prevTime += duration;
884 ingestModuleRunTimes.put(moduleName, prevTime);
893 Map<String, Long> getModuleRunTimes() {
905 List<IngestThreadActivitySnapshot> getIngestThreadActivitySnapshots() {
906 return new ArrayList<>(ingestThreadActivitySnapshots.values());
914 List<DataSourceIngestJob.Snapshot> getIngestJobSnapshots() {
915 List<DataSourceIngestJob.Snapshot> snapShots =
new ArrayList<>();
917 for (IngestJob job : jobsById.values()) {
918 snapShots.addAll(job.getDataSourceIngestJobSnapshots());
930 long getFreeDiskSpace() {
931 if (ingestMonitor != null) {
932 return ingestMonitor.getFreeSpace();
955 if (Thread.currentThread().isInterrupted()) {
957 jobsById.remove(job.
getId());
963 final String displayName = NbBundle.getMessage(this.getClass(),
"IngestManager.StartIngestJobsTask.run.displayName");
964 this.progress = ProgressHandle.createHandle(displayName,
new Cancellable() {
966 public boolean cancel() {
967 if (progress != null) {
968 progress.setDisplayName(NbBundle.getMessage(
this.getClass(),
"IngestManager.StartIngestJobsTask.run.cancelling", displayName));
970 Future<?> handle = startIngestJobTasks.remove(threadId);
982 if (null != progress) {
985 startIngestJobTasks.remove(threadId);
997 private final IngestTaskQueue
tasks;
1008 IngestTask task = tasks.getNextTask();
1009 task.execute(threadId);
1010 }
catch (InterruptedException ex) {
1013 if (Thread.currentThread().isInterrupted()) {
1047 static final class IngestThreadActivitySnapshot {
1049 private final long threadId;
1050 private final Date startTime;
1051 private final String activity;
1052 private final String dataSourceName;
1053 private final String fileName;
1054 private final long jobId;
1057 IngestThreadActivitySnapshot(
long threadId) {
1058 this.threadId = threadId;
1059 startTime =
new Date();
1060 this.activity = NbBundle.getMessage(this.getClass(),
"IngestManager.IngestThreadActivitySnapshot.idleThread");
1061 this.dataSourceName =
"";
1067 IngestThreadActivitySnapshot(
long threadId,
long jobId, String activity, Content dataSource) {
1068 this.threadId = threadId;
1070 startTime =
new Date();
1071 this.activity = activity;
1072 this.dataSourceName = dataSource.getName();
1077 IngestThreadActivitySnapshot(
long threadId,
long jobId, String activity, Content dataSource, AbstractFile file) {
1078 this.threadId = threadId;
1080 startTime =
new Date();
1081 this.activity = activity;
1082 this.dataSourceName = dataSource.getName();
1083 this.fileName = file.getName();
1090 long getThreadId() {
1094 Date getStartTime() {
1098 String getActivity() {
1102 String getDataSourceName() {
1103 return dataSourceName;
1106 String getFileName() {
1135 super(message, cause);
final ConcurrentHashMap< String, Long > ingestModuleRunTimes
String getServiceStatus(String service)
void removeIngestModuleEventListener(final PropertyChangeListener listener)
static final int MIN_NUMBER_OF_FILE_INGEST_THREADS
IngestManagerException(String message, Throwable cause)
static synchronized IngestManager getInstance()
static IngestManager instance
IngestJobStartResult startIngestJob(IngestJob job)
static boolean runningWithGUI
final ExecutorService startIngestJobsThreadPool
void cancelAllIngestJobs()
final Map< Long, Future< Void > > startIngestJobTasks
void publish(AutopsyEvent event)
static void addPropertyChangeListener(final PropertyChangeListener listener)
static void setNumberOfFileIngestThreads(int value)
static final Logger logger
final ExecutorService eventPublishingExecutor
void clearIngestMessageBox()
void addSubscriber(Set< String > eventNames, PropertyChangeListener subscriber)
void subscribeToServiceMonitorEvents()
boolean isIngestRunning()
DATA_SOURCE_ANALYSIS_COMPLETED
static void removePropertyChangeListener(final PropertyChangeListener listener)
volatile IngestMessageTopComponent ingestMessageBox
final IngestTaskQueue tasks
void addSubscriber(PropertyChangeListener subscriber)
static synchronized ServicesMonitor getInstance()
final AutopsyEventPublisher publisher
int numberOfFileIngestThreads
final ServicesMonitor servicesMonitor
void removeIngestJobEventListener(final PropertyChangeListener listener)
final ExecutorService fileIngestThreadPool
static final int DEFAULT_NUMBER_OF_FILE_INGEST_THREADS
static int numberOfFileIngestThreads()
void incrementModuleRunTime(String moduleName, Long duration)
void addIngestJobEventListener(final PropertyChangeListener listener)
final Object ingestMessageBoxLock
void startFileIngestThread()
void openRemoteEventChannel(String channelName)
final Map< Long, IngestJob > jobsById
AutopsyEventPublisher jobEventPublisher
IngestManagerException(String message)
void closeRemoteEventChannel()
void queueIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
final ExecutorService dataSourceIngestThreadPool
void removeSubscriber(Set< String > eventNames, PropertyChangeListener subscriber)
static final String MODULE_EVENT_CHANNEL_NAME
volatile boolean jobCreationIsEnabled
static final int MAX_ERROR_MESSAGE_POSTS
final AtomicLong ingestErrorMessagePosts
void startDataSourceIngestThread()
static final Set< String > jobEventNames
int getNumberOfFileIngestThreads()
static final Set< String > moduleEventNames
void addIngestModuleEventListener(final PropertyChangeListener listener)
static final String JOB_EVENT_CHANNEL_NAME
static Case getCurrentCase()
synchronized static Logger getLogger(String name)
DATA_SOURCE_ANALYSIS_STARTED
static final int MAX_NUMBER_OF_FILE_INGEST_THREADS
synchronized IngestJob startIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
final ConcurrentHashMap< Long, IngestThreadActivitySnapshot > ingestThreadActivitySnapshots
void cancelAllIngestJobs(IngestJob.CancellationReason reason)
AutopsyEventPublisher moduleEventPublisher
final IngestMonitor ingestMonitor
static final long serialVersionUID
synchronized IngestJobStartResult beginIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
final AtomicLong nextThreadId
void subscribeToCaseEvents()
static void addEventSubscriber(Set< String > eventNames, PropertyChangeListener subscriber)