19 package org.sleuthkit.autopsy.ingest;
21 import com.google.common.util.concurrent.ThreadFactoryBuilder;
22 import java.awt.EventQueue;
23 import java.beans.PropertyChangeEvent;
24 import java.beans.PropertyChangeListener;
25 import java.util.ArrayList;
26 import java.util.Collection;
27 import java.util.Collections;
28 import java.util.Date;
29 import java.util.HashMap;
30 import java.util.HashSet;
31 import java.util.List;
34 import java.util.concurrent.Callable;
35 import java.util.concurrent.ConcurrentHashMap;
36 import java.util.concurrent.ExecutorService;
37 import java.util.concurrent.Executors;
38 import java.util.concurrent.Future;
39 import java.util.concurrent.atomic.AtomicLong;
40 import java.util.logging.Level;
41 import java.util.stream.Collectors;
42 import java.util.stream.Stream;
43 import javax.swing.JOptionPane;
44 import org.netbeans.api.progress.ProgressHandle;
45 import org.openide.util.Cancellable;
46 import org.openide.util.NbBundle;
111 .map(IngestJobEvent::toString)
112 .collect(Collectors.toSet());
114 .map(IngestModuleEvent::toString)
115 .collect(Collectors.toSet());
235 if (instance == null) {
256 this.ingestModuleRunTimes =
new ConcurrentHashMap<>();
257 this.ingestThreadActivitySnapshots =
new ConcurrentHashMap<>();
258 this.ingestErrorMessagePosts =
new AtomicLong(0L);
260 this.eventPublishingExecutor = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat(
"IM-ingest-events-%d").build());
263 this.dataSourceIngestThreadPool = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat(
"IM-data-source-ingest-%d").build());
264 this.startIngestJobsThreadPool = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat(
"IM-start-ingest-jobs-%d").build());
265 this.nextThreadId =
new AtomicLong(0L);
266 this.jobsById =
new HashMap<>();
267 this.startIngestJobTasks =
new ConcurrentHashMap<>();
275 if ((numberOfFileIngestThreads < MIN_NUMBER_OF_FILE_INGEST_THREADS) || (numberOfFileIngestThreads > MAX_NUMBER_OF_FILE_INGEST_THREADS)) {
279 fileIngestThreadPool = Executors.newFixedThreadPool(numberOfFileIngestThreads,
new ThreadFactoryBuilder().setNameFormat(
"IM-file-ingest-%d").build());
290 long threadId = nextThreadId.incrementAndGet();
291 dataSourceIngestThreadPool.submit(
new ExecuteIngestJobsTask(threadId, IngestTasksScheduler.getInstance().getDataSourceIngestTaskQueue()));
292 ingestThreadActivitySnapshots.put(threadId,
new IngestThreadActivitySnapshot(threadId));
300 long threadId = nextThreadId.incrementAndGet();
301 fileIngestThreadPool.submit(
new ExecuteIngestJobsTask(threadId, IngestTasksScheduler.getInstance().getFileIngestTaskQueue()));
302 ingestThreadActivitySnapshots.put(threadId,
new IngestThreadActivitySnapshot(threadId));
311 public void propertyChange(PropertyChangeEvent event) {
312 if (event.getNewValue() != null) {
326 PropertyChangeListener propChangeListener =
new PropertyChangeListener() {
328 public void propertyChange(PropertyChangeEvent evt) {
336 }
catch (IllegalStateException ignore) {
343 logger.log(Level.SEVERE,
"Service {0} is down! Cancelling all running ingest jobs", serviceDisplayName);
347 EventQueue.invokeLater(
new Runnable() {
350 JOptionPane.showMessageDialog(null,
351 NbBundle.getMessage(
this.getClass(),
"IngestManager.cancellingIngest.msgDlg.text"),
352 NbBundle.getMessage(
this.getClass(),
"IngestManager.serviceIsDown.msgDlg.text", serviceDisplayName),
353 JOptionPane.ERROR_MESSAGE);
365 Set<String> servicesList =
new HashSet<>();
368 this.servicesMonitor.
addSubscriber(servicesList, propChangeListener);
371 synchronized void handleCaseOpened() {
372 this.jobCreationIsEnabled =
true;
387 }
catch (IllegalStateException | AutopsyEventException ex) {
388 logger.log(Level.SEVERE,
"Failed to open remote events channel", ex);
389 MessageNotifyUtil.Notify.error(NbBundle.getMessage(
IngestManager.class,
"IngestManager.OpenEventChannel.Fail.Title"),
390 NbBundle.getMessage(
IngestManager.class,
"IngestManager.OpenEventChannel.Fail.ErrMsg"));
394 synchronized void handleCaseClosed() {
397 this.jobCreationIsEnabled =
false;
418 void initIngestMessageInbox() {
420 ingestMessageBox = IngestMessageTopComponent.findInstance();
429 void postIngestMessage(IngestMessage message) {
431 if (ingestMessageBox != null && RuntimeProperties.coreComponentsAreActive()) {
432 if (message.getMessageType() != IngestMessage.MessageType.ERROR && message.getMessageType() != IngestMessage.MessageType.WARNING) {
433 ingestMessageBox.displayMessage(message);
435 long errorPosts = ingestErrorMessagePosts.incrementAndGet();
436 if (errorPosts <= MAX_ERROR_MESSAGE_POSTS) {
437 ingestMessageBox.displayMessage(message);
438 }
else if (errorPosts == MAX_ERROR_MESSAGE_POSTS + 1) {
439 IngestMessage errorMessageLimitReachedMessage = IngestMessage.createErrorMessage(
440 NbBundle.getMessage(
this.getClass(),
"IngestManager.IngestMessage.ErrorMessageLimitReached.title"),
441 NbBundle.getMessage(
this.getClass(),
"IngestManager.IngestMessage.ErrorMessageLimitReached.subject"),
442 NbBundle.getMessage(
this.getClass(),
"IngestManager.IngestMessage.ErrorMessageLimitReached.msg",
MAX_ERROR_MESSAGE_POSTS));
443 ingestMessageBox.displayMessage(errorMessageLimitReachedMessage);
452 if (ingestMessageBox != null) {
453 ingestMessageBox.clearMessages();
455 ingestErrorMessagePosts.set(0);
477 if (jobCreationIsEnabled) {
479 if (job.hasIngestPipeline()) {
480 long taskId = nextThreadId.incrementAndGet();
481 Future<Void> task = startIngestJobsThreadPool.submit(
new StartIngestJobTask(taskId, job));
482 startIngestJobTasks.put(taskId, task);
499 if (this.jobCreationIsEnabled) {
501 if (job.hasIngestPipeline()) {
533 "IngestManager.startupErr.dlgTitle=Ingest Module Startup Failure",
534 "IngestManager.startupErr.dlgMsg=Unable to start up one or more ingest modules, ingest cancelled.",
535 "IngestManager.startupErr.dlgSolution=Please disable the failed modules or fix the errors before restarting ingest.",
536 "IngestManager.startupErr.dlgErrorList=Errors:"
539 List<IngestModuleError> errors = null;
540 if (this.jobCreationIsEnabled) {
547 EventQueue.invokeLater(
new Runnable() {
551 JOptionPane.showMessageDialog(null,
552 NbBundle.getMessage(
this.getClass(),
"IngestManager.cancellingIngest.msgDlg.text"),
553 NbBundle.getMessage(
this.getClass(),
"IngestManager.serviceIsDown.msgDlg.text", serviceDisplayName),
554 JOptionPane.ERROR_MESSAGE);
566 if (!ingestMonitor.isRunning()) {
567 ingestMonitor.start();
571 jobsById.put(job.
getId(), job);
573 errors = job.start();
574 if (errors.isEmpty()) {
575 this.fireIngestJobStarted(job.
getId());
579 this.jobsById.remove(job.
getId());
582 logger.log(Level.SEVERE, String.format(
"Error starting %s ingest module for job %d", error.getModuleDisplayName(), job.
getId()), error.getThrowable());
586 final StringBuilder message =
new StringBuilder();
587 message.append(Bundle.IngestManager_startupErr_dlgMsg()).append(
"\n");
588 message.append(Bundle.IngestManager_startupErr_dlgSolution()).append(
"\n\n");
589 message.append(Bundle.IngestManager_startupErr_dlgErrorList()).append(
"\n");
591 String moduleName = error.getModuleDisplayName();
592 String errorMessage = error.getThrowable().getLocalizedMessage();
593 message.append(moduleName).append(
": ").append(errorMessage).append(
"\n");
595 message.append(
"\n\n");
596 EventQueue.invokeLater(() -> {
597 JOptionPane.showMessageDialog(null, message, Bundle.IngestManager_startupErr_dlgTitle(), JOptionPane.ERROR_MESSAGE);
607 synchronized void finishIngestJob(
IngestJob job) {
608 long jobId = job.
getId();
610 jobsById.remove(jobId);
613 IngestManager.logger.log(Level.INFO,
"Ingest job {0} completed", jobId);
614 fireIngestJobCompleted(jobId);
616 IngestManager.logger.log(Level.INFO,
"Ingest job {0} cancelled", jobId);
617 fireIngestJobCancelled(jobId);
628 return !jobsById.isEmpty();
652 for (Future<Void> handle : startIngestJobTasks.values()) {
660 for (
IngestJob job : this.jobsById.values()) {
690 moduleEventPublisher.
addSubscriber(moduleEventNames, listener);
735 void fireIngestJobStarted(
long ingestJobId) {
737 eventPublishingExecutor.submit(
new PublishEventTask(event, jobEventPublisher));
745 void fireIngestJobCompleted(
long ingestJobId) {
746 AutopsyEvent
event =
new AutopsyEvent(IngestJobEvent.COMPLETED.toString(), ingestJobId, null);
747 eventPublishingExecutor.submit(
new PublishEventTask(event, jobEventPublisher));
755 void fireIngestJobCancelled(
long ingestJobId) {
756 AutopsyEvent
event =
new AutopsyEvent(IngestJobEvent.CANCELLED.toString(), ingestJobId, null);
757 eventPublishingExecutor.submit(
new PublishEventTask(event, jobEventPublisher));
767 void fireDataSourceAnalysisStarted(
long ingestJobId,
long dataSourceIngestJobId, Content dataSource) {
768 AutopsyEvent
event =
new DataSourceAnalysisStartedEvent(ingestJobId, dataSourceIngestJobId, dataSource);
769 eventPublishingExecutor.submit(
new PublishEventTask(event, jobEventPublisher));
779 void fireDataSourceAnalysisCompleted(
long ingestJobId,
long dataSourceIngestJobId, Content dataSource) {
780 AutopsyEvent
event =
new DataSourceAnalysisCompletedEvent(ingestJobId, dataSourceIngestJobId, dataSource, DataSourceAnalysisCompletedEvent.Reason.ANALYSIS_COMPLETED);
781 eventPublishingExecutor.submit(
new PublishEventTask(event, jobEventPublisher));
791 void fireDataSourceAnalysisCancelled(
long ingestJobId,
long dataSourceIngestJobId, Content dataSource) {
792 AutopsyEvent
event =
new DataSourceAnalysisCompletedEvent(ingestJobId, dataSourceIngestJobId, dataSource, DataSourceAnalysisCompletedEvent.Reason.ANALYSIS_CANCELLED);
793 eventPublishingExecutor.submit(
new PublishEventTask(event, jobEventPublisher));
801 void fireFileIngestDone(AbstractFile file) {
802 AutopsyEvent
event =
new FileAnalyzedEvent(file);
803 eventPublishingExecutor.submit(
new PublishEventTask(event, moduleEventPublisher));
811 void fireIngestModuleDataEvent(ModuleDataEvent moduleDataEvent) {
812 AutopsyEvent
event =
new BlackboardPostEvent(moduleDataEvent);
813 eventPublishingExecutor.submit(
new PublishEventTask(event, moduleEventPublisher));
823 void fireIngestModuleContentEvent(ModuleContentEvent moduleContentEvent) {
824 AutopsyEvent
event =
new ContentChangedEvent(moduleContentEvent);
825 eventPublishingExecutor.submit(
new PublishEventTask(event, moduleEventPublisher));
834 void setIngestTaskProgress(DataSourceIngestTask task, String ingestModuleDisplayName) {
835 ingestThreadActivitySnapshots.put(task.getThreadId(),
new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJob().getId(), ingestModuleDisplayName, task.getDataSource()));
844 void setIngestTaskProgress(FileIngestTask task, String ingestModuleDisplayName) {
845 IngestThreadActivitySnapshot prevSnap = ingestThreadActivitySnapshots.get(task.getThreadId());
846 IngestThreadActivitySnapshot newSnap =
new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJob().getId(), ingestModuleDisplayName, task.getDataSource(), task.getFile());
847 ingestThreadActivitySnapshots.put(task.getThreadId(), newSnap);
849 incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
857 void setIngestTaskProgressCompleted(DataSourceIngestTask task) {
858 ingestThreadActivitySnapshots.put(task.getThreadId(),
new IngestThreadActivitySnapshot(task.getThreadId()));
866 void setIngestTaskProgressCompleted(FileIngestTask task) {
867 IngestThreadActivitySnapshot prevSnap = ingestThreadActivitySnapshots.get(task.getThreadId());
868 IngestThreadActivitySnapshot newSnap =
new IngestThreadActivitySnapshot(task.getThreadId());
869 ingestThreadActivitySnapshots.put(task.getThreadId(), newSnap);
870 incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
880 if (moduleName.equals(
"IDLE")) {
885 Long prevTimeL = ingestModuleRunTimes.get(moduleName);
887 if (prevTimeL != null) {
888 prevTime = prevTimeL;
890 prevTime += duration;
891 ingestModuleRunTimes.put(moduleName, prevTime);
900 Map<String, Long> getModuleRunTimes() {
912 List<IngestThreadActivitySnapshot> getIngestThreadActivitySnapshots() {
913 return new ArrayList<>(ingestThreadActivitySnapshots.values());
921 List<DataSourceIngestJob.Snapshot> getIngestJobSnapshots() {
922 List<DataSourceIngestJob.Snapshot> snapShots =
new ArrayList<>();
924 for (IngestJob job : jobsById.values()) {
925 snapShots.addAll(job.getDataSourceIngestJobSnapshots());
937 long getFreeDiskSpace() {
938 if (ingestMonitor != null) {
939 return ingestMonitor.getFreeSpace();
962 if (Thread.currentThread().isInterrupted()) {
964 jobsById.remove(job.
getId());
970 final String displayName = NbBundle.getMessage(this.getClass(),
"IngestManager.StartIngestJobsTask.run.displayName");
971 this.progress = ProgressHandle.createHandle(displayName,
new Cancellable() {
973 public boolean cancel() {
974 if (progress != null) {
975 progress.setDisplayName(NbBundle.getMessage(
this.getClass(),
"IngestManager.StartIngestJobsTask.run.cancelling", displayName));
977 Future<?> handle = startIngestJobTasks.remove(threadId);
989 if (null != progress) {
992 startIngestJobTasks.remove(threadId);
1015 IngestTask task = tasks.getNextTask();
1016 task.execute(threadId);
1017 }
catch (InterruptedException ex) {
1020 if (Thread.currentThread().isInterrupted()) {
1054 static final class IngestThreadActivitySnapshot {
1056 private final long threadId;
1057 private final Date startTime;
1058 private final String activity;
1059 private final String dataSourceName;
1060 private final String fileName;
1061 private final long jobId;
1064 IngestThreadActivitySnapshot(
long threadId) {
1065 this.threadId = threadId;
1066 startTime =
new Date();
1067 this.activity = NbBundle.getMessage(this.getClass(),
"IngestManager.IngestThreadActivitySnapshot.idleThread");
1068 this.dataSourceName =
"";
1074 IngestThreadActivitySnapshot(
long threadId,
long jobId, String activity, Content dataSource) {
1075 this.threadId = threadId;
1077 startTime =
new Date();
1078 this.activity = activity;
1079 this.dataSourceName = dataSource.getName();
1084 IngestThreadActivitySnapshot(
long threadId,
long jobId, String activity, Content dataSource, AbstractFile file) {
1085 this.threadId = threadId;
1087 startTime =
new Date();
1088 this.activity = activity;
1089 this.dataSourceName = dataSource.getName();
1090 this.fileName = file.getName();
1097 long getThreadId() {
1101 Date getStartTime() {
1105 String getActivity() {
1109 String getDataSourceName() {
1110 return dataSourceName;
1113 String getFileName() {
1142 super(message, cause);
final ConcurrentHashMap< String, Long > ingestModuleRunTimes
String getServiceStatus(String service)
void removeIngestModuleEventListener(final PropertyChangeListener listener)
static final int MIN_NUMBER_OF_FILE_INGEST_THREADS
IngestManagerException(String message, Throwable cause)
static synchronized IngestManager getInstance()
static IngestManager instance
IngestJobStartResult startIngestJob(IngestJob job)
final ExecutorService startIngestJobsThreadPool
void cancelAllIngestJobs()
final Map< Long, Future< Void > > startIngestJobTasks
void publish(AutopsyEvent event)
static void addPropertyChangeListener(final PropertyChangeListener listener)
static void setNumberOfFileIngestThreads(int value)
static final Logger logger
final ExecutorService eventPublishingExecutor
void clearIngestMessageBox()
void addSubscriber(Set< String > eventNames, PropertyChangeListener subscriber)
void subscribeToServiceMonitorEvents()
boolean isIngestRunning()
static void setCoreComponentsActive(boolean coreComponentsActive)
DATA_SOURCE_ANALYSIS_COMPLETED
static void removePropertyChangeListener(final PropertyChangeListener listener)
volatile IngestMessageTopComponent ingestMessageBox
final IngestTaskQueue tasks
void addSubscriber(PropertyChangeListener subscriber)
static synchronized ServicesMonitor getInstance()
final AutopsyEventPublisher publisher
int numberOfFileIngestThreads
final ServicesMonitor servicesMonitor
void removeIngestJobEventListener(final PropertyChangeListener listener)
final ExecutorService fileIngestThreadPool
static final int DEFAULT_NUMBER_OF_FILE_INGEST_THREADS
static int numberOfFileIngestThreads()
void incrementModuleRunTime(String moduleName, Long duration)
void addIngestJobEventListener(final PropertyChangeListener listener)
final Object ingestMessageBoxLock
void startFileIngestThread()
void openRemoteEventChannel(String channelName)
static boolean coreComponentsAreActive()
final Map< Long, IngestJob > jobsById
AutopsyEventPublisher jobEventPublisher
IngestManagerException(String message)
void closeRemoteEventChannel()
void queueIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
final ExecutorService dataSourceIngestThreadPool
void removeSubscriber(Set< String > eventNames, PropertyChangeListener subscriber)
static final String MODULE_EVENT_CHANNEL_NAME
volatile boolean jobCreationIsEnabled
static final int MAX_ERROR_MESSAGE_POSTS
final AtomicLong ingestErrorMessagePosts
void startDataSourceIngestThread()
static final Set< String > jobEventNames
int getNumberOfFileIngestThreads()
static final Set< String > moduleEventNames
void addIngestModuleEventListener(final PropertyChangeListener listener)
static final String JOB_EVENT_CHANNEL_NAME
static Case getCurrentCase()
synchronized static Logger getLogger(String name)
DATA_SOURCE_ANALYSIS_STARTED
static final int MAX_NUMBER_OF_FILE_INGEST_THREADS
synchronized IngestJob startIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
final ConcurrentHashMap< Long, IngestThreadActivitySnapshot > ingestThreadActivitySnapshots
void cancelAllIngestJobs(IngestJob.CancellationReason reason)
AutopsyEventPublisher moduleEventPublisher
final IngestMonitor ingestMonitor
static final long serialVersionUID
synchronized IngestJobStartResult beginIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
final AtomicLong nextThreadId
static boolean isCaseOpen()
String getTextIndexName()
void subscribeToCaseEvents()
synchronized void setRunInteractively(boolean runInteractively)
static void addEventSubscriber(Set< String > eventNames, PropertyChangeListener subscriber)