19 package org.sleuthkit.autopsy.ingest;
21 import com.google.common.util.concurrent.ThreadFactoryBuilder;
22 import java.awt.EventQueue;
23 import java.beans.PropertyChangeEvent;
24 import java.beans.PropertyChangeListener;
25 import java.beans.PropertyChangeSupport;
26 import java.util.ArrayList;
27 import java.util.Collection;
28 import java.util.Date;
29 import java.util.HashMap;
30 import java.util.List;
32 import java.util.concurrent.Callable;
33 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.ExecutorService;
35 import java.util.concurrent.Executors;
36 import java.util.concurrent.Future;
37 import java.util.concurrent.atomic.AtomicLong;
38 import java.util.logging.Level;
39 import javax.swing.JOptionPane;
40 import org.netbeans.api.progress.ProgressHandle;
41 import org.netbeans.api.progress.ProgressHandleFactory;
42 import org.openide.util.Cancellable;
43 import org.openide.util.NbBundle;
64 private final ConcurrentHashMap<Long, IngestJob>
jobsById;
203 if (instance == null) {
224 this.runInteractively =
true;
225 this.ingestModuleRunTimes =
new ConcurrentHashMap<>();
226 this.ingestThreadActivitySnapshots =
new ConcurrentHashMap<>();
227 this.ingestErrorMessagePosts =
new AtomicLong(0L);
229 this.ingestModuleEventPublisher =
new PropertyChangeSupport(
IngestManager.class);
230 this.fireIngestEventsThreadPool = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat(
"IM-ingest-events-%d").build());
231 this.ingestJobEventPublisher =
new PropertyChangeSupport(
IngestManager.class);
232 this.dataSourceIngestThreadPool = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat(
"IM-data-source-ingest-%d").build());
233 this.startIngestJobsThreadPool = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat(
"IM-start-ingest-jobs-%d").build());
234 this.nextThreadId =
new AtomicLong(0L);
235 this.jobsById =
new ConcurrentHashMap<>();
236 this.ingestJobStarters =
new ConcurrentHashMap<>();
241 if ((numberOfFileIngestThreads < MIN_NUMBER_OF_FILE_INGEST_THREADS) || (numberOfFileIngestThreads > MAX_NUMBER_OF_FILE_INGEST_THREADS)) {
245 fileIngestThreadPool = Executors.newFixedThreadPool(numberOfFileIngestThreads,
new ThreadFactoryBuilder().setNameFormat(
"IM-file-ingest-%d").build());
256 long threadId = nextThreadId.incrementAndGet();
257 dataSourceIngestThreadPool.submit(
new IngestTaskExecuter(threadId, IngestTasksScheduler.getInstance().getDataSourceIngestTaskQueue()));
258 ingestThreadActivitySnapshots.put(threadId,
new IngestThreadActivitySnapshot(threadId));
266 long threadId = nextThreadId.incrementAndGet();
267 fileIngestThreadPool.submit(
new IngestTaskExecuter(threadId, IngestTasksScheduler.getInstance().getFileIngestTaskQueue()));
268 ingestThreadActivitySnapshots.put(threadId,
new IngestThreadActivitySnapshot(threadId));
274 public void propertyChange(PropertyChangeEvent event) {
276 if (event.getNewValue() != null) {
/**
 * Turns ingest job creation on by setting the jobCreationIsEnabled flag.
 * The method is synchronized on this ingest manager.
 *
 * NOTE(review): judging by the name this runs when a case is opened; the
 * caller is not visible here - confirm.
 */
286 synchronized void handleCaseOpened() {
// The job queueing/starting code checks this flag before creating a job.
287 this.jobCreationIsEnabled =
true;
/**
 * Turns ingest job creation off by clearing the jobCreationIsEnabled flag.
 * The method is synchronized on this ingest manager.
 *
 * NOTE(review): judging by the name this runs when a case is closed; the
 * caller is not visible here - confirm.
 */
291 synchronized void handleCaseClosed() {
// While this flag is false the job queueing/starting code skips job creation.
292 this.jobCreationIsEnabled =
false;
314 void initIngestMessageInbox() {
315 ingestMessageBox = IngestMessageTopComponent.findInstance();
/**
 * Posts a message to the ingest message inbox. Nothing is displayed unless
 * the inbox component has been obtained and the manager is running
 * interactively.
 *
 * Messages that are neither ERROR nor WARNING are displayed directly. A
 * running count of posts is kept in ingestErrorMessagePosts; while the
 * count is at most MAX_ERROR_MESSAGE_POSTS the message is displayed, and
 * on the first post beyond that limit a single "limit reached" error
 * message is displayed instead.
 *
 * @param message The message to be posted.
 */
323 synchronized void postIngestMessage(IngestMessage message) {
324 if (ingestMessageBox != null && this.runInteractively) {
325 if (message.getMessageType() != IngestMessage.MessageType.ERROR && message.getMessageType() != IngestMessage.MessageType.WARNING) {
326 ingestMessageBox.displayMessage(message);
// NOTE(review): the counting below is presumably the alternative branch
// taken for ERROR and WARNING messages - confirm.
328 long errorPosts = ingestErrorMessagePosts.incrementAndGet();
329 if (errorPosts <= MAX_ERROR_MESSAGE_POSTS) {
330 ingestMessageBox.displayMessage(message);
331 }
else if (errorPosts == MAX_ERROR_MESSAGE_POSTS + 1) {
// Exactly one post past the limit: tell the user once that the limit was hit.
332 IngestMessage errorMessageLimitReachedMessage = IngestMessage.createErrorMessage(
333 NbBundle.getMessage(
this.getClass(),
"IngestManager.IngestMessage.ErrorMessageLimitReached.title"),
334 NbBundle.getMessage(
this.getClass(),
"IngestManager.IngestMessage.ErrorMessageLimitReached.subject"),
335 NbBundle.getMessage(
this.getClass(),
"IngestManager.IngestMessage.ErrorMessageLimitReached.msg",
MAX_ERROR_MESSAGE_POSTS));
336 ingestMessageBox.displayMessage(errorMessageLimitReachedMessage);
343 if (ingestMessageBox != null) {
344 ingestMessageBox.clearMessages();
346 ingestErrorMessagePosts.set(0);
367 if (this.jobCreationIsEnabled) {
369 if (job.hasIngestPipeline()) {
370 long taskId = nextThreadId.incrementAndGet();
371 Future<Void> task = startIngestJobsThreadPool.submit(
new IngestJobStarter(taskId, job));
372 ingestJobStarters.put(taskId, task);
385 if (this.jobCreationIsEnabled) {
387 if (job.hasIngestPipeline()) {
403 boolean success =
false;
404 if (this.jobCreationIsEnabled) {
408 if (runInteractively && jobsById.size() == 1) {
412 if (!ingestMonitor.isRunning()) {
413 ingestMonitor.start();
422 this.jobsById.put(job.
getId(), job);
423 List<IngestModuleError> errors = job.start();
424 if (errors.isEmpty()) {
425 this.fireIngestJobStarted(job.
getId());
429 this.jobsById.remove(job.
getId());
430 for (IngestModuleError error : errors) {
431 logger.log(Level.SEVERE, String.format(
"Error starting %s ingest module", error.getModuleDisplayName()), error.getModuleError());
434 if (this.runInteractively) {
435 EventQueue.invokeLater(
new Runnable() {
439 StringBuilder moduleStartUpErrors =
new StringBuilder();
440 for (IngestModuleError error : errors) {
441 String moduleName = error.getModuleDisplayName();
442 moduleStartUpErrors.append(moduleName);
443 moduleStartUpErrors.append(
": ");
444 moduleStartUpErrors.append(error.getModuleError().getLocalizedMessage());
445 moduleStartUpErrors.append(
"\n");
447 StringBuilder notifyMessage =
new StringBuilder();
448 notifyMessage.append(NbBundle.getMessage(
this.getClass(),
449 "IngestManager.StartIngestJobsTask.run.startupErr.dlgMsg"));
450 notifyMessage.append(
"\n");
451 notifyMessage.append(NbBundle.getMessage(
this.getClass(),
452 "IngestManager.StartIngestJobsTask.run.startupErr.dlgSolution"));
453 notifyMessage.append(
"\n");
454 notifyMessage.append(NbBundle.getMessage(
this.getClass(),
455 "IngestManager.StartIngestJobsTask.run.startupErr.dlgErrorList",
456 moduleStartUpErrors.toString()));
457 notifyMessage.append(
"\n\n");
458 JOptionPane.showMessageDialog(null, notifyMessage.toString(),
459 NbBundle.getMessage(this.getClass(),
460 "IngestManager.StartIngestJobsTask.run.startupErr.dlgTitle"), JOptionPane.ERROR_MESSAGE);
/**
 * Removes an ingest job from the jobsById map and fires an ingest job
 * event for it: either a completed event, or a cancelled event preceded by
 * an INFO log entry.
 *
 * NOTE(review): the condition that selects between the completed and the
 * cancelled path is not visible here; presumably it is the job's
 * cancellation state - confirm.
 *
 * @param job The ingest job that is finished.
 */
469 synchronized void finishIngestJob(
IngestJob job) {
470 long jobId = job.
getId();
// The job is no longer tracked as running once it is removed from the map.
471 this.jobsById.remove(jobId);
474 this.fireIngestJobCompleted(jobId);
476 IngestManager.logger.log(Level.INFO,
"Ingest job {0} cancelled", jobId);
477 this.fireIngestJobCancelled(jobId);
487 return !this.jobsById.isEmpty();
495 for (Future<Void> handle : ingestJobStarters.values()) {
500 for (
IngestJob job : this.jobsById.values()) {
511 ingestJobEventPublisher.addPropertyChangeListener(listener);
520 ingestJobEventPublisher.removePropertyChangeListener(listener);
529 ingestModuleEventPublisher.addPropertyChangeListener(listener);
538 ingestModuleEventPublisher.removePropertyChangeListener(listener);
572 void fireIngestJobStarted(
long ingestJobId) {
573 fireIngestEventsThreadPool.submit(
new IngestEventPublisher(ingestJobEventPublisher, IngestJobEvent.STARTED, ingestJobId, null));
581 void fireIngestJobCompleted(
long ingestJobId) {
582 fireIngestEventsThreadPool.submit(
new IngestEventPublisher(ingestJobEventPublisher, IngestJobEvent.COMPLETED, ingestJobId, null));
590 void fireIngestJobCancelled(
long ingestJobId) {
591 fireIngestEventsThreadPool.submit(
new IngestEventPublisher(ingestJobEventPublisher, IngestJobEvent.CANCELLED, ingestJobId, null));
599 void fireFileIngestDone(AbstractFile file) {
600 fireIngestEventsThreadPool.submit(
new IngestEventPublisher(ingestModuleEventPublisher, IngestModuleEvent.FILE_DONE, file.getId(), file));
608 void fireIngestModuleDataEvent(ModuleDataEvent moduleDataEvent) {
609 fireIngestEventsThreadPool.submit(
new IngestEventPublisher(ingestModuleEventPublisher, IngestModuleEvent.DATA_ADDED, moduleDataEvent, null));
619 void fireIngestModuleContentEvent(ModuleContentEvent moduleContentEvent) {
620 fireIngestEventsThreadPool.submit(
new IngestEventPublisher(ingestModuleEventPublisher, IngestModuleEvent.CONTENT_CHANGED, moduleContentEvent, null));
629 void setIngestTaskProgress(DataSourceIngestTask task, String ingestModuleDisplayName) {
630 ingestThreadActivitySnapshots.put(task.getThreadId(),
new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJob().getId(), ingestModuleDisplayName, task.getDataSource()));
639 void setIngestTaskProgress(FileIngestTask task, String ingestModuleDisplayName) {
640 IngestThreadActivitySnapshot prevSnap = ingestThreadActivitySnapshots.get(task.getThreadId());
641 IngestThreadActivitySnapshot newSnap =
new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJob().getId(), ingestModuleDisplayName, task.getDataSource(), task.getFile());
642 ingestThreadActivitySnapshots.put(task.getThreadId(), newSnap);
644 incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
652 void setIngestTaskProgressCompleted(DataSourceIngestTask task) {
653 ingestThreadActivitySnapshots.put(task.getThreadId(),
new IngestThreadActivitySnapshot(task.getThreadId()));
661 void setIngestTaskProgressCompleted(FileIngestTask task) {
662 IngestThreadActivitySnapshot prevSnap = ingestThreadActivitySnapshots.get(task.getThreadId());
663 IngestThreadActivitySnapshot newSnap =
new IngestThreadActivitySnapshot(task.getThreadId());
664 ingestThreadActivitySnapshots.put(task.getThreadId(), newSnap);
665 incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
675 if (moduleName.equals(
"IDLE")) {
680 Long prevTimeL = ingestModuleRunTimes.get(moduleName);
682 if (prevTimeL != null) {
683 prevTime = prevTimeL;
685 prevTime += duration;
686 ingestModuleRunTimes.put(moduleName, prevTime);
695 Map<String, Long> getModuleRunTimes() {
707 List<IngestThreadActivitySnapshot> getIngestThreadActivitySnapshots() {
708 return new ArrayList<>(ingestThreadActivitySnapshots.values());
/**
 * Collects the data source ingest job snapshots of every ingest job
 * currently held in the jobsById map into one list.
 *
 * NOTE(review): presumably the accumulated list is what gets returned -
 * confirm.
 */
716 List<DataSourceIngestJob.Snapshot> getIngestJobSnapshots() {
717 List<DataSourceIngestJob.Snapshot> snapShots =
new ArrayList<>();
// Each ingest job contributes all of its data source ingest job snapshots.
718 for (IngestJob job : this.jobsById.values()) {
719 snapShots.addAll(job.getDataSourceIngestJobSnapshots());
/**
 * Reports free disk space as seen by the ingest monitor.
 *
 * @return The value of ingestMonitor.getFreeSpace() when an ingest monitor
 *         exists. NOTE(review): the value returned when there is no ingest
 *         monitor is not visible here - confirm.
 */
730 long getFreeDiskSpace() {
// Only ask the monitor when one has been created.
731 if (ingestMonitor != null) {
732 return ingestMonitor.getFreeSpace();
755 if (Thread.currentThread().isInterrupted()) {
756 jobsById.remove(job.
getId());
760 if (runInteractively) {
761 final String displayName = NbBundle.getMessage(this.getClass(),
"IngestManager.StartIngestJobsTask.run.displayName");
762 this.progress = ProgressHandleFactory.createHandle(displayName,
new Cancellable() {
764 public boolean cancel() {
765 if (progress != null) {
766 progress.setDisplayName(NbBundle.getMessage(
this.getClass(),
"IngestManager.StartIngestJobsTask.run.cancelling", displayName));
768 Future<?> handle = ingestJobStarters.remove(threadId);
780 if (null != progress) {
783 ingestJobStarters.remove(threadId);
795 private final IngestTaskQueue
tasks;
806 IngestTask task = tasks.getNextTask();
807 task.execute(threadId);
808 }
catch (InterruptedException ex) {
811 if (Thread.currentThread().isInterrupted()) {
831 this.jobEvent = event;
832 this.moduleEvent = null;
839 this.jobEvent = null;
840 this.moduleEvent = event;
848 publisher.firePropertyChange((jobEvent != null ? jobEvent.toString() : moduleEvent.toString()), oldValue, newValue);
849 }
catch (Exception e) {
850 logger.log(Level.SEVERE,
"Ingest manager listener threw exception", e);
852 NbBundle.getMessage(
IngestManager.class,
"IngestManager.moduleErr.errListenToUpdates.msg"),
858 static final class IngestThreadActivitySnapshot {
860 private final long threadId;
861 private final Date startTime;
862 private final String activity;
863 private final String dataSourceName;
864 private final String fileName;
865 private final long jobId;
868 IngestThreadActivitySnapshot(
long threadId) {
869 this.threadId = threadId;
870 startTime =
new Date();
871 this.activity = NbBundle.getMessage(this.getClass(),
"IngestManager.IngestThreadActivitySnapshot.idleThread");
872 this.dataSourceName =
"";
878 IngestThreadActivitySnapshot(
long threadId,
long jobId, String activity, Content dataSource) {
879 this.threadId = threadId;
881 startTime =
new Date();
882 this.activity = activity;
883 this.dataSourceName = dataSource.getName();
888 IngestThreadActivitySnapshot(
long threadId,
long jobId, String activity, Content dataSource, AbstractFile file) {
889 this.threadId = threadId;
891 startTime =
new Date();
892 this.activity = activity;
893 this.dataSourceName = dataSource.getName();
894 this.fileName = file.getName();
905 Date getStartTime() {
909 String getActivity() {
913 String getDataSourceName() {
914 return dataSourceName;
917 String getFileName() {
final ExecutorService fireIngestEventsThreadPool
final ConcurrentHashMap< String, Long > ingestModuleRunTimes
final IngestTaskQueue tasks
final ConcurrentHashMap< Long, IngestJob > jobsById
void removeIngestModuleEventListener(final PropertyChangeListener listener)
final IngestJobEvent jobEvent
static final int MIN_NUMBER_OF_FILE_INGEST_THREADS
volatile boolean runInteractively
static synchronized IngestManager getInstance()
static IngestManager instance
final ExecutorService startIngestJobsThreadPool
final PropertyChangeSupport ingestJobEventPublisher
static void addPropertyChangeListener(final PropertyChangeListener listener)
static void setNumberOfFileIngestThreads(int value)
static final Logger logger
void clearIngestMessageBox()
final ConcurrentHashMap< Long, Future< Void > > ingestJobStarters
boolean isIngestRunning()
static void removePropertyChangeListener(final PropertyChangeListener listener)
volatile IngestMessageTopComponent ingestMessageBox
synchronized void queueIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
int numberOfFileIngestThreads
void removeIngestJobEventListener(final PropertyChangeListener listener)
final ExecutorService fileIngestThreadPool
static final int DEFAULT_NUMBER_OF_FILE_INGEST_THREADS
static int numberOfFileIngestThreads()
void incrementModuleRunTime(String moduleName, Long duration)
void addIngestJobEventListener(final PropertyChangeListener listener)
void startFileIngestThread()
final ExecutorService dataSourceIngestThreadPool
final PropertyChangeSupport publisher
volatile boolean jobCreationIsEnabled
static final int MAX_ERROR_MESSAGE_POSTS
final AtomicLong ingestErrorMessagePosts
void startDataSourceIngestThread()
boolean startIngestJob(IngestJob job)
int getNumberOfFileIngestThreads()
void addIngestModuleEventListener(final PropertyChangeListener listener)
static synchronized void addPropertyChangeListener(PropertyChangeListener listener)
synchronized void cancelAllIngestJobs()
static void show(String title, String message, MessageType type, ActionListener actionListener)
static final int MAX_NUMBER_OF_FILE_INGEST_THREADS
synchronized IngestJob startIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
final ConcurrentHashMap< Long, IngestThreadActivitySnapshot > ingestThreadActivitySnapshots
final IngestMonitor ingestMonitor
final AtomicLong nextThreadId
void subscribeToCaseEvents()
static Logger getLogger(String name)
synchronized void setRunInteractively(boolean runInteractively)
final PropertyChangeSupport ingestModuleEventPublisher
final IngestModuleEvent moduleEvent