Autopsy  4.4.1
Graphical digital forensics platform for The Sleuth Kit and other tools.
IngestManager.java
Go to the documentation of this file.
1 /*
2  * Autopsy Forensic Browser
3  *
4  * Copyright 2011-2017 Basis Technology Corp.
5  * Contact: carrier <at> sleuthkit <dot> org
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  * http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19 package org.sleuthkit.autopsy.ingest;
20 
21 import com.google.common.util.concurrent.ThreadFactoryBuilder;
22 import java.awt.EventQueue;
23 import java.beans.PropertyChangeEvent;
24 import java.beans.PropertyChangeListener;
25 import java.util.ArrayList;
26 import java.util.Collection;
27 import java.util.Collections;
28 import java.util.Date;
29 import java.util.EnumSet;
30 import java.util.HashMap;
31 import java.util.HashSet;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.Set;
35 import java.util.concurrent.Callable;
36 import java.util.concurrent.ConcurrentHashMap;
37 import java.util.concurrent.ExecutorService;
38 import java.util.concurrent.Executors;
39 import java.util.concurrent.Future;
40 import java.util.concurrent.atomic.AtomicLong;
41 import java.util.logging.Level;
42 import java.util.stream.Collectors;
43 import java.util.stream.Stream;
44 import javax.annotation.concurrent.GuardedBy;
45 import javax.annotation.concurrent.Immutable;
46 import javax.annotation.concurrent.ThreadSafe;
47 import javax.swing.JOptionPane;
48 import org.netbeans.api.progress.ProgressHandle;
49 import org.openide.util.Cancellable;
50 import org.openide.util.NbBundle;
65 import org.sleuthkit.datamodel.AbstractFile;
66 import org.sleuthkit.datamodel.Content;
67 
107 @ThreadSafe
108 public class IngestManager {
109 
110  private final static Logger LOGGER = Logger.getLogger(IngestManager.class.getName());
111  private final static String INGEST_JOB_EVENT_CHANNEL_NAME = "%s-Ingest-Job-Events"; //NON-NLS
112  private final static Set<String> INGEST_JOB_EVENT_NAMES = Stream.of(IngestJobEvent.values()).map(IngestJobEvent::toString).collect(Collectors.toSet());
113  private final static String INGEST_MODULE_EVENT_CHANNEL_NAME = "%s-Ingest-Module-Events"; //NON-NLS
114  private final static Set<String> INGEST_MODULE_EVENT_NAMES = Stream.of(IngestModuleEvent.values()).map(IngestModuleEvent::toString).collect(Collectors.toSet());
115  private final static int MAX_ERROR_MESSAGE_POSTS = 200;
116  @GuardedBy("IngestManager.class")
117  private static IngestManager instance;
118  private final int numberOfFileIngestThreads;
119  private final AtomicLong nextIngestManagerTaskId = new AtomicLong(0L);
120  private final ExecutorService startIngestJobsExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-start-ingest-jobs-%d").build()); //NON-NLS;
121  private final Map<Long, Future<Void>> startIngestJobFutures = new ConcurrentHashMap<>();
122  private final Map<Long, IngestJob> ingestJobsById = new ConcurrentHashMap<>();
123  private final ExecutorService dataSourceLevelIngestJobTasksExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-data-source-ingest-%d").build()); //NON-NLS;
124  private final ExecutorService fileLevelIngestJobTasksExecutor;
125  private final ExecutorService eventPublishingExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-ingest-events-%d").build()); //NON-NLS;
129  private final AutopsyEventPublisher moduleEventPublisher = new AutopsyEventPublisher();
130  private final Object ingestMessageBoxLock = new Object();
131  private final AtomicLong ingestErrorMessagePosts = new AtomicLong(0L);
132  private final ConcurrentHashMap<Long, IngestThreadActivitySnapshot> ingestThreadActivitySnapshots = new ConcurrentHashMap<>();
133  private final ConcurrentHashMap<String, Long> ingestModuleRunTimes = new ConcurrentHashMap<>();
134  private volatile IngestMessageTopComponent ingestMessageBox;
135  private volatile boolean caseIsOpen;
136 
143  public synchronized static IngestManager getInstance() {
144  if (null == instance) {
145  instance = new IngestManager();
146  instance.subscribeToServiceMonitorEvents();
147  instance.subscribeToCaseEvents();
148  }
149  return instance;
150  }
151 
156  private IngestManager() {
157  /*
158  * Submit a single Runnable ingest manager task for processing data
159  * source level ingest job tasks to the data source level ingest job
160  * tasks executor.
161  */
162  long threadId = nextIngestManagerTaskId.incrementAndGet();
163  dataSourceLevelIngestJobTasksExecutor.submit(new ExecuteIngestJobTasksTask(threadId, IngestTasksScheduler.getInstance().getDataSourceIngestTaskQueue()));
164  ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId));
165 
166  /*
167  * Submit a configurable number of Runnable ingest manager tasks for
168  * processing file level ingest job tasks to the file level ingest job
169  * tasks executor.
170  */
172  fileLevelIngestJobTasksExecutor = Executors.newFixedThreadPool(numberOfFileIngestThreads, new ThreadFactoryBuilder().setNameFormat("IM-file-ingest-%d").build()); //NON-NLS
173  for (int i = 0; i < numberOfFileIngestThreads; ++i) {
174  threadId = nextIngestManagerTaskId.incrementAndGet();
175  fileLevelIngestJobTasksExecutor.submit(new ExecuteIngestJobTasksTask(threadId, IngestTasksScheduler.getInstance().getFileIngestTaskQueue()));
176  ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId));
177  }
178  }
179 
185  PropertyChangeListener propChangeListener = (PropertyChangeEvent evt) -> {
186  if (evt.getNewValue().equals(ServicesMonitor.ServiceStatus.DOWN.toString())) {
187  /*
188  * The application services considered to be key services are
189  * only necessary for multi-user cases.
190  */
191  try {
193  return;
194  }
195  } catch (IllegalStateException noCaseOpenException) {
196  return;
197  }
198 
199  String serviceDisplayName = ServicesMonitor.Service.valueOf(evt.getPropertyName()).getDisplayName();
200  LOGGER.log(Level.SEVERE, "Service {0} is down, cancelling all running ingest jobs", serviceDisplayName); //NON-NLS
202  EventQueue.invokeLater(new Runnable() {
203  @Override
204  public void run() {
205  JOptionPane.showMessageDialog(null,
206  NbBundle.getMessage(this.getClass(), "IngestManager.cancellingIngest.msgDlg.text"),
207  NbBundle.getMessage(this.getClass(), "IngestManager.serviceIsDown.msgDlg.text", serviceDisplayName),
208  JOptionPane.ERROR_MESSAGE);
209  }
210  });
211  }
213  }
214  };
215 
216  /*
217  * The key services for multi-user cases are currently the case database
218  * server and the Solr server. The Solr server is a key service not
219  * because search is essential, but because the coordination service
220  * (ZooKeeper) is running embedded within the Solr server.
221  */
222  Set<String> servicesList = new HashSet<>();
223  servicesList.add(ServicesMonitor.Service.REMOTE_CASE_DATABASE.toString());
224  servicesList.add(ServicesMonitor.Service.REMOTE_KEYWORD_SEARCH.toString());
225  this.servicesMonitor.addSubscriber(servicesList, propChangeListener);
226  }
227 
232  private void subscribeToCaseEvents() {
233  Case.addEventTypeSubscriber(EnumSet.of(Case.Events.CURRENT_CASE), (PropertyChangeEvent event) -> {
234  if (event.getNewValue() != null) {
235  handleCaseOpened();
236  } else {
237  handleCaseClosed();
238  }
239  });
240  }
241 
242  /*
243  * Handles a current case opened event by clearing the ingest messages inbox
244  * and opening a remote event channel for the current case.
245  *
246  * Note that current case change events are published in a strictly
247  * serialized manner, i.e., one event at a time, synchronously.
248  */
249  void handleCaseOpened() {
250  caseIsOpen = true;
252  try {
253  Case openedCase = Case.getCurrentCase();
254  String channelPrefix = openedCase.getName();
255  if (Case.CaseType.MULTI_USER_CASE == openedCase.getCaseType()) {
256  jobEventPublisher.openRemoteEventChannel(String.format(INGEST_JOB_EVENT_CHANNEL_NAME, channelPrefix));
257  moduleEventPublisher.openRemoteEventChannel(String.format(INGEST_MODULE_EVENT_CHANNEL_NAME, channelPrefix));
258  }
259  } catch (IllegalStateException | AutopsyEventException ex) {
260  LOGGER.log(Level.SEVERE, "Failed to open remote events channel", ex); //NON-NLS
261  MessageNotifyUtil.Notify.error(NbBundle.getMessage(IngestManager.class, "IngestManager.OpenEventChannel.Fail.Title"),
262  NbBundle.getMessage(IngestManager.class, "IngestManager.OpenEventChannel.Fail.ErrMsg"));
263  }
264  }
265 
266  /*
267  * Handles a current case closed event by cancelling all ingest jobs for the
268  * case, closing the remote event channel for the case, and clearing the
269  * ingest messages inbox.
270  *
271  * Note that current case change events are published in a strictly
272  * serialized manner, i.e., one event at a time, synchronously.
273  */
274  void handleCaseClosed() {
275  /*
276  * TODO (JIRA-2227): IngestManager should wait for cancelled ingest jobs
277  * to complete when a case is closed.
278  */
279  this.cancelAllIngestJobs(IngestJob.CancellationReason.CASE_CLOSED);
282  caseIsOpen = false;
284  }
285 
294  }
295 
302  public void queueIngestJob(Collection<Content> dataSources, IngestJobSettings settings) {
303  if (caseIsOpen) {
304  IngestJob job = new IngestJob(dataSources, settings, RuntimeProperties.runningWithGUI());
305  if (job.hasIngestPipeline()) {
306  long taskId = nextIngestManagerTaskId.incrementAndGet();
307  Future<Void> task = startIngestJobsExecutor.submit(new StartIngestJobTask(taskId, job));
308  startIngestJobFutures.put(taskId, task);
309  }
310  }
311  }
312 
322  public IngestJobStartResult beginIngestJob(Collection<Content> dataSources, IngestJobSettings settings) {
323  if (caseIsOpen) {
324  IngestJob job = new IngestJob(dataSources, settings, RuntimeProperties.runningWithGUI());
325  if (job.hasIngestPipeline()) {
326  return startIngestJob(job);
327  }
328  return new IngestJobStartResult(null, new IngestManagerException("No ingest pipeline created, likely due to no ingest modules being enabled"), null); //NON-NLS
329  }
330  return new IngestJobStartResult(null, new IngestManagerException("No case open"), null); //NON-NLS
331  }
332 
341  @NbBundle.Messages({
342  "IngestManager.startupErr.dlgTitle=Ingest Module Startup Failure",
343  "IngestManager.startupErr.dlgMsg=Unable to start up one or more ingest modules, ingest cancelled.",
344  "IngestManager.startupErr.dlgSolution=Please disable the failed modules or fix the errors before restarting ingest.",
345  "IngestManager.startupErr.dlgErrorList=Errors:"
346  })
348  List<IngestModuleError> errors = null;
349  if (caseIsOpen) {
351  try {
354  EventQueue.invokeLater(new Runnable() {
355  @Override
356  public void run() {
357  String serviceDisplayName = ServicesMonitor.Service.REMOTE_CASE_DATABASE.getDisplayName();
358  JOptionPane.showMessageDialog(null,
359  NbBundle.getMessage(this.getClass(), "IngestManager.cancellingIngest.msgDlg.text"),
360  NbBundle.getMessage(this.getClass(), "IngestManager.serviceIsDown.msgDlg.text", serviceDisplayName),
361  JOptionPane.ERROR_MESSAGE);
362  }
363  });
364  }
365  return new IngestJobStartResult(null, new IngestManagerException("Ingest aborted. Remote database is down"), Collections.<IngestModuleError>emptyList()); //NON-NLS
366  }
368  return new IngestJobStartResult(null, new IngestManagerException("Database server is down", ex), Collections.<IngestModuleError>emptyList()); //NON-NLS
369  }
370  }
371 
372  if (!ingestMonitor.isRunning()) {
373  ingestMonitor.start();
374  }
375 
376  ingestJobsById.put(job.getId(), job);
377  errors = job.start();
378  if (errors.isEmpty()) {
379  this.fireIngestJobStarted(job.getId());
380  IngestManager.LOGGER.log(Level.INFO, "Ingest job {0} started", job.getId()); //NON-NLS
381  } else {
382  this.ingestJobsById.remove(job.getId());
383  for (IngestModuleError error : errors) {
384  LOGGER.log(Level.SEVERE, String.format("Error starting %s ingest module for job %d", error.getModuleDisplayName(), job.getId()), error.getThrowable()); //NON-NLS
385  }
386  IngestManager.LOGGER.log(Level.SEVERE, "Ingest job {0} could not be started", job.getId()); //NON-NLS
388  final StringBuilder message = new StringBuilder(1024);
389  message.append(Bundle.IngestManager_startupErr_dlgMsg()).append("\n"); //NON-NLS
390  message.append(Bundle.IngestManager_startupErr_dlgSolution()).append("\n\n"); //NON-NLS
391  message.append(Bundle.IngestManager_startupErr_dlgErrorList()).append("\n"); //NON-NLS
392  for (IngestModuleError error : errors) {
393  String moduleName = error.getModuleDisplayName();
394  String errorMessage = error.getThrowable().getLocalizedMessage();
395  message.append(moduleName).append(": ").append(errorMessage).append("\n"); //NON-NLS
396  }
397  message.append("\n\n");
398  EventQueue.invokeLater(() -> {
399  JOptionPane.showMessageDialog(null, message, Bundle.IngestManager_startupErr_dlgTitle(), JOptionPane.ERROR_MESSAGE);
400  });
401  }
402  return new IngestJobStartResult(null, new IngestManagerException("Errors occurred while starting ingest"), errors); //NON-NLS
403  }
404  }
405 
406  return new IngestJobStartResult(job, null, errors);
407  }
408 
414  void finishIngestJob(IngestJob job) {
415  long jobId = job.getId();
416  ingestJobsById.remove(jobId);
417  if (!job.isCancelled()) {
418  IngestManager.LOGGER.log(Level.INFO, "Ingest job {0} completed", jobId); //NON-NLS
419  fireIngestJobCompleted(jobId);
420  } else {
421  IngestManager.LOGGER.log(Level.INFO, "Ingest job {0} cancelled", jobId); //NON-NLS
422  fireIngestJobCancelled(jobId);
423  }
424  }
425 
432  public boolean isIngestRunning() {
433  return !ingestJobsById.isEmpty();
434  }
435 
442  startIngestJobFutures.values().forEach((handle) -> {
443  handle.cancel(true);
444  });
445  this.ingestJobsById.values().forEach((job) -> {
446  job.cancel(reason);
447  });
448  }
449 
455  public void addIngestJobEventListener(final PropertyChangeListener listener) {
456  jobEventPublisher.addSubscriber(INGEST_JOB_EVENT_NAMES, listener);
457  }
458 
464  public void removeIngestJobEventListener(final PropertyChangeListener listener) {
465  jobEventPublisher.removeSubscriber(INGEST_JOB_EVENT_NAMES, listener);
466  }
467 
473  public void addIngestModuleEventListener(final PropertyChangeListener listener) {
474  moduleEventPublisher.addSubscriber(INGEST_MODULE_EVENT_NAMES, listener);
475  }
476 
482  public void removeIngestModuleEventListener(final PropertyChangeListener listener) {
483  moduleEventPublisher.removeSubscriber(INGEST_MODULE_EVENT_NAMES, listener);
484  }
485 
491  void fireIngestJobStarted(long ingestJobId) {
492  AutopsyEvent event = new AutopsyEvent(IngestJobEvent.STARTED.toString(), ingestJobId, null);
493  eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
494  }
495 
501  void fireIngestJobCompleted(long ingestJobId) {
502  AutopsyEvent event = new AutopsyEvent(IngestJobEvent.COMPLETED.toString(), ingestJobId, null);
503  eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
504  }
505 
511  void fireIngestJobCancelled(long ingestJobId) {
512  AutopsyEvent event = new AutopsyEvent(IngestJobEvent.CANCELLED.toString(), ingestJobId, null);
513  eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
514  }
515 
524  void fireDataSourceAnalysisStarted(long ingestJobId, long dataSourceIngestJobId, Content dataSource) {
525  AutopsyEvent event = new DataSourceAnalysisStartedEvent(ingestJobId, dataSourceIngestJobId, dataSource);
526  eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
527  }
528 
537  void fireDataSourceAnalysisCompleted(long ingestJobId, long dataSourceIngestJobId, Content dataSource) {
538  AutopsyEvent event = new DataSourceAnalysisCompletedEvent(ingestJobId, dataSourceIngestJobId, dataSource, DataSourceAnalysisCompletedEvent.Reason.ANALYSIS_COMPLETED);
539  eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
540  }
541 
550  void fireDataSourceAnalysisCancelled(long ingestJobId, long dataSourceIngestJobId, Content dataSource) {
551  AutopsyEvent event = new DataSourceAnalysisCompletedEvent(ingestJobId, dataSourceIngestJobId, dataSource, DataSourceAnalysisCompletedEvent.Reason.ANALYSIS_CANCELLED);
552  eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
553  }
554 
561  void fireFileIngestDone(AbstractFile file) {
562  AutopsyEvent event = new FileAnalyzedEvent(file);
563  eventPublishingExecutor.submit(new PublishEventTask(event, moduleEventPublisher));
564  }
565 
573  void fireIngestModuleDataEvent(ModuleDataEvent moduleDataEvent) {
574  AutopsyEvent event = new BlackboardPostEvent(moduleDataEvent);
575  eventPublishingExecutor.submit(new PublishEventTask(event, moduleEventPublisher));
576  }
577 
585  void fireIngestModuleContentEvent(ModuleContentEvent moduleContentEvent) {
586  AutopsyEvent event = new ContentChangedEvent(moduleContentEvent);
587  eventPublishingExecutor.submit(new PublishEventTask(event, moduleEventPublisher));
588  }
589 
595  void initIngestMessageInbox() {
596  synchronized (this.ingestMessageBoxLock) {
597  ingestMessageBox = IngestMessageTopComponent.findInstance();
598  }
599  }
600 
606  void postIngestMessage(IngestMessage message) {
607  synchronized (this.ingestMessageBoxLock) {
608  if (ingestMessageBox != null && RuntimeProperties.runningWithGUI()) {
609  if (message.getMessageType() != IngestMessage.MessageType.ERROR && message.getMessageType() != IngestMessage.MessageType.WARNING) {
610  ingestMessageBox.displayMessage(message);
611  } else {
612  long errorPosts = ingestErrorMessagePosts.incrementAndGet();
613  if (errorPosts <= MAX_ERROR_MESSAGE_POSTS) {
614  ingestMessageBox.displayMessage(message);
615  } else if (errorPosts == MAX_ERROR_MESSAGE_POSTS + 1) {
616  IngestMessage errorMessageLimitReachedMessage = IngestMessage.createErrorMessage(
617  NbBundle.getMessage(this.getClass(), "IngestManager.IngestMessage.ErrorMessageLimitReached.title"),
618  NbBundle.getMessage(this.getClass(), "IngestManager.IngestMessage.ErrorMessageLimitReached.subject"),
619  NbBundle.getMessage(this.getClass(), "IngestManager.IngestMessage.ErrorMessageLimitReached.msg", MAX_ERROR_MESSAGE_POSTS));
620  ingestMessageBox.displayMessage(errorMessageLimitReachedMessage);
621  }
622  }
623  }
624  }
625  }
626 
627  /*
628  * Clears the ingest messages inbox.
629  */
630  private void clearIngestMessageBox() {
631  synchronized (this.ingestMessageBoxLock) {
632  if (null != ingestMessageBox) {
633  ingestMessageBox.clearMessages();
634  }
636  }
637  }
638 
650  void setIngestTaskProgress(DataSourceIngestTask task, String ingestModuleDisplayName) {
651  ingestThreadActivitySnapshots.put(task.getThreadId(), new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJob().getId(), ingestModuleDisplayName, task.getDataSource()));
652  }
653 
665  void setIngestTaskProgress(FileIngestTask task, String ingestModuleDisplayName) {
666  IngestThreadActivitySnapshot prevSnap = ingestThreadActivitySnapshots.get(task.getThreadId());
667  IngestThreadActivitySnapshot newSnap = new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJob().getId(), ingestModuleDisplayName, task.getDataSource(), task.getFile());
668  ingestThreadActivitySnapshots.put(task.getThreadId(), newSnap);
669  incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
670  }
671 
679  void setIngestTaskProgressCompleted(DataSourceIngestTask task) {
680  ingestThreadActivitySnapshots.put(task.getThreadId(), new IngestThreadActivitySnapshot(task.getThreadId()));
681  }
682 
690  void setIngestTaskProgressCompleted(FileIngestTask task) {
691  IngestThreadActivitySnapshot prevSnap = ingestThreadActivitySnapshots.get(task.getThreadId());
692  IngestThreadActivitySnapshot newSnap = new IngestThreadActivitySnapshot(task.getThreadId());
693  ingestThreadActivitySnapshots.put(task.getThreadId(), newSnap);
694  incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
695  }
696 
703  private void incrementModuleRunTime(String moduleDisplayName, Long duration) {
704  if (moduleDisplayName.equals("IDLE")) { //NON-NLS
705  return;
706  }
707 
708  synchronized (ingestModuleRunTimes) {
709  Long prevTimeL = ingestModuleRunTimes.get(moduleDisplayName);
710  long prevTime = 0;
711  if (prevTimeL != null) {
712  prevTime = prevTimeL;
713  }
714  prevTime += duration;
715  ingestModuleRunTimes.put(moduleDisplayName, prevTime);
716  }
717  }
718 
724  Map<String, Long> getModuleRunTimes() {
725  synchronized (ingestModuleRunTimes) {
726  Map<String, Long> times = new HashMap<>(ingestModuleRunTimes);
727  return times;
728  }
729  }
730 
737  List<IngestThreadActivitySnapshot> getIngestThreadActivitySnapshots() {
738  return new ArrayList<>(ingestThreadActivitySnapshots.values());
739  }
740 
746  List<DataSourceIngestJob.Snapshot> getIngestJobSnapshots() {
747  List<DataSourceIngestJob.Snapshot> snapShots = new ArrayList<>();
748  ingestJobsById.values().forEach((job) -> {
749  snapShots.addAll(job.getDataSourceIngestJobSnapshots());
750  });
751  return snapShots;
752  }
753 
760  long getFreeDiskSpace() {
761  if (ingestMonitor != null) {
762  return ingestMonitor.getFreeSpace();
763  } else {
764  return -1;
765  }
766  }
767 
771  private final class StartIngestJobTask implements Callable<Void> {
772 
773  private final long threadId;
774  private final IngestJob job;
775  private ProgressHandle progress;
776 
777  StartIngestJobTask(long threadId, IngestJob job) {
778  this.threadId = threadId;
779  this.job = job;
780  }
781 
782  @Override
783  public Void call() {
784  try {
785  if (Thread.currentThread().isInterrupted()) {
786  ingestJobsById.remove(job.getId());
787  return null;
788  }
789 
791  final String displayName = NbBundle.getMessage(this.getClass(), "IngestManager.StartIngestJobsTask.run.displayName");
792  this.progress = ProgressHandle.createHandle(displayName, new Cancellable() {
793  @Override
794  public boolean cancel() {
795  if (progress != null) {
796  progress.setDisplayName(NbBundle.getMessage(this.getClass(), "IngestManager.StartIngestJobsTask.run.cancelling", displayName));
797  }
798  Future<?> handle = startIngestJobFutures.remove(threadId);
799  handle.cancel(true);
800  return true;
801  }
802  });
803  progress.start();
804  }
805 
806  startIngestJob(job);
807  return null;
808 
809  } finally {
810  if (null != progress) {
811  progress.finish();
812  }
813  startIngestJobFutures.remove(threadId);
814  }
815  }
816 
817  }
818 
822  private final class ExecuteIngestJobTasksTask implements Runnable {
823 
824  private final long threadId;
825  private final IngestTaskQueue tasks;
826 
827  ExecuteIngestJobTasksTask(long threadId, IngestTaskQueue tasks) {
828  this.threadId = threadId;
829  this.tasks = tasks;
830  }
831 
832  @Override
833  public void run() {
834  while (true) {
835  try {
836  IngestTask task = tasks.getNextTask(); // Blocks.
837  task.execute(threadId);
838  } catch (InterruptedException ex) {
839  break;
840  }
841  if (Thread.currentThread().isInterrupted()) {
842  break;
843  }
844  }
845  }
846  }
847 
851  private static final class PublishEventTask implements Runnable {
852 
853  private final AutopsyEvent event;
855 
864  this.event = event;
865  this.publisher = publisher;
866  }
867 
868  @Override
869  public void run() {
870  publisher.publish(event);
871  }
872 
873  }
874 
879  @Immutable
880  static final class IngestThreadActivitySnapshot {
881 
882  private final long threadId;
883  private final Date startTime;
884  private final String activity;
885  private final String dataSourceName;
886  private final String fileName;
887  private final long jobId;
888 
896  IngestThreadActivitySnapshot(long threadId) {
897  this.threadId = threadId;
898  startTime = new Date();
899  this.activity = NbBundle.getMessage(this.getClass(), "IngestManager.IngestThreadActivitySnapshot.idleThread");
900  this.dataSourceName = "";
901  this.fileName = "";
902  this.jobId = 0;
903  }
904 
915  IngestThreadActivitySnapshot(long threadId, long jobId, String activity, Content dataSource) {
916  this.threadId = threadId;
917  this.jobId = jobId;
918  startTime = new Date();
919  this.activity = activity;
920  this.dataSourceName = dataSource.getName();
921  this.fileName = "";
922  }
923 
936  IngestThreadActivitySnapshot(long threadId, long jobId, String activity, Content dataSource, AbstractFile file) {
937  this.threadId = threadId;
938  this.jobId = jobId;
939  startTime = new Date();
940  this.activity = activity;
941  this.dataSourceName = dataSource.getName();
942  this.fileName = file.getName();
943  }
944 
950  long getIngestJobId() {
951  return jobId;
952  }
953 
959  long getThreadId() {
960  return threadId;
961  }
962 
968  Date getStartTime() {
969  return startTime;
970  }
971 
977  String getActivity() {
978  return activity;
979  }
980 
988  String getDataSourceName() {
989  return dataSourceName;
990  }
991 
997  String getFileName() {
998  return fileName;
999  }
1000 
1001  }
1002 
1006  public enum IngestJobEvent {
1007 
1044  }
1045 
1049  public enum IngestModuleEvent {
1050 
1073  }
1074 
1078  public final static class IngestManagerException extends Exception {
1079 
1080  private static final long serialVersionUID = 1L;
1081 
1087  private IngestManagerException(String message) {
1088  super(message);
1089  }
1090 
1097  private IngestManagerException(String message, Throwable cause) {
1098  super(message, cause);
1099  }
1100  }
1101 
1110  @Deprecated
1111  public static void addPropertyChangeListener(final PropertyChangeListener listener) {
1114  }
1115 
1124  @Deprecated
1125  public static void removePropertyChangeListener(final PropertyChangeListener listener) {
1128  }
1129 
1140  @Deprecated
1141  public synchronized IngestJob startIngestJob(Collection<Content> dataSources, IngestJobSettings settings) {
1142  return beginIngestJob(dataSources, settings).getJob();
1143  }
1144 
1151  @Deprecated
1152  public void cancelAllIngestJobs() {
1154  }
1155 
1156 }
final ConcurrentHashMap< String, Long > ingestModuleRunTimes
final Map< Long, Future< Void > > startIngestJobFutures
void removeIngestModuleEventListener(final PropertyChangeListener listener)
static synchronized IngestManager getInstance()
IngestJobStartResult startIngestJob(IngestJob job)
final ExecutorService dataSourceLevelIngestJobTasksExecutor
static void addPropertyChangeListener(final PropertyChangeListener listener)
IngestJobStartResult beginIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
void addSubscriber(Set< String > eventNames, PropertyChangeListener subscriber)
static void removePropertyChangeListener(final PropertyChangeListener listener)
static final Set< String > INGEST_MODULE_EVENT_NAMES
volatile IngestMessageTopComponent ingestMessageBox
void addSubscriber(PropertyChangeListener subscriber)
void removeIngestJobEventListener(final PropertyChangeListener listener)
static final Set< String > INGEST_JOB_EVENT_NAMES
final AutopsyEventPublisher moduleEventPublisher
synchronized void openRemoteEventChannel(String channelName)
void addIngestJobEventListener(final PropertyChangeListener listener)
void queueIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
void removeSubscriber(Set< String > eventNames, PropertyChangeListener subscriber)
void addIngestModuleEventListener(final PropertyChangeListener listener)
synchronized static Logger getLogger(String name)
Definition: Logger.java:161
void incrementModuleRunTime(String moduleDisplayName, Long duration)
static void addEventTypeSubscriber(Set< Events > eventTypes, PropertyChangeListener subscriber)
Definition: Case.java:395
synchronized IngestJob startIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
final ConcurrentHashMap< Long, IngestThreadActivitySnapshot > ingestThreadActivitySnapshots
void cancelAllIngestJobs(IngestJob.CancellationReason reason)
final ExecutorService fileLevelIngestJobTasksExecutor
final Map< Long, IngestJob > ingestJobsById
final AutopsyEventPublisher jobEventPublisher

Copyright © 2012-2016 Basis Technology. Generated on: Fri Sep 29 2017
This work is licensed under a Creative Commons Attribution-Share Alike 3.0 United States License.