19 package org.sleuthkit.autopsy.ingest;
21 import com.google.common.util.concurrent.ThreadFactoryBuilder;
22 import java.awt.EventQueue;
23 import java.beans.PropertyChangeEvent;
24 import java.beans.PropertyChangeListener;
25 import java.util.ArrayList;
26 import java.util.Collection;
27 import java.util.Date;
28 import java.util.HashMap;
29 import java.util.HashSet;
30 import java.util.List;
33 import java.util.concurrent.Callable;
34 import java.util.concurrent.ConcurrentHashMap;
35 import java.util.concurrent.ExecutorService;
36 import java.util.concurrent.Executors;
37 import java.util.concurrent.Future;
38 import java.util.concurrent.atomic.AtomicLong;
39 import java.util.logging.Level;
40 import java.util.stream.Collectors;
41 import java.util.stream.Stream;
42 import javax.swing.JOptionPane;
43 import org.netbeans.api.progress.ProgressHandle;
44 import org.netbeans.api.progress.ProgressHandleFactory;
45 import org.openide.util.Cancellable;
46 import org.openide.util.NbBundle;
111 .map(IngestJobEvent::toString)
112 .collect(Collectors.toSet());
114 .map(IngestModuleEvent::toString)
115 .collect(Collectors.toSet());
235 if (instance == null) {
256 this.ingestModuleRunTimes =
new ConcurrentHashMap<>();
257 this.ingestThreadActivitySnapshots =
new ConcurrentHashMap<>();
258 this.ingestErrorMessagePosts =
new AtomicLong(0L);
260 this.eventPublishingExecutor = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat(
"IM-ingest-events-%d").build());
263 this.dataSourceIngestThreadPool = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat(
"IM-data-source-ingest-%d").build());
264 this.startIngestJobsThreadPool = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat(
"IM-start-ingest-jobs-%d").build());
265 this.nextThreadId =
new AtomicLong(0L);
266 this.jobsById =
new HashMap<>();
267 this.startIngestJobTasks =
new ConcurrentHashMap<>();
275 if ((numberOfFileIngestThreads < MIN_NUMBER_OF_FILE_INGEST_THREADS) || (numberOfFileIngestThreads > MAX_NUMBER_OF_FILE_INGEST_THREADS)) {
279 fileIngestThreadPool = Executors.newFixedThreadPool(numberOfFileIngestThreads,
new ThreadFactoryBuilder().setNameFormat(
"IM-file-ingest-%d").build());
290 long threadId = nextThreadId.incrementAndGet();
291 dataSourceIngestThreadPool.submit(
new ExecuteIngestJobsTask(threadId, IngestTasksScheduler.getInstance().getDataSourceIngestTaskQueue()));
292 ingestThreadActivitySnapshots.put(threadId,
new IngestThreadActivitySnapshot(threadId));
300 long threadId = nextThreadId.incrementAndGet();
301 fileIngestThreadPool.submit(
new ExecuteIngestJobsTask(threadId, IngestTasksScheduler.getInstance().getFileIngestTaskQueue()));
302 ingestThreadActivitySnapshots.put(threadId,
new IngestThreadActivitySnapshot(threadId));
311 public void propertyChange(PropertyChangeEvent event) {
312 if (event.getNewValue() != null) {
326 PropertyChangeListener propChangeListener =
new PropertyChangeListener() {
328 public void propertyChange(PropertyChangeEvent evt) {
336 }
catch (IllegalStateException ignore) {
343 logger.log(Level.SEVERE,
"Service {0} is down! Cancelling all running ingest jobs", serviceDisplayName);
347 EventQueue.invokeLater(
new Runnable() {
350 JOptionPane.showMessageDialog(null,
351 NbBundle.getMessage(
this.getClass(),
"IngestManager.cancellingIngest.msgDlg.text"),
352 NbBundle.getMessage(
this.getClass(),
"IngestManager.serviceIsDown.msgDlg.text", serviceDisplayName),
353 JOptionPane.ERROR_MESSAGE);
365 Set<String> servicesList =
new HashSet<>();
368 this.servicesMonitor.
addSubscriber(servicesList, propChangeListener);
/**
 * Case-opened handler. Only part of the body survives in this listing
 * (original lines 373-386 are absent), so this describes the visible
 * statements only.
 */
371 synchronized void handleCaseOpened() {
// Re-enable creation of ingest jobs.
372 this.jobCreationIsEnabled =
true;
387 }
// The guarded statements are not visible here. On failure the error is
// logged at SEVERE and surfaced to the user as an error notification.
catch (IllegalStateException | AutopsyEventException ex) {
388 logger.log(Level.SEVERE,
"Failed to open remote events channel", ex);
389 MessageNotifyUtil.Notify.error(NbBundle.getMessage(
IngestManager.class,
"IngestManager.OpenEventChannel.Fail.Title"),
390 NbBundle.getMessage(
IngestManager.class,
"IngestManager.OpenEventChannel.Fail.ErrMsg"));
/**
 * Case-closed handler. Only part of the body is present in this listing;
 * the visible statement disables creation of ingest jobs.
 */
394 synchronized void handleCaseClosed() {
397 this.jobCreationIsEnabled =
false;
/**
 * Stores the IngestMessageTopComponent returned by findInstance() in
 * ingestMessageBox. Original line 419 is absent from this listing.
 */
418 void initIngestMessageInbox() {
420 ingestMessageBox = IngestMessageTopComponent.findInstance();
/**
 * Posts a message to the ingest message box, limiting how many error and
 * warning messages are shown.
 *
 * Original lines 430 and 434 are absent from this listing; the statements
 * from line 435 onward presumably form the else branch for ERROR/WARNING
 * messages (confirm against the full file).
 *
 * @param message The message to post.
 */
429 void postIngestMessage(IngestMessage message) {
// Nothing is displayed unless the inbox exists and core components are active.
431 if (ingestMessageBox != null && RuntimeProperties.coreComponentsAreActive()) {
// Messages that are neither ERROR nor WARNING are shown without counting.
432 if (message.getMessageType() != IngestMessage.MessageType.ERROR && message.getMessageType() != IngestMessage.MessageType.WARNING) {
433 ingestMessageBox.displayMessage(message);
// Count this post; messages are shown while the count is within
// MAX_ERROR_MESSAGE_POSTS.
435 long errorPosts = ingestErrorMessagePosts.incrementAndGet();
436 if (errorPosts <= MAX_ERROR_MESSAGE_POSTS) {
437 ingestMessageBox.displayMessage(message);
438 }
// Exactly one post past the limit: show a single "limit reached" error
// message instead of the original message.
else if (errorPosts == MAX_ERROR_MESSAGE_POSTS + 1) {
439 IngestMessage errorMessageLimitReachedMessage = IngestMessage.createErrorMessage(
440 NbBundle.getMessage(
this.getClass(),
"IngestManager.IngestMessage.ErrorMessageLimitReached.title"),
441 NbBundle.getMessage(
this.getClass(),
"IngestManager.IngestMessage.ErrorMessageLimitReached.subject"),
442 NbBundle.getMessage(
this.getClass(),
"IngestManager.IngestMessage.ErrorMessageLimitReached.msg",
MAX_ERROR_MESSAGE_POSTS));
443 ingestMessageBox.displayMessage(errorMessageLimitReachedMessage);
452 if (ingestMessageBox != null) {
453 ingestMessageBox.clearMessages();
455 ingestErrorMessagePosts.set(0);
477 if (jobCreationIsEnabled) {
479 if (job.hasIngestPipeline()) {
480 long taskId = nextThreadId.incrementAndGet();
481 Future<Void> task = startIngestJobsThreadPool.submit(
new StartIngestJobTask(taskId, job));
482 startIngestJobTasks.put(taskId, task);
496 if (this.jobCreationIsEnabled) {
498 if (job.hasIngestPipeline()) {
515 boolean success =
false;
516 if (this.jobCreationIsEnabled) {
523 EventQueue.invokeLater(
new Runnable() {
527 JOptionPane.showMessageDialog(null,
528 NbBundle.getMessage(
this.getClass(),
"IngestManager.cancellingIngest.msgDlg.text"),
529 NbBundle.getMessage(
this.getClass(),
"IngestManager.serviceIsDown.msgDlg.text", serviceDisplayName),
530 JOptionPane.ERROR_MESSAGE);
542 if (!ingestMonitor.isRunning()) {
543 ingestMonitor.start();
547 jobsById.put(job.
getId(), job);
549 List<IngestModuleError> errors = job.start();
550 if (errors.isEmpty()) {
551 this.fireIngestJobStarted(job.
getId());
556 this.jobsById.remove(job.
getId());
558 for (IngestModuleError error : errors) {
559 logger.log(Level.SEVERE, String.format(
"Error starting %s ingest module", error.getModuleDisplayName()), error.getModuleError());
563 EventQueue.invokeLater(
new Runnable() {
567 StringBuilder moduleStartUpErrors =
new StringBuilder();
568 for (IngestModuleError error : errors) {
569 String moduleName = error.getModuleDisplayName();
570 moduleStartUpErrors.append(moduleName);
571 moduleStartUpErrors.append(
": ");
572 moduleStartUpErrors.append(error.getModuleError().getLocalizedMessage());
573 moduleStartUpErrors.append(
"\n");
575 StringBuilder notifyMessage =
new StringBuilder();
576 notifyMessage.append(NbBundle.getMessage(
this.getClass(),
577 "IngestManager.StartIngestJobsTask.run.startupErr.dlgMsg"));
578 notifyMessage.append(
"\n");
579 notifyMessage.append(NbBundle.getMessage(
this.getClass(),
580 "IngestManager.StartIngestJobsTask.run.startupErr.dlgSolution"));
581 notifyMessage.append(
"\n");
582 notifyMessage.append(NbBundle.getMessage(
this.getClass(),
583 "IngestManager.StartIngestJobsTask.run.startupErr.dlgErrorList",
584 moduleStartUpErrors.toString()));
585 notifyMessage.append(
"\n\n");
586 JOptionPane.showMessageDialog(null, notifyMessage.toString(),
587 NbBundle.getMessage(this.getClass(),
588 "IngestManager.StartIngestJobsTask.run.startupErr.dlgTitle"), JOptionPane.ERROR_MESSAGE);
/**
 * Removes a finished ingest job from jobsById and fires the matching
 * job event.
 *
 * The condition that selects between the "completed" and "cancelled"
 * paths (original lines 601-602, 605) is absent from this listing.
 *
 * @param job The job that has finished.
 */
597 synchronized void finishIngestJob(
IngestJob job) {
598 long jobId = job.
getId();
600 jobsById.remove(jobId);
// Completed path: log, then publish the completed event.
603 IngestManager.logger.log(Level.INFO,
"Ingest job {0} completed", jobId);
604 fireIngestJobCompleted(jobId);
// Cancelled path: log, then publish the cancelled event.
606 IngestManager.logger.log(Level.INFO,
"Ingest job {0} cancelled", jobId);
607 fireIngestJobCancelled(jobId);
618 return !jobsById.isEmpty();
642 for (Future<Void> handle : startIngestJobTasks.values()) {
650 for (
IngestJob job : this.jobsById.values()) {
680 moduleEventPublisher.
addSubscriber(moduleEventNames, listener);
/**
 * Submits a PublishEventTask for the job publisher to the event
 * publishing executor.
 *
 * The statement that builds {@code event} (original line 726) is absent
 * from this listing.
 *
 * @param ingestJobId The ingest job id.
 */
725 void fireIngestJobStarted(
long ingestJobId) {
727 eventPublishingExecutor.submit(
new PublishEventTask(event, jobEventPublisher));
/**
 * Builds an AutopsyEvent named IngestJobEvent.COMPLETED carrying the job id
 * (third constructor argument null) and submits a PublishEventTask for it,
 * targeting jobEventPublisher, to the event publishing executor.
 *
 * @param ingestJobId The ingest job id.
 */
735 void fireIngestJobCompleted(
long ingestJobId) {
736 AutopsyEvent
event =
new AutopsyEvent(IngestJobEvent.COMPLETED.toString(), ingestJobId, null);
// Publication is handed off to the executor rather than done inline.
737 eventPublishingExecutor.submit(
new PublishEventTask(event, jobEventPublisher));
/**
 * Builds an AutopsyEvent named IngestJobEvent.CANCELLED carrying the job id
 * (third constructor argument null) and submits a PublishEventTask for it,
 * targeting jobEventPublisher, to the event publishing executor.
 *
 * @param ingestJobId The ingest job id.
 */
745 void fireIngestJobCancelled(
long ingestJobId) {
746 AutopsyEvent
event =
new AutopsyEvent(IngestJobEvent.CANCELLED.toString(), ingestJobId, null);
// Publication is handed off to the executor rather than done inline.
747 eventPublishingExecutor.submit(
new PublishEventTask(event, jobEventPublisher));
/**
 * Builds a DataSourceAnalysisStartedEvent and submits a PublishEventTask
 * for it, targeting jobEventPublisher, to the event publishing executor.
 *
 * @param ingestJobId           The ingest job id.
 * @param dataSourceIngestJobId The data source ingest job id.
 * @param dataSource            The data source.
 */
757 void fireDataSourceAnalysisStarted(
long ingestJobId,
long dataSourceIngestJobId, Content dataSource) {
758 AutopsyEvent
event =
new DataSourceAnalysisStartedEvent(ingestJobId, dataSourceIngestJobId, dataSource);
759 eventPublishingExecutor.submit(
new PublishEventTask(event, jobEventPublisher));
/**
 * Builds a DataSourceAnalysisCompletedEvent with reason ANALYSIS_COMPLETED
 * and submits a PublishEventTask for it, targeting jobEventPublisher, to
 * the event publishing executor.
 *
 * @param ingestJobId           The ingest job id.
 * @param dataSourceIngestJobId The data source ingest job id.
 * @param dataSource            The data source.
 */
769 void fireDataSourceAnalysisCompleted(
long ingestJobId,
long dataSourceIngestJobId, Content dataSource) {
770 AutopsyEvent
event =
new DataSourceAnalysisCompletedEvent(ingestJobId, dataSourceIngestJobId, dataSource, DataSourceAnalysisCompletedEvent.Reason.ANALYSIS_COMPLETED);
771 eventPublishingExecutor.submit(
new PublishEventTask(event, jobEventPublisher));
/**
 * Builds a DataSourceAnalysisCompletedEvent with reason ANALYSIS_CANCELLED
 * (cancellation reuses the "completed" event type with a different reason)
 * and submits a PublishEventTask for it, targeting jobEventPublisher, to
 * the event publishing executor.
 *
 * @param ingestJobId           The ingest job id.
 * @param dataSourceIngestJobId The data source ingest job id.
 * @param dataSource            The data source.
 */
781 void fireDataSourceAnalysisCancelled(
long ingestJobId,
long dataSourceIngestJobId, Content dataSource) {
782 AutopsyEvent
event =
new DataSourceAnalysisCompletedEvent(ingestJobId, dataSourceIngestJobId, dataSource, DataSourceAnalysisCompletedEvent.Reason.ANALYSIS_CANCELLED);
783 eventPublishingExecutor.submit(
new PublishEventTask(event, jobEventPublisher));
/**
 * Builds a FileAnalyzedEvent for the file and submits a PublishEventTask
 * for it, targeting moduleEventPublisher, to the event publishing executor.
 *
 * @param file The file the event is about.
 */
791 void fireFileIngestDone(AbstractFile file) {
792 AutopsyEvent
event =
new FileAnalyzedEvent(file);
793 eventPublishingExecutor.submit(
new PublishEventTask(event, moduleEventPublisher));
/**
 * Wraps the module data event in a BlackboardPostEvent and submits a
 * PublishEventTask for it, targeting moduleEventPublisher, to the event
 * publishing executor.
 *
 * @param moduleDataEvent The module data event to wrap and publish.
 */
801 void fireIngestModuleDataEvent(ModuleDataEvent moduleDataEvent) {
802 AutopsyEvent
event =
new BlackboardPostEvent(moduleDataEvent);
803 eventPublishingExecutor.submit(
new PublishEventTask(event, moduleEventPublisher));
/**
 * Wraps the module content event in a ContentChangedEvent and submits a
 * PublishEventTask for it, targeting moduleEventPublisher, to the event
 * publishing executor.
 *
 * @param moduleContentEvent The module content event to wrap and publish.
 */
813 void fireIngestModuleContentEvent(ModuleContentEvent moduleContentEvent) {
814 AutopsyEvent
event =
new ContentChangedEvent(moduleContentEvent);
815 eventPublishingExecutor.submit(
new PublishEventTask(event, moduleEventPublisher));
/**
 * Replaces the activity snapshot stored for the task's thread id with a
 * new one recording the job id, the module display name and the data source.
 *
 * @param task                    The data source ingest task.
 * @param ingestModuleDisplayName The display name recorded as the activity.
 */
824 void setIngestTaskProgress(DataSourceIngestTask task, String ingestModuleDisplayName) {
825 ingestThreadActivitySnapshots.put(task.getThreadId(),
new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJob().getId(), ingestModuleDisplayName, task.getDataSource()));
/**
 * Replaces the activity snapshot stored for the task's thread id with a
 * new one (job id, module display name, data source, file), then adds the
 * elapsed time of the previous activity to that activity's run time.
 *
 * Original line 838 is absent from this listing.
 *
 * @param task                    The file ingest task.
 * @param ingestModuleDisplayName The display name recorded as the activity.
 */
834 void setIngestTaskProgress(FileIngestTask task, String ingestModuleDisplayName) {
// Read the previous snapshot before it is overwritten below.
835 IngestThreadActivitySnapshot prevSnap = ingestThreadActivitySnapshots.get(task.getThreadId());
836 IngestThreadActivitySnapshot newSnap =
new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJob().getId(), ingestModuleDisplayName, task.getDataSource(), task.getFile());
837 ingestThreadActivitySnapshots.put(task.getThreadId(), newSnap);
// Duration = new snapshot start time minus previous snapshot start time (ms).
// NOTE(review): prevSnap is dereferenced without a null check; this relies
// on a snapshot already being stored for the thread id - confirm.
839 incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
/**
 * Replaces the activity snapshot stored for the task's thread id with one
 * built from the thread id alone (the single-argument constructor).
 *
 * @param task The data source ingest task.
 */
847 void setIngestTaskProgressCompleted(DataSourceIngestTask task) {
848 ingestThreadActivitySnapshots.put(task.getThreadId(),
new IngestThreadActivitySnapshot(task.getThreadId()));
/**
 * Replaces the activity snapshot stored for the task's thread id with one
 * built from the thread id alone, then adds the elapsed time of the
 * previous activity to that activity's run time.
 *
 * @param task The file ingest task.
 */
856 void setIngestTaskProgressCompleted(FileIngestTask task) {
// Read the previous snapshot before it is overwritten below.
857 IngestThreadActivitySnapshot prevSnap = ingestThreadActivitySnapshots.get(task.getThreadId());
858 IngestThreadActivitySnapshot newSnap =
new IngestThreadActivitySnapshot(task.getThreadId());
859 ingestThreadActivitySnapshots.put(task.getThreadId(), newSnap);
// Duration = new snapshot start time minus previous snapshot start time (ms).
// NOTE(review): prevSnap is dereferenced without a null check - confirm an
// entry always exists for the thread id.
860 incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
870 if (moduleName.equals(
"IDLE")) {
875 Long prevTimeL = ingestModuleRunTimes.get(moduleName);
877 if (prevTimeL != null) {
878 prevTime = prevTimeL;
880 prevTime += duration;
881 ingestModuleRunTimes.put(moduleName, prevTime);
890 Map<String, Long> getModuleRunTimes() {
/**
 * Returns the current thread activity snapshots copied into a new
 * ArrayList, so the caller does not receive the map's live values view.
 *
 * @return A new list holding the snapshots currently in the map.
 */
902 List<IngestThreadActivitySnapshot> getIngestThreadActivitySnapshots() {
903 return new ArrayList<>(ingestThreadActivitySnapshots.values());
/**
 * Collects the data source ingest job snapshots of every job in jobsById
 * into one list.
 *
 * Original line 913 and the lines after 915 (including the return
 * statement) are absent from this listing.
 */
911 List<DataSourceIngestJob.Snapshot> getIngestJobSnapshots() {
912 List<DataSourceIngestJob.Snapshot> snapShots =
new ArrayList<>();
// Append each job's snapshots to the combined list.
914 for (IngestJob job : jobsById.values()) {
915 snapShots.addAll(job.getDataSourceIngestJobSnapshots());
/**
 * Returns the free space reported by the ingest monitor when one is set.
 *
 * The value returned when ingestMonitor is null is on lines absent from
 * this listing.
 */
927 long getFreeDiskSpace() {
928 if (ingestMonitor != null) {
929 return ingestMonitor.getFreeSpace();
952 if (Thread.currentThread().isInterrupted()) {
954 jobsById.remove(job.
getId());
960 final String displayName = NbBundle.getMessage(this.getClass(),
"IngestManager.StartIngestJobsTask.run.displayName");
961 this.progress = ProgressHandleFactory.createHandle(displayName,
new Cancellable() {
963 public boolean cancel() {
964 if (progress != null) {
965 progress.setDisplayName(NbBundle.getMessage(
this.getClass(),
"IngestManager.StartIngestJobsTask.run.cancelling", displayName));
967 Future<?> handle = startIngestJobTasks.remove(threadId);
979 if (null != progress) {
982 startIngestJobTasks.remove(threadId);
994 private final IngestTaskQueue
tasks;
1005 IngestTask task = tasks.getNextTask();
1006 task.execute(threadId);
1007 }
catch (InterruptedException ex) {
1010 if (Thread.currentThread().isInterrupted()) {
/**
 * Immutable record of what an ingest thread was doing at the moment the
 * snapshot was created. All fields are final and the start time is taken
 * with {@code new Date()} in each constructor.
 *
 * Several original lines are absent from this listing (for example the
 * remaining field assignments in the constructors, the getter bodies and
 * the end of the class), so only the visible statements are described.
 */
1047 static final class IngestThreadActivitySnapshot {
1049 private final long threadId;
// Creation time of this snapshot.
1050 private final Date startTime;
// Activity label: a module display name or the localized idle text.
1051 private final String activity;
1052 private final String dataSourceName;
1053 private final String fileName;
1054 private final long jobId;
/**
 * Creates a snapshot for a thread with no task: the activity is the
 * localized "idleThread" bundle text and the data source name is empty.
 */
1057 IngestThreadActivitySnapshot(
long threadId) {
1058 this.threadId = threadId;
1059 startTime =
new Date();
1060 this.activity = NbBundle.getMessage(this.getClass(),
"IngestManager.IngestThreadActivitySnapshot.idleThread");
1061 this.dataSourceName =
"";
/**
 * Creates a snapshot for a thread working on a data source. The data
 * source name is read via dataSource.getName(), so dataSource must be
 * non-null.
 */
1067 IngestThreadActivitySnapshot(
long threadId,
long jobId, String activity, Content dataSource) {
1068 this.threadId = threadId;
1070 startTime =
new Date();
1071 this.activity = activity;
1072 this.dataSourceName = dataSource.getName();
/**
 * Creates a snapshot for a thread working on a file. Names are read via
 * dataSource.getName() and file.getName(), so both must be non-null.
 */
1077 IngestThreadActivitySnapshot(
long threadId,
long jobId, String activity, Content dataSource, AbstractFile file) {
1078 this.threadId = threadId;
1080 startTime =
new Date();
1081 this.activity = activity;
1082 this.dataSourceName = dataSource.getName();
1083 this.fileName = file.getName();
// Accessors; most bodies are absent from this listing.
1090 long getThreadId() {
1094 Date getStartTime() {
1098 String getActivity() {
1102 String getDataSourceName() {
1103 return dataSourceName;
1106 String getFileName() {
final ConcurrentHashMap< String, Long > ingestModuleRunTimes
String getServiceStatus(String service)
void removeIngestModuleEventListener(final PropertyChangeListener listener)
static final int MIN_NUMBER_OF_FILE_INGEST_THREADS
static synchronized IngestManager getInstance()
static IngestManager instance
final ExecutorService startIngestJobsThreadPool
void cancelAllIngestJobs()
final Map< Long, Future< Void > > startIngestJobTasks
void publish(AutopsyEvent event)
static void addPropertyChangeListener(final PropertyChangeListener listener)
static void setNumberOfFileIngestThreads(int value)
static final Logger logger
final ExecutorService eventPublishingExecutor
void clearIngestMessageBox()
void addSubscriber(Set< String > eventNames, PropertyChangeListener subscriber)
void subscribeToServiceMonitorEvents()
boolean isIngestRunning()
static void setCoreComponentsActive(boolean coreComponentsActive)
DATA_SOURCE_ANALYSIS_COMPLETED
static void removePropertyChangeListener(final PropertyChangeListener listener)
volatile IngestMessageTopComponent ingestMessageBox
final IngestTaskQueue tasks
void addSubscriber(PropertyChangeListener subscriber)
static synchronized ServicesMonitor getInstance()
final AutopsyEventPublisher publisher
int numberOfFileIngestThreads
final ServicesMonitor servicesMonitor
void removeIngestJobEventListener(final PropertyChangeListener listener)
final ExecutorService fileIngestThreadPool
static final int DEFAULT_NUMBER_OF_FILE_INGEST_THREADS
static int numberOfFileIngestThreads()
void incrementModuleRunTime(String moduleName, Long duration)
void addIngestJobEventListener(final PropertyChangeListener listener)
final Object ingestMessageBoxLock
void startFileIngestThread()
void openRemoteEventChannel(String channelName)
static boolean coreComponentsAreActive()
final Map< Long, IngestJob > jobsById
AutopsyEventPublisher jobEventPublisher
void closeRemoteEventChannel()
synchronized boolean isCancelled()
void queueIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
final ExecutorService dataSourceIngestThreadPool
void removeSubscriber(Set< String > eventNames, PropertyChangeListener subscriber)
static final String MODULE_EVENT_CHANNEL_NAME
volatile boolean jobCreationIsEnabled
static final int MAX_ERROR_MESSAGE_POSTS
final AtomicLong ingestErrorMessagePosts
void startDataSourceIngestThread()
boolean startIngestJob(IngestJob job)
static final Set< String > jobEventNames
int getNumberOfFileIngestThreads()
static final Set< String > moduleEventNames
void addIngestModuleEventListener(final PropertyChangeListener listener)
static final String JOB_EVENT_CHANNEL_NAME
static Case getCurrentCase()
synchronized static Logger getLogger(String name)
DATA_SOURCE_ANALYSIS_STARTED
static final int MAX_NUMBER_OF_FILE_INGEST_THREADS
synchronized void cancel()
synchronized IngestJob startIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
final ConcurrentHashMap< Long, IngestThreadActivitySnapshot > ingestThreadActivitySnapshots
void cancelAllIngestJobs(IngestJob.CancellationReason reason)
AutopsyEventPublisher moduleEventPublisher
final IngestMonitor ingestMonitor
final AtomicLong nextThreadId
static boolean isCaseOpen()
String getTextIndexName()
void subscribeToCaseEvents()
synchronized void setRunInteractively(boolean runInteractively)
static void addEventSubscriber(Set< String > eventNames, PropertyChangeListener subscriber)