Autopsy  4.21.0
Graphical digital forensics platform for The Sleuth Kit and other tools.
IngestManager.java
Go to the documentation of this file.
1 /*
2  * Autopsy Forensic Browser
3  *
4  * Copyright 2012-2021 Basis Technology Corp.
5  * Contact: carrier <at> sleuthkit <dot> org
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  * http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19 package org.sleuthkit.autopsy.ingest;
20 
21 import com.google.common.eventbus.Subscribe;
22 import com.google.common.util.concurrent.ThreadFactoryBuilder;
23 import java.awt.EventQueue;
24 import java.awt.GraphicsEnvironment;
25 import java.beans.PropertyChangeEvent;
26 import java.beans.PropertyChangeListener;
27 import java.io.Serializable;
28 import java.lang.reflect.InvocationTargetException;
29 import java.util.ArrayList;
30 import java.util.Collection;
31 import java.util.Collections;
32 import java.util.Date;
33 import java.util.EnumSet;
34 import java.util.HashMap;
35 import java.util.HashSet;
36 import java.util.List;
37 import java.util.Map;
38 import java.util.Optional;
39 import java.util.Set;
40 import java.util.concurrent.Callable;
41 import java.util.concurrent.ConcurrentHashMap;
42 import java.util.concurrent.ExecutorService;
43 import java.util.concurrent.Executors;
44 import java.util.concurrent.Future;
45 import java.util.concurrent.atomic.AtomicLong;
46 import java.util.logging.Level;
47 import java.util.stream.Collectors;
48 import java.util.stream.Stream;
49 import javax.annotation.concurrent.GuardedBy;
50 import javax.annotation.concurrent.Immutable;
51 import javax.annotation.concurrent.ThreadSafe;
52 import javax.swing.JOptionPane;
53 import javax.swing.SwingUtilities;
54 import org.netbeans.api.progress.ProgressHandle;
55 import org.openide.util.Cancellable;
56 import org.openide.util.NbBundle;
57 import org.openide.windows.WindowManager;
81 
121 @ThreadSafe
123 
// Class logger.
124  private final static Logger logger = Logger.getLogger(IngestManager.class.getName());
// Format string for the ingest job event channel name; handleCaseOpened() fills the %s with the case name.
125  private final static String INGEST_JOB_EVENT_CHANNEL_NAME = "%s-Ingest-Job-Events"; //NON-NLS
// String names of every IngestJobEvent value, used to subscribe/unsubscribe a listener for all job events at once.
126  private final static Set<String> INGEST_JOB_EVENT_NAMES = Stream.of(IngestJobEvent.values()).map(IngestJobEvent::toString).collect(Collectors.toSet());
// Format string for the ingest module event channel name; handleCaseOpened() fills the %s with the case name.
127  private final static String INGEST_MODULE_EVENT_CHANNEL_NAME = "%s-Ingest-Module-Events"; //NON-NLS
// String names of every IngestModuleEvent value, used to subscribe/unsubscribe a listener for all module events at once.
128  private final static Set<String> INGEST_MODULE_EVENT_NAMES = Stream.of(IngestModuleEvent.values()).map(IngestModuleEvent::toString).collect(Collectors.toSet());
// Number of error/warning messages postIngestMessage() forwards to the inbox before it posts a single limit-reached message.
129  private final static int MAX_ERROR_MESSAGE_POSTS = 200;
// Singleton instance, created lazily by getInstance().
130  @GuardedBy("IngestManager.class")
131  private static IngestManager instance;
// Size of the file ingest thread pool created in the constructor.
132  private final int numberOfFileIngestThreads;
// Source of IDs for both ingest threads and start-ingest-job tasks.
133  private final AtomicLong nextIngestManagerTaskId = new AtomicLong(0L);
// Single thread that runs the StartIngestJobTask instances submitted by queueIngestJob().
134  private final ExecutorService startIngestJobsExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-start-ingest-jobs-%d").build()); //NON-NLS;
// Futures of queued start-ingest-job tasks, keyed by task ID. Every access visible in this file synchronizes on the map itself.
135  @GuardedBy("startIngestJobFutures")
136  private final Map<Long, Future<Void>> startIngestJobFutures = new ConcurrentHashMap<>();
// Ingest jobs keyed by job ID: added by startIngestJob(), removed on start-up failure or in finishIngestJob().
137  @GuardedBy("ingestJobsById")
138  private final Map<Long, IngestJob> ingestJobsById = new HashMap<>();
// Executors for the ExecuteIngestJobTasksTask workers; each is created and given its worker(s) in the constructor.
139  private final ExecutorService dataSourceLevelIngestJobTasksExecutor;
140  private final ExecutorService fileLevelIngestJobTasksExecutor;
141  private final ExecutorService dataArtifactIngestTasksExecutor;
142  private final ExecutorService analysisResultIngestTasksExecutor;
// Single thread on which PublishEventTask instances are run.
143  private final ExecutorService eventPublishingExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-ingest-events-%d").build()); //NON-NLS;
// Publisher for ingest module events.
147  private final AutopsyEventPublisher moduleEventPublisher = new AutopsyEventPublisher();
// Lock held whenever the ingest message inbox reference is assigned or used.
148  private final Object ingestMessageBoxLock = new Object();
// Running count of error/warning messages posted; compared against MAX_ERROR_MESSAGE_POSTS.
149  private final AtomicLong ingestErrorMessagePosts = new AtomicLong(0L);
// Most recent activity snapshot of each ingest thread, keyed by thread ID.
150  private final ConcurrentHashMap<Long, IngestThreadActivitySnapshot> ingestThreadActivitySnapshots = new ConcurrentHashMap<>();
// Accumulated run time per ingest module, keyed by module display name.
151  private final ConcurrentHashMap<String, Long> ingestModuleRunTimes = new ConcurrentHashMap<>();
// Ingest message inbox; assigned by initIngestMessageInbox().
152  private volatile IngestMessageTopComponent ingestMessageBox;
// Set to true in handleCaseOpened() and back to false in handleCaseClosed().
153  private volatile boolean caseIsOpen;
154 
161  public synchronized static IngestManager getInstance() {
162  if (null == instance) {
163  instance = new IngestManager();
164  instance.subscribeToServiceMonitorEvents();
165  instance.subscribeToCaseEvents();
166  }
167  return instance;
168  }
169 
// Constructs the ingest manager: for each kind of ingest task (data source,
// file, data artifact, analysis result) it creates an executor, submits an
// ExecuteIngestJobTasksTask bound to the matching IngestTasksScheduler queue,
// and records an initial activity snapshot for the new ingest thread ID.
174  private IngestManager() {
175  dataSourceLevelIngestJobTasksExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-data-source-ingest-%d").build()); //NON-NLS;
176  long threadId = nextIngestManagerTaskId.incrementAndGet();
177  dataSourceLevelIngestJobTasksExecutor.submit(new ExecuteIngestJobTasksTask(threadId, IngestTasksScheduler.getInstance().getDataSourceIngestTaskQueue()));
178  ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId));
179 
// NOTE(review): numberOfFileIngestThreads is a final field but no assignment
// is visible in this listing; presumably it is set on a line missing just
// above - confirm against the real source.
// One worker task is submitted per file ingest thread, each with its own thread ID.
181  fileLevelIngestJobTasksExecutor = Executors.newFixedThreadPool(numberOfFileIngestThreads, new ThreadFactoryBuilder().setNameFormat("IM-file-ingest-%d").build()); //NON-NLS
182  for (int i = 0; i < numberOfFileIngestThreads; ++i) {
183  threadId = nextIngestManagerTaskId.incrementAndGet();
184  fileLevelIngestJobTasksExecutor.submit(new ExecuteIngestJobTasksTask(threadId, IngestTasksScheduler.getInstance().getFileIngestTaskQueue()));
185  ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId));
186  }
187 
188  dataArtifactIngestTasksExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-data-artifact-ingest-%d").build()); //NON-NLS;
189  threadId = nextIngestManagerTaskId.incrementAndGet();
190  dataArtifactIngestTasksExecutor.submit(new ExecuteIngestJobTasksTask(threadId, IngestTasksScheduler.getInstance().getDataArtifactIngestTaskQueue()));
191  ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId));
192 
193  analysisResultIngestTasksExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-analysis-result-ingest-%d").build()); //NON-NLS;
194  threadId = nextIngestManagerTaskId.incrementAndGet();
195  analysisResultIngestTasksExecutor.submit(new ExecuteIngestJobTasksTask(threadId, IngestTasksScheduler.getInstance().getAnalysisResultIngestTaskQueue()));
196  ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId));
197  }
198 
// NOTE(review): the method signature and a few body lines are missing from
// this listing, so the comments below describe only what is visible.
// Listener invoked on service status changes. When the new status is DOWN and
// a case is open, it logs the outage and queues an error dialog on the EDT.
204  PropertyChangeListener propChangeListener = (PropertyChangeEvent evt) -> {
205  if (evt.getNewValue().equals(ServicesMonitor.ServiceStatus.DOWN.toString())) {
206  /*
207  * The application services considered to be key services are
208  * only necessary for multi-user cases.
209  */
210  try {
// NOTE(review): the condition guarding this early return is missing here;
// given the comment above it presumably checks the case type - confirm.
212  return;
213  }
214  } catch (NoCurrentCaseException noCaseOpenException) {
// No case is open, so there is nothing to do.
215  return;
216  }
217 
// The event's property name is the name of the ServicesMonitor.Service constant.
218  String serviceDisplayName = ServicesMonitor.Service.valueOf(evt.getPropertyName()).getDisplayName();
219  logger.log(Level.SEVERE, "Service {0} is down, cancelling all running ingest jobs", serviceDisplayName); //NON-NLS
// The dialog must be shown from the event dispatch thread.
221  EventQueue.invokeLater(new Runnable() {
222  @Override
223  public void run() {
224  JOptionPane.showMessageDialog(WindowManager.getDefault().getMainWindow(),
225  NbBundle.getMessage(this.getClass(), "IngestManager.cancellingIngest.msgDlg.text"),
226  NbBundle.getMessage(this.getClass(), "IngestManager.serviceIsDown.msgDlg.text", serviceDisplayName),
227  JOptionPane.ERROR_MESSAGE);
228  }
229  });
230  }
232  }
233  };
234 
235  /*
236  * The key services for multi-user cases are currently the case database
237  * server and the Solr server. The Solr server is a key service not
238  * because search is essential, but because the coordination service
239  * (ZooKeeper) is running embedded within the Solr server.
240  */
241  Set<String> servicesList = new HashSet<>();
242  servicesList.add(ServicesMonitor.Service.REMOTE_CASE_DATABASE.toString());
243  servicesList.add(ServicesMonitor.Service.REMOTE_KEYWORD_SEARCH.toString());
244  this.servicesMonitor.addSubscriber(servicesList, propChangeListener);
245  }
246 
251  private void subscribeToCaseEvents() {
252  Case.addEventTypeSubscriber(EnumSet.of(Case.Events.CURRENT_CASE), (PropertyChangeEvent event) -> {
253  if (event.getNewValue() != null) {
254  handleCaseOpened();
255  } else {
256  handleCaseClosed();
257  }
258  });
259  }
260 
// Handles a current case opened event: marks the case as open, opens the
// remote job and module event channels for multi-user cases, and registers
// this manager for events from the case database.
// NOTE(review): one line between the first statement and the try block is
// missing from this listing.
269  void handleCaseOpened() {
270  caseIsOpen = true;
272  try {
273  Case openedCase = Case.getCurrentCaseThrows();
// The case name is the prefix of both remote event channel names.
274  String channelPrefix = openedCase.getName();
275  if (Case.CaseType.MULTI_USER_CASE == openedCase.getCaseType()) {
276  jobEventPublisher.openRemoteEventChannel(String.format(INGEST_JOB_EVENT_CHANNEL_NAME, channelPrefix));
277  moduleEventPublisher.openRemoteEventChannel(String.format(INGEST_MODULE_EVENT_CHANNEL_NAME, channelPrefix));
278  }
// Registration is what delivers ArtifactsPostedEvents to the @Subscribe handler of this class.
279  openedCase.getSleuthkitCase().registerForEvents(this);
280  } catch (NoCurrentCaseException | AutopsyEventException ex) {
// Failure is logged and reported to the user; caseIsOpen stays true.
281  logger.log(Level.SEVERE, "Failed to open remote events channel", ex); //NON-NLS
282  MessageNotifyUtil.Notify.error(NbBundle.getMessage(IngestManager.class, "IngestManager.OpenEventChannel.Fail.Title"),
283  NbBundle.getMessage(IngestManager.class, "IngestManager.OpenEventChannel.Fail.ErrMsg"));
284  }
285  }
286 
293  @Subscribe
294  void handleArtifactsPosted(Blackboard.ArtifactsPostedEvent tskEvent) {
295  /*
296  * Add any new data artifacts included in the event to the source ingest
297  * job for possible analysis.
298  */
299  List<DataArtifact> newDataArtifacts = new ArrayList<>();
300  List<AnalysisResult> newAnalysisResults = new ArrayList<>();
301  Collection<BlackboardArtifact> newArtifacts = tskEvent.getArtifacts();
302  for (BlackboardArtifact artifact : newArtifacts) {
303  if (artifact instanceof DataArtifact) {
304  newDataArtifacts.add((DataArtifact) artifact);
305  } else {
306  newAnalysisResults.add((AnalysisResult) artifact);
307  }
308  }
309  if (!newDataArtifacts.isEmpty() || !newAnalysisResults.isEmpty()) {
310  IngestJob ingestJob = null;
311  Optional<Long> ingestJobId = tskEvent.getIngestJobId();
312  if (ingestJobId.isPresent()) {
313  synchronized (ingestJobsById) {
314  ingestJob = ingestJobsById.get(ingestJobId.get());
315  }
316  } else {
317  /*
318  * There are four use cases where the ingest job ID returned by
319  * the event is expected be null:
320  *
321  * 1. The artifacts are being posted by a data source proccessor
322  * (DSP) module that runs before the ingest job is created,
323  * i.e., a DSP that does not support streaming ingest and has no
324  * noton of an ingest job ID. In this use case, the event is
325  * handled synchronously. The DSP calls
326  * Blackboard.postArtifacts(), which puts the event on the event
327  * bus to which this method subscribes, so the event will be
328  * handled here before the DSP completes and calls
329  * DataSourceProcessorCallback.done(). This means the code below
330  * will execute before the ingest job is created, so it will not
331  * find an ingest job to which to add the artifacts. However,
332  * the artifacts WILL be analyzed after the ingest job is
333  * started, when the ingest job executor, working in batch mode,
334  * schedules ingest tasks for all of the data artifacts in the
335  * case database. There is a slight risk that the wrong ingest
336  * job will be selected if multiple ingests of the same data
337  * source are in progress.
338  *
339  * 2. The artifacts were posted by an ingest module that either
340  * has not been updated to use the current
341  * Blackboard.postArtifacts() API, or is using it incorrectly.
342  * In this use case, the code below should be able to find the
343  * ingest job to which to add the artifacts via their data
344  * source. There is a slight risk that the wrong ingest job will
345  * be selected if multiple ingests of the same data source are
346  * in progress.
347  *
348  * 3. The portable case generator uses a
349  * CommunicationArtifactsHelper constructed with a null ingest
350  * job ID, and the CommunicatonsArtifactHelper posts artifacts.
351  * Ingest of that data source might be running, in which case
352  * the data artifact will be analyzed. It also might be analyzed
353  * by a subsequent ingest job for the data source. This is an
354  * acceptable edge case.
355  *
356  * 4. The user can manually create timeline events with the
357  * timeline tool, which posts the TSK_TL_EVENT data artifacts.
358  * The user selects the data source for these artifacts. Ingest
359  * of that data source might be running, in which case the data
360  * artifact will be analyzed. It also might be analyzed by a
361  * subsequent ingest job for the data source. This is an
362  * acceptable edge case.
363  *
364  * 5. The user can manually run ad hoc keyword searches,
365  * which post TSK_KEYWORD_HIT analysis results. Ingest
366  * of that data source might be running, in which case the analysis
367  * results will be analyzed. They also might be analyzed by a
368  * subsequent ingest job for the data source. This is an
369  * acceptable edge case.
370  */
371  BlackboardArtifact artifact = newArtifacts.iterator().next();
372  if (artifact != null) {
373  try {
374  Content artifactDataSource = artifact.getDataSource();
375  synchronized (ingestJobsById) {
376  for (IngestJob job : ingestJobsById.values()) {
377  Content dataSource = job.getDataSource();
378  if (artifactDataSource.getId() == dataSource.getId()) {
379  ingestJob = job;
380  break;
381  }
382  }
383  }
384  } catch (TskCoreException ex) {
385  logger.log(Level.SEVERE, String.format("Failed to get data source for blackboard artifact (object ID = %d)", artifact.getId()), ex); //NON-NLS
386  }
387  }
388  }
389  if (ingestJob != null) {
390  if (!newDataArtifacts.isEmpty()) {
391  ingestJob.addDataArtifacts(newDataArtifacts);
392  }
393  if (!newAnalysisResults.isEmpty()) {
394  ingestJob.addAnalysisResults(newAnalysisResults);
395  }
396  }
397  }
398 
399  /*
400  * Publish Autopsy events for the new artifacts, one event per artifact
401  * type.
402  */
403  for (BlackboardArtifact.Type artifactType : tskEvent.getArtifactTypes()) {
404  ModuleDataEvent legacyEvent = new ModuleDataEvent(tskEvent.getModuleName(), artifactType, tskEvent.getArtifacts(artifactType));
405  AutopsyEvent autopsyEvent = new BlackboardPostEvent(legacyEvent);
406  eventPublishingExecutor.submit(new PublishEventTask(autopsyEvent, moduleEventPublisher));
407  }
408  }
409 
// Handles a current case closed event: cancels all ingest jobs with reason
// CASE_CLOSED, unregisters this manager from the case database events, and
// marks the case as closed.
// NOTE(review): several lines of this method are missing from this listing.
419  void handleCaseClosed() {
420  /*
421  * TODO (JIRA-2227): IngestManager should wait for cancelled ingest jobs
422  * to complete when a case is closed.
423  */
424  cancelAllIngestJobs(IngestJob.CancellationReason.CASE_CLOSED);
// Mirrors the registerForEvents(this) call made in handleCaseOpened().
425  Case.getCurrentCase().getSleuthkitCase().unregisterForEvents(this);
428  caseIsOpen = false;
430  }
431 
// NOTE(review): the signature of this method is missing from this listing;
// the body starts and returns a streaming-mode ingest job input stream.
444  if (!(dataSource instanceof DataSource)) {
445  throw new IllegalArgumentException("dataSource argument does not implement the DataSource interface"); //NON-NLS
446  }
447  IngestJob job = new IngestJob(dataSource, IngestJob.Mode.STREAMING, settings);
448  IngestJobInputStream stream = new IngestJobInputStream(job);
// A non-null job in the start result means the job started successfully.
449  if (stream.getIngestJobStartResult().getJob() != null) {
450  return stream;
// Fix: this branch must run when there ARE module errors. The original tested
// isEmpty(), so the logging loop below only ever iterated an empty list and
// real module start-up errors were never logged.
451  } else if (!stream.getIngestJobStartResult().getModuleErrors().isEmpty()) {
452  for (IngestModuleError error : stream.getIngestJobStartResult().getModuleErrors()) {
453  logger.log(Level.SEVERE, String.format("%s ingest module startup error for %s", error.getModuleDisplayName(), dataSource.getName()), error.getThrowable());
454  }
455  throw new TskCoreException("Error starting ingest modules");
456  } else {
// No module errors were reported, so the start-up exception is the cause.
457  throw new TskCoreException("Error starting ingest modules", stream.getIngestJobStartResult().getStartupException());
458  }
459  }
460 
469  }
470 
477  public void queueIngestJob(Collection<Content> dataSources, IngestJobSettings settings) {
478  if (caseIsOpen) {
479  List<AbstractFile> emptyFilesSubset = new ArrayList<>();
480  for (Content dataSource : dataSources) {
481  queueIngestJob(dataSource, emptyFilesSubset, settings);
482  }
483  }
484  }
485 
495  public void queueIngestJob(Content dataSource, List<AbstractFile> files, IngestJobSettings settings) {
496  if (!(dataSource instanceof DataSource)) {
497  throw new IllegalArgumentException("dataSource argument does not implement the DataSource interface"); //NON-NLS
498  }
499  if (caseIsOpen) {
500  IngestJob job = new IngestJob((DataSource) dataSource, files, settings);
501  if (job.hasIngestPipeline()) {
502  long taskId = nextIngestManagerTaskId.incrementAndGet();
503  Future<Void> task = startIngestJobsExecutor.submit(new StartIngestJobTask(taskId, job));
504  synchronized (startIngestJobFutures) {
505  startIngestJobFutures.put(taskId, task);
506  }
507  }
508  }
509  }
510 
// Starts a batch-mode ingest job for each data source in the collection and
// returns the result of the most recent start attempt. Returns a result that
// carries an IngestManagerException if no case is open or if a job has no
// ingest pipeline. Throws IllegalArgumentException if any element does not
// implement the DataSource interface.
525  public IngestJobStartResult beginIngestJob(Collection<Content> dataSources, IngestJobSettings settings) {
// Validate every element up front so that nothing is started on bad input.
526  List<DataSource> verifiedDataSources = new ArrayList<>();
527  for (Content content : dataSources) {
528  if (!(content instanceof DataSource)) {
529  throw new IllegalArgumentException("Content object in dataSources argument does not implement the DataSource interface"); //NON-NLS
530  }
531  DataSource verifiedDataSource = (DataSource) content;
532  verifiedDataSources.add(verifiedDataSource);
533  }
534  IngestJobStartResult startResult = null;
535  if (caseIsOpen) {
536  for (DataSource dataSource : verifiedDataSources) {
// NOTE(review): startedJobs is re-created on every iteration, so it is always
// empty when the failure branch below iterates it; jobs started in earlier
// iterations are never revisited. It looks like this list was meant to be
// declared before the loop - confirm.
537  List<IngestJob> startedJobs = new ArrayList<>();
538  IngestJob job = new IngestJob(dataSource, IngestJob.Mode.BATCH, settings);
539  if (job.hasIngestPipeline()) {
540  startResult = startIngestJob(job);
541  if (startResult.getModuleErrors().isEmpty() && startResult.getStartupException() == null) {
542  startedJobs.add(job);
543  } else {
// NOTE(review): the body of this loop is missing from this listing.
544  for (IngestJob jobToCancel : startedJobs) {
546  }
// Stop at the first failed start; its result is what the caller receives.
547  break;
548  }
549  } else {
550  startResult = new IngestJobStartResult(null, new IngestManagerException("No ingest pipeline created, likely due to no ingest modules being enabled"), null); //NON-NLS
551  break;
552  }
553  }
554  } else {
555  startResult = new IngestJobStartResult(null, new IngestManagerException("No case open"), null); //NON-NLS
556  }
// May still be null here if a case is open but dataSources was empty.
557  return startResult;
558  }
559 
568  @NbBundle.Messages({
569  "IngestManager.startupErr.dlgTitle=Ingest Module Startup Failure",
570  "IngestManager.startupErr.dlgMsg=Unable to start up one or more ingest modules, ingest cancelled.",
571  "IngestManager.startupErr.dlgSolution=Please disable the failed modules or fix the errors before restarting ingest.",
572  "IngestManager.startupErr.dlgErrorList=Errors:"
573  })
574  IngestJobStartResult startIngestJob(IngestJob job) {
575 
576  // initialize IngestMessageInbox, if it hasn't been initialized yet. This can't be done in
577  // the constructor because that ends up freezing the UI on startup (JIRA-7345).
578  if (!GraphicsEnvironment.isHeadless()) {
579  if (SwingUtilities.isEventDispatchThread()) {
580  initIngestMessageInbox();
581  } else {
582  try {
583  SwingUtilities.invokeAndWait(() -> initIngestMessageInbox());
584  } catch (InterruptedException ex) {
585  // ignore interruptions
586  } catch (InvocationTargetException ex) {
587  logger.log(Level.WARNING, "There was an error starting ingest message inbox", ex);
588  }
589  }
590  }
591 
592  List<IngestModuleError> errors = null;
593  Case openCase;
594  try {
595  openCase = Case.getCurrentCaseThrows();
596  } catch (NoCurrentCaseException ex) {
597  return new IngestJobStartResult(null, new IngestManagerException("Exception while getting open case.", ex), Collections.<IngestModuleError>emptyList()); //NON-NLS
598  }
599  if (openCase.getCaseType() == Case.CaseType.MULTI_USER_CASE) {
600  try {
601  if (!servicesMonitor.getServiceStatus(ServicesMonitor.Service.REMOTE_CASE_DATABASE.toString()).equals(ServicesMonitor.ServiceStatus.UP.toString())) {
602  if (RuntimeProperties.runningWithGUI()) {
603  EventQueue.invokeLater(new Runnable() {
604  @Override
605  public void run() {
606  String serviceDisplayName = ServicesMonitor.Service.REMOTE_CASE_DATABASE.getDisplayName();
607  JOptionPane.showMessageDialog(WindowManager.getDefault().getMainWindow(),
608  NbBundle.getMessage(this.getClass(), "IngestManager.cancellingIngest.msgDlg.text"),
609  NbBundle.getMessage(this.getClass(), "IngestManager.serviceIsDown.msgDlg.text", serviceDisplayName),
610  JOptionPane.ERROR_MESSAGE);
611  }
612  });
613  }
614  return new IngestJobStartResult(null, new IngestManagerException("Ingest aborted. Remote database is down"), Collections.<IngestModuleError>emptyList()); //NON-NLS
615  }
616  } catch (ServicesMonitor.ServicesMonitorException ex) {
617  return new IngestJobStartResult(null, new IngestManagerException("Database server is down", ex), Collections.<IngestModuleError>emptyList()); //NON-NLS
618  }
619  }
620 
621  if (!ingestMonitor.isRunning()) {
622  ingestMonitor.start();
623  }
624 
625  synchronized (ingestJobsById) {
626  ingestJobsById.put(job.getId(), job);
627  }
628  IngestManager.logger.log(Level.INFO, String.format("Starting ingest job %d at %s", job.getId(), new Date().getTime())); //NON-NLS
629  try {
630  errors = job.start();
631  } catch (InterruptedException ex) {
632  return new IngestJobStartResult(null, new IngestManagerException("Interrupted while starting ingest", ex), errors); //NON-NLS
633  }
634  if (errors.isEmpty()) {
635  this.fireIngestJobStarted(job.getId());
636  } else {
637  synchronized (ingestJobsById) {
638  this.ingestJobsById.remove(job.getId());
639  }
640  for (IngestModuleError error : errors) {
641  logger.log(Level.SEVERE, String.format("Error starting %s ingest module for job %d", error.getModuleDisplayName(), job.getId()), error.getThrowable()); //NON-NLS
642  }
643  IngestManager.logger.log(Level.SEVERE, "Ingest job {0} could not be started", job.getId()); //NON-NLS
644  if (RuntimeProperties.runningWithGUI()) {
645  final StringBuilder message = new StringBuilder(1024);
646  message.append(Bundle.IngestManager_startupErr_dlgMsg()).append("\n"); //NON-NLS
647  message.append(Bundle.IngestManager_startupErr_dlgSolution()).append("\n\n"); //NON-NLS
648  message.append(Bundle.IngestManager_startupErr_dlgErrorList()).append("\n"); //NON-NLS
649  for (IngestModuleError error : errors) {
650  String moduleName = error.getModuleDisplayName();
651  String errorMessage = error.getThrowable().getLocalizedMessage();
652  message.append(moduleName).append(": ").append(errorMessage).append("\n"); //NON-NLS
653  }
654  message.append("\n\n");
655  EventQueue.invokeLater(() -> {
656  JOptionPane.showMessageDialog(WindowManager.getDefault().getMainWindow(), message, Bundle.IngestManager_startupErr_dlgTitle(), JOptionPane.ERROR_MESSAGE);
657  });
658  }
659  return new IngestJobStartResult(null, new IngestManagerException("Errors occurred while starting ingest"), errors); //NON-NLS
660  }
661 
662  return new IngestJobStartResult(job, null, errors);
663  }
664 
670  void finishIngestJob(IngestJob job) {
671  long jobId = job.getId();
672  synchronized (ingestJobsById) {
673  ingestJobsById.remove(jobId);
674  }
675  if (!job.isCancelled()) {
676  IngestManager.logger.log(Level.INFO, String.format("Ingest job %d completed at %s", job.getId(), new Date().getTime())); //NON-NLS
677  fireIngestJobCompleted(jobId);
678  } else {
679  IngestManager.logger.log(Level.INFO, String.format("Ingest job %d cancelled at %s", job.getId(), new Date().getTime())); //NON-NLS
680  fireIngestJobCancelled(jobId);
681  }
682  }
683 
690  public boolean isIngestRunning() {
691  synchronized (ingestJobsById) {
692  return !ingestJobsById.isEmpty();
693  }
694  }
695 
// NOTE(review): the signature of this method is missing from this listing;
// `reason` is presumably its IngestJob.CancellationReason parameter - confirm.
// First cancel the start-ingest-job tasks that are still queued or running,
// interrupting their thread if necessary.
702  synchronized (startIngestJobFutures) {
703  startIngestJobFutures.values().forEach((handle) -> {
704  handle.cancel(true);
705  });
706  }
// Then cancel every ingest job that is already registered.
707  synchronized (ingestJobsById) {
708  this.ingestJobsById.values().forEach((job) -> {
709  job.cancel(reason);
710  });
711  }
712  }
713 
719  public void addIngestJobEventListener(final PropertyChangeListener listener) {
720  jobEventPublisher.addSubscriber(INGEST_JOB_EVENT_NAMES, listener);
721  }
722 
730  public void addIngestJobEventListener(Set<IngestJobEvent> eventTypes, final PropertyChangeListener listener) {
731  eventTypes.forEach((IngestJobEvent event) -> {
732  jobEventPublisher.addSubscriber(event.toString(), listener);
733  });
734  }
735 
741  public void removeIngestJobEventListener(final PropertyChangeListener listener) {
742  jobEventPublisher.removeSubscriber(INGEST_JOB_EVENT_NAMES, listener);
743  }
744 
751  public void removeIngestJobEventListener(Set<IngestJobEvent> eventTypes, final PropertyChangeListener listener) {
752  eventTypes.forEach((IngestJobEvent event) -> {
753  jobEventPublisher.removeSubscriber(event.toString(), listener);
754  });
755  }
756 
762  public void addIngestModuleEventListener(final PropertyChangeListener listener) {
763  moduleEventPublisher.addSubscriber(INGEST_MODULE_EVENT_NAMES, listener);
764  }
765 
773  public void addIngestModuleEventListener(Set<IngestModuleEvent> eventTypes, final PropertyChangeListener listener) {
774  eventTypes.forEach((IngestModuleEvent event) -> {
775  moduleEventPublisher.addSubscriber(event.toString(), listener);
776  });
777  }
778 
784  public void removeIngestModuleEventListener(final PropertyChangeListener listener) {
785  moduleEventPublisher.removeSubscriber(INGEST_MODULE_EVENT_NAMES, listener);
786  }
787 
794  public void removeIngestModuleEventListener(Set<IngestModuleEvent> eventTypes, final PropertyChangeListener listener) {
795  moduleEventPublisher.removeSubscriber(INGEST_MODULE_EVENT_NAMES, listener);
796  }
797 
803  void fireIngestJobStarted(long ingestJobId) {
804  AutopsyEvent event = new AutopsyEvent(IngestJobEvent.STARTED.toString(), ingestJobId, null);
805  eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
806  }
807 
813  void fireIngestJobCompleted(long ingestJobId) {
814  AutopsyEvent event = new AutopsyEvent(IngestJobEvent.COMPLETED.toString(), ingestJobId, null);
815  eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
816  }
817 
823  void fireIngestJobCancelled(long ingestJobId) {
824  AutopsyEvent event = new AutopsyEvent(IngestJobEvent.CANCELLED.toString(), ingestJobId, null);
825  eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
826  }
827 
835  void fireDataSourceAnalysisStarted(long ingestJobId, Content dataSource) {
836  AutopsyEvent event = new DataSourceAnalysisStartedEvent(ingestJobId, dataSource);
837  eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
838  }
839 
847  void fireDataSourceAnalysisCompleted(long ingestJobId, Content dataSource) {
848  AutopsyEvent event = new DataSourceAnalysisCompletedEvent(ingestJobId, dataSource, DataSourceAnalysisCompletedEvent.Reason.ANALYSIS_COMPLETED);
849  eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
850  }
851 
859  void fireDataSourceAnalysisCancelled(long ingestJobId, Content dataSource) {
860  AutopsyEvent event = new DataSourceAnalysisCompletedEvent(ingestJobId, dataSource, DataSourceAnalysisCompletedEvent.Reason.ANALYSIS_CANCELLED);
861  eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
862  }
863 
870  void fireFileIngestDone(AbstractFile file) {
871  AutopsyEvent event = new FileAnalyzedEvent(file);
872  eventPublishingExecutor.submit(new PublishEventTask(event, moduleEventPublisher));
873  }
874 
882  void fireIngestModuleContentEvent(ModuleContentEvent moduleContentEvent) {
883  AutopsyEvent event = new ContentChangedEvent(moduleContentEvent);
884  eventPublishingExecutor.submit(new PublishEventTask(event, moduleEventPublisher));
885  }
886 
895  void initIngestMessageInbox() {
896  synchronized (this.ingestMessageBoxLock) {
897  ingestMessageBox = IngestMessageTopComponent.findInstance();
898  }
899  }
900 
906  void postIngestMessage(IngestMessage message) {
907  synchronized (this.ingestMessageBoxLock) {
908  if (ingestMessageBox != null && RuntimeProperties.runningWithGUI()) {
909  if (message.getMessageType() != IngestMessage.MessageType.ERROR && message.getMessageType() != IngestMessage.MessageType.WARNING) {
910  ingestMessageBox.displayMessage(message);
911  } else {
912  long errorPosts = ingestErrorMessagePosts.incrementAndGet();
913  if (errorPosts <= MAX_ERROR_MESSAGE_POSTS) {
914  ingestMessageBox.displayMessage(message);
915  } else if (errorPosts == MAX_ERROR_MESSAGE_POSTS + 1) {
916  IngestMessage errorMessageLimitReachedMessage = IngestMessage.createErrorMessage(
917  NbBundle.getMessage(this.getClass(), "IngestManager.IngestMessage.ErrorMessageLimitReached.title"),
918  NbBundle.getMessage(this.getClass(), "IngestManager.IngestMessage.ErrorMessageLimitReached.subject"),
919  NbBundle.getMessage(this.getClass(), "IngestManager.IngestMessage.ErrorMessageLimitReached.msg", MAX_ERROR_MESSAGE_POSTS));
920  ingestMessageBox.displayMessage(errorMessageLimitReachedMessage);
921  }
922  }
923  }
924  }
925  }
926 
927  /*
928  * Clears the ingest messages inbox.
929  */
930  private void clearIngestMessageBox() {
// Same lock as initIngestMessageInbox() and postIngestMessage().
931  synchronized (this.ingestMessageBoxLock) {
// The inbox is null until initIngestMessageInbox() has run.
932  if (null != ingestMessageBox) {
933  ingestMessageBox.clearMessages();
934  }
// NOTE(review): one line is missing from this listing at this point.
936  }
937  }
938 
949  void setIngestTaskProgress(IngestTask task, String currentModuleName) {
950  IngestThreadActivitySnapshot prevSnap = ingestThreadActivitySnapshots.get(task.getThreadId());
951  IngestThreadActivitySnapshot newSnap = new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJobExecutor().getIngestJobId(), currentModuleName, task.getDataSource(), task.getContentName());
952  ingestThreadActivitySnapshots.put(task.getThreadId(), newSnap);
953  incrementModuleRunTime(prevSnap.getModuleDisplayName(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
954  }
955 
963  void setIngestTaskProgressCompleted(IngestTask task) {
964  IngestThreadActivitySnapshot prevSnap = ingestThreadActivitySnapshots.get(task.getThreadId());
965  IngestThreadActivitySnapshot newSnap = new IngestThreadActivitySnapshot(task.getThreadId());
966  ingestThreadActivitySnapshots.put(task.getThreadId(), newSnap);
967  incrementModuleRunTime(prevSnap.getModuleDisplayName(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
968  }
969 
976  void incrementModuleRunTime(String moduleDisplayName, Long duration) {
977  if (moduleDisplayName.equals("IDLE")) { //NON-NLS
978  return;
979  }
980 
981  synchronized (ingestModuleRunTimes) {
982  Long prevTimeL = ingestModuleRunTimes.get(moduleDisplayName);
983  long prevTime = 0;
984  if (prevTimeL != null) {
985  prevTime = prevTimeL;
986  }
987  prevTime += duration;
988  ingestModuleRunTimes.put(moduleDisplayName, prevTime);
989  }
990  }
991 
997  @Override
998  public Map<String, Long> getModuleRunTimes() {
999  synchronized (ingestModuleRunTimes) {
1000  Map<String, Long> times = new HashMap<>(ingestModuleRunTimes);
1001  return times;
1002  }
1003  }
1004 
1011  @Override
1012  public List<IngestThreadActivitySnapshot> getIngestThreadActivitySnapshots() {
1013  return new ArrayList<>(ingestThreadActivitySnapshots.values());
1014  }
1015 
1021  @Override
1022  public List<IngestJobProgressSnapshot> getIngestJobSnapshots() {
1023  List<IngestJobProgressSnapshot> snapShots = new ArrayList<>();
1024  synchronized (ingestJobsById) {
1025  ingestJobsById.values().forEach((job) -> {
1026  IngestJobProgressSnapshot snapshot = job.getDiagnosticStatsSnapshot();
1027  if (snapshot != null) {
1028  snapShots.add(snapshot);
1029  }
1030  });
1031  }
1032  return snapShots;
1033  }
1034 
1041  long getFreeDiskSpace() {
1042  if (ingestMonitor != null) {
1043  return ingestMonitor.getFreeSpace();
1044  } else {
1045  return -1;
1046  }
1047  }
1048 
1052  private final class StartIngestJobTask implements Callable<Void> {
1053 
1054  private final long threadId;
1055  private final IngestJob job;
1056  private ProgressHandle progress;
1057 
/**
 * Constructs a task that starts an ingest job.
 *
 * @param threadId The ID under which the future for this task is looked up
 *                 in, and removed from, the start ingest job futures map.
 * @param job      The ingest job to start.
 */
StartIngestJobTask(long threadId, IngestJob job) {
    this.job = job;
    this.threadId = threadId;
}
1062 
1063  @Override
1064  public Void call() {
1065  try {
1066  if (Thread.currentThread().isInterrupted()) {
1067  synchronized (ingestJobsById) {
1068  ingestJobsById.remove(job.getId());
1069  }
1070  return null;
1071  }
1072 
1074  final String displayName = NbBundle.getMessage(this.getClass(), "IngestManager.StartIngestJobsTask.run.displayName");
1075  this.progress = ProgressHandle.createHandle(displayName, new Cancellable() {
1076  @Override
1077  public boolean cancel() {
1078  if (progress != null) {
1079  progress.setDisplayName(NbBundle.getMessage(this.getClass(), "IngestManager.StartIngestJobsTask.run.cancelling", displayName));
1080  }
1081  synchronized (startIngestJobFutures) {
1082  Future<?> handle = startIngestJobFutures.remove(threadId);
1083  handle.cancel(true);
1084  }
1085  return true;
1086  }
1087  });
1088  progress.start();
1089  }
1090 
1091  startIngestJob(job);
1092  return null;
1093 
1094  } finally {
1095  if (null != progress) {
1096  progress.finish();
1097  }
1098  synchronized (startIngestJobFutures) {
1099  startIngestJobFutures.remove(threadId);
1100  }
1101  }
1102  }
1103 
1104  }
1105 
1109  private final class ExecuteIngestJobTasksTask implements Runnable {
1110 
1111  private final long threadId;
1112  private final BlockingIngestTaskQueue tasks;
1113 
1114  ExecuteIngestJobTasksTask(long threadId, BlockingIngestTaskQueue tasks) {
1115  this.threadId = threadId;
1116  this.tasks = tasks;
1117  }
1118 
1119  @Override
1120  public void run() {
1121  while (true) {
1122  try {
1123  IngestTask task = tasks.getNextTask(); // Blocks.
1124  task.execute(threadId);
1125  } catch (InterruptedException ex) {
1126  break;
1127  }
1128  if (Thread.currentThread().isInterrupted()) {
1129  break;
1130  }
1131  }
1132  }
1133  }
1134 
1138  private static final class PublishEventTask implements Runnable {
1139 
1140  private final AutopsyEvent event;
1142 
1151  this.event = event;
1152  this.publisher = publisher;
1153  }
1154 
1155  @Override
1156  public void run() {
1157  publisher.publish(event);
1158  }
1159 
1160  }
1161 
1165  @Immutable
1166  public static final class IngestThreadActivitySnapshot implements Serializable {
1167 
1168  private static final long serialVersionUID = 1L;
1169 
1170  private final long threadId;
1171  private final Date startTime;
1172  private final String moduleDisplayName;
1173  private final String dataSourceName;
1174  private final String fileName;
1175  private final long jobId;
1176 
1182  IngestThreadActivitySnapshot(long threadId) {
1183  this.threadId = threadId;
1184  startTime = new Date();
1185  this.moduleDisplayName = NbBundle.getMessage(this.getClass(), "IngestManager.IngestThreadActivitySnapshot.idleThread");
1186  this.dataSourceName = "";
1187  this.fileName = "";
1188  this.jobId = 0;
1189  }
1190 
1204  IngestThreadActivitySnapshot(long threadId, long jobId, String moduleDisplayName, Content dataSource) {
1205  this.threadId = threadId;
1206  this.jobId = jobId;
1207  startTime = new Date();
1208  this.moduleDisplayName = moduleDisplayName;
1209  this.dataSourceName = dataSource.getName();
1210  this.fileName = "";
1211  }
1212 
1228  IngestThreadActivitySnapshot(long threadId, long jobId, String moduleDisplayName, Content dataSource, String fileName) {
1229  this.threadId = threadId;
1230  this.jobId = jobId;
1231  startTime = new Date();
1232  this.moduleDisplayName = moduleDisplayName;
1233  this.dataSourceName = dataSource.getName();
1234  this.fileName = fileName;
1235  }
1236 
1243  long getIngestJobId() {
1244  return jobId;
1245  }
1246 
1252  long getThreadId() {
1253  return threadId;
1254  }
1255 
1261  Date getStartTime() {
1262  return new Date(startTime.getTime());
1263  }
1264 
1270  String getModuleDisplayName() {
1271  return moduleDisplayName;
1272  }
1273 
1280  String getDataSourceName() {
1281  return dataSourceName;
1282  }
1283 
1289  String getFileName() {
1290  return fileName;
1291  }
1292 
1293  }
1294 
1298  public enum IngestJobEvent {
1299 
1336  }
1337 
1341  public enum IngestModuleEvent {
1342 
1364  }
1365 
1369  public final static class IngestManagerException extends Exception {
1370 
1371  private static final long serialVersionUID = 1L;
1372 
1378  private IngestManagerException(String message) {
1379  super(message);
1380  }
1381 
1388  private IngestManagerException(String message, Throwable cause) {
1389  super(message, cause);
1390  }
1391  }
1392 
1401  @Deprecated
1402  public static void addPropertyChangeListener(final PropertyChangeListener listener) {
1405  }
1406 
1415  @Deprecated
1416  public static void removePropertyChangeListener(final PropertyChangeListener listener) {
1419  }
1420 
1431  @Deprecated
1432  public synchronized IngestJob startIngestJob(Collection<Content> dataSources, IngestJobSettings settings) {
1433  return beginIngestJob(dataSources, settings).getJob();
1434  }
1435 
1442  @Deprecated
1443  public void cancelAllIngestJobs() {
1445  }
1446 
1447 }
final ConcurrentHashMap< String, Long > ingestModuleRunTimes
void addIngestModuleEventListener(Set< IngestModuleEvent > eventTypes, final PropertyChangeListener listener)
final Map< Long, Future< Void > > startIngestJobFutures
List< IngestJobProgressSnapshot > getIngestJobSnapshots()
void removeIngestModuleEventListener(final PropertyChangeListener listener)
IngestStream openIngestStream(DataSource dataSource, IngestJobSettings settings)
List< IngestThreadActivitySnapshot > getIngestThreadActivitySnapshots()
void queueIngestJob(Content dataSource, List< AbstractFile > files, IngestJobSettings settings)
static synchronized IngestManager getInstance()
void removeIngestModuleEventListener(Set< IngestModuleEvent > eventTypes, final PropertyChangeListener listener)
final ExecutorService dataSourceLevelIngestJobTasksExecutor
final ExecutorService dataArtifactIngestTasksExecutor
static void addPropertyChangeListener(final PropertyChangeListener listener)
IngestJobStartResult beginIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
void addSubscriber(Set< String > eventNames, PropertyChangeListener subscriber)
static void removePropertyChangeListener(final PropertyChangeListener listener)
static final Set< String > INGEST_MODULE_EVENT_NAMES
volatile IngestMessageTopComponent ingestMessageBox
void addSubscriber(PropertyChangeListener subscriber)
void removeIngestJobEventListener(final PropertyChangeListener listener)
void addIngestJobEventListener(Set< IngestJobEvent > eventTypes, final PropertyChangeListener listener)
final ExecutorService analysisResultIngestTasksExecutor
static final Set< String > INGEST_JOB_EVENT_NAMES
final AutopsyEventPublisher moduleEventPublisher
synchronized void openRemoteEventChannel(String channelName)
void addIngestJobEventListener(final PropertyChangeListener listener)
void queueIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
void removeSubscriber(Set< String > eventNames, PropertyChangeListener subscriber)
void addIngestModuleEventListener(final PropertyChangeListener listener)
synchronized static Logger getLogger(String name)
Definition: Logger.java:124
static void addEventTypeSubscriber(Set< Events > eventTypes, PropertyChangeListener subscriber)
Definition: Case.java:712
synchronized IngestJob startIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
final ConcurrentHashMap< Long, IngestThreadActivitySnapshot > ingestThreadActivitySnapshots
void cancelAllIngestJobs(IngestJob.CancellationReason reason)
IngestManager.IngestManagerException getStartupException()
final ExecutorService fileLevelIngestJobTasksExecutor
final Map< Long, IngestJob > ingestJobsById
void removeIngestJobEventListener(Set< IngestJobEvent > eventTypes, final PropertyChangeListener listener)
final AutopsyEventPublisher jobEventPublisher

Copyright © 2012-2024 Sleuth Kit Labs. Generated on: Mon Feb 17 2025
This work is licensed under a Creative Commons Attribution-Share Alike 3.0 United States License.