Autopsy  4.18.0
Graphical digital forensics platform for The Sleuth Kit and other tools.
IngestManager.java
Go to the documentation of this file.
1 /*
2  * Autopsy Forensic Browser
3  *
4  * Copyright 2012-2021 Basis Technology Corp.
5  * Contact: carrier <at> sleuthkit <dot> org
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  * http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19 package org.sleuthkit.autopsy.ingest;
20 
21 import com.google.common.util.concurrent.ThreadFactoryBuilder;
22 import java.awt.EventQueue;
23 import java.beans.PropertyChangeEvent;
24 import java.beans.PropertyChangeListener;
25 import java.io.Serializable;
26 import java.lang.reflect.InvocationTargetException;
27 import java.util.ArrayList;
28 import java.util.Collection;
29 import java.util.Collections;
30 import java.util.Date;
31 import java.util.EnumSet;
32 import java.util.HashMap;
33 import java.util.HashSet;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.Set;
37 import java.util.concurrent.Callable;
38 import java.util.concurrent.ConcurrentHashMap;
39 import java.util.concurrent.ExecutorService;
40 import java.util.concurrent.Executors;
41 import java.util.concurrent.Future;
42 import java.util.concurrent.atomic.AtomicLong;
43 import java.util.logging.Level;
44 import java.util.stream.Collectors;
45 import java.util.stream.Stream;
46 import javax.annotation.concurrent.GuardedBy;
47 import javax.annotation.concurrent.Immutable;
48 import javax.annotation.concurrent.ThreadSafe;
49 import javax.swing.JOptionPane;
50 import javax.swing.SwingUtilities;
51 import org.netbeans.api.progress.ProgressHandle;
52 import org.openide.util.Cancellable;
53 import org.openide.util.NbBundle;
54 import org.openide.windows.WindowManager;
70 import org.sleuthkit.datamodel.AbstractFile;
71 import org.sleuthkit.datamodel.Content;
72 import org.sleuthkit.datamodel.DataSource;
73 import org.sleuthkit.datamodel.TskCoreException;
74 
114 @ThreadSafe
116 
117  private final static Logger logger = Logger.getLogger(IngestManager.class.getName());
118  private final static String INGEST_JOB_EVENT_CHANNEL_NAME = "%s-Ingest-Job-Events"; //NON-NLS
119  private final static Set<String> INGEST_JOB_EVENT_NAMES = Stream.of(IngestJobEvent.values()).map(IngestJobEvent::toString).collect(Collectors.toSet());
120  private final static String INGEST_MODULE_EVENT_CHANNEL_NAME = "%s-Ingest-Module-Events"; //NON-NLS
121  private final static Set<String> INGEST_MODULE_EVENT_NAMES = Stream.of(IngestModuleEvent.values()).map(IngestModuleEvent::toString).collect(Collectors.toSet());
122  private final static int MAX_ERROR_MESSAGE_POSTS = 200;
123  @GuardedBy("IngestManager.class")
124  private static IngestManager instance;
125  private final int numberOfFileIngestThreads;
126  private final AtomicLong nextIngestManagerTaskId = new AtomicLong(0L);
127  private final ExecutorService startIngestJobsExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-start-ingest-jobs-%d").build()); //NON-NLS;
128  @GuardedBy("startIngestJobFutures")
129  private final Map<Long, Future<Void>> startIngestJobFutures = new ConcurrentHashMap<>();
130  @GuardedBy("ingestJobsById")
131  private final Map<Long, IngestJob> ingestJobsById = new HashMap<>();
132  private final ExecutorService dataSourceLevelIngestJobTasksExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-data-source-ingest-%d").build()); //NON-NLS;
133  private final ExecutorService fileLevelIngestJobTasksExecutor;
134  private final ExecutorService eventPublishingExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-ingest-events-%d").build()); //NON-NLS;
138  private final AutopsyEventPublisher moduleEventPublisher = new AutopsyEventPublisher();
139  private final Object ingestMessageBoxLock = new Object();
140  private final AtomicLong ingestErrorMessagePosts = new AtomicLong(0L);
141  private final ConcurrentHashMap<Long, IngestThreadActivitySnapshot> ingestThreadActivitySnapshots = new ConcurrentHashMap<>();
142  private final ConcurrentHashMap<String, Long> ingestModuleRunTimes = new ConcurrentHashMap<>();
143  private volatile IngestMessageTopComponent ingestMessageBox;
144  private volatile boolean caseIsOpen;
145 
152  public synchronized static IngestManager getInstance() {
153  if (null == instance) {
154  instance = new IngestManager();
155  instance.subscribeToServiceMonitorEvents();
156  instance.subscribeToCaseEvents();
157  }
158  return instance;
159  }
160 
165  private IngestManager() {
166  /*
167  * Submit a single Runnable ingest manager task for processing data
168  * source level ingest job tasks to the data source level ingest job
169  * tasks executor.
170  */
171  long threadId = nextIngestManagerTaskId.incrementAndGet();
172  dataSourceLevelIngestJobTasksExecutor.submit(new ExecuteIngestJobTasksTask(threadId, IngestTasksScheduler.getInstance().getDataSourceIngestTaskQueue()));
173  ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId));
174 
175  /*
176  * Submit a configurable number of Runnable ingest manager tasks for
177  * processing file level ingest job tasks to the file level ingest job
178  * tasks executor.
179  */
181  fileLevelIngestJobTasksExecutor = Executors.newFixedThreadPool(numberOfFileIngestThreads, new ThreadFactoryBuilder().setNameFormat("IM-file-ingest-%d").build()); //NON-NLS
182  for (int i = 0; i < numberOfFileIngestThreads; ++i) {
183  threadId = nextIngestManagerTaskId.incrementAndGet();
184  fileLevelIngestJobTasksExecutor.submit(new ExecuteIngestJobTasksTask(threadId, IngestTasksScheduler.getInstance().getFileIngestTaskQueue()));
185  ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId));
186  }
187  }
188 
194  PropertyChangeListener propChangeListener = (PropertyChangeEvent evt) -> {
195  if (evt.getNewValue().equals(ServicesMonitor.ServiceStatus.DOWN.toString())) {
196  /*
197  * The application services considered to be key services are
198  * only necessary for multi-user cases.
199  */
200  try {
202  return;
203  }
204  } catch (NoCurrentCaseException noCaseOpenException) {
205  return;
206  }
207 
208  String serviceDisplayName = ServicesMonitor.Service.valueOf(evt.getPropertyName()).getDisplayName();
209  logger.log(Level.SEVERE, "Service {0} is down, cancelling all running ingest jobs", serviceDisplayName); //NON-NLS
211  EventQueue.invokeLater(new Runnable() {
212  @Override
213  public void run() {
214  JOptionPane.showMessageDialog(WindowManager.getDefault().getMainWindow(),
215  NbBundle.getMessage(this.getClass(), "IngestManager.cancellingIngest.msgDlg.text"),
216  NbBundle.getMessage(this.getClass(), "IngestManager.serviceIsDown.msgDlg.text", serviceDisplayName),
217  JOptionPane.ERROR_MESSAGE);
218  }
219  });
220  }
222  }
223  };
224 
225  /*
226  * The key services for multi-user cases are currently the case database
227  * server and the Solr server. The Solr server is a key service not
228  * because search is essential, but because the coordination service
229  * (ZooKeeper) is running embedded within the Solr server.
230  */
231  Set<String> servicesList = new HashSet<>();
232  servicesList.add(ServicesMonitor.Service.REMOTE_CASE_DATABASE.toString());
233  servicesList.add(ServicesMonitor.Service.REMOTE_KEYWORD_SEARCH.toString());
234  this.servicesMonitor.addSubscriber(servicesList, propChangeListener);
235  }
236 
241  private void subscribeToCaseEvents() {
242  Case.addEventTypeSubscriber(EnumSet.of(Case.Events.CURRENT_CASE), (PropertyChangeEvent event) -> {
243  if (event.getNewValue() != null) {
244  handleCaseOpened();
245  } else {
246  handleCaseClosed();
247  }
248  });
249  }
250 
251  /*
252  * Handles a current case opened event by clearing the ingest messages inbox
253  * and opening a remote event channel for the current case.
254  *
255  * Note that current case change events are published in a strictly
256  * serialized manner, i.e., one event at a time, synchronously.
257  */
258  void handleCaseOpened() {
259  caseIsOpen = true;
261  try {
262  Case openedCase = Case.getCurrentCaseThrows();
263  String channelPrefix = openedCase.getName();
264  if (Case.CaseType.MULTI_USER_CASE == openedCase.getCaseType()) {
265  jobEventPublisher.openRemoteEventChannel(String.format(INGEST_JOB_EVENT_CHANNEL_NAME, channelPrefix));
266  moduleEventPublisher.openRemoteEventChannel(String.format(INGEST_MODULE_EVENT_CHANNEL_NAME, channelPrefix));
267  }
268  } catch (NoCurrentCaseException | AutopsyEventException ex) {
269  logger.log(Level.SEVERE, "Failed to open remote events channel", ex); //NON-NLS
270  MessageNotifyUtil.Notify.error(NbBundle.getMessage(IngestManager.class, "IngestManager.OpenEventChannel.Fail.Title"),
271  NbBundle.getMessage(IngestManager.class, "IngestManager.OpenEventChannel.Fail.ErrMsg"));
272  }
273  }
274 
275  /*
276  * Handles a current case closed event by cancelling all ingest jobs for the
277  * case, closing the remote event channel for the case, and clearing the
278  * ingest messages inbox.
279  *
280  * Note that current case change events are published in a strictly
281  * serialized manner, i.e., one event at a time, synchronously.
282  */
283  void handleCaseClosed() {
284  /*
285  * TODO (JIRA-2227): IngestManager should wait for cancelled ingest jobs
286  * to complete when a case is closed.
287  */
288  this.cancelAllIngestJobs(IngestJob.CancellationReason.CASE_CLOSED);
291  caseIsOpen = false;
293  }
294 
306  public IngestStream openIngestStream(DataSource dataSource, IngestJobSettings settings) throws TskCoreException {
307  IngestJob job = new IngestJob(dataSource, IngestJob.Mode.STREAMING, settings);
308  IngestJobInputStream stream = new IngestJobInputStream(job);
309  if (stream.getIngestJobStartResult().getJob() != null) {
310  return stream;
311  } else if (stream.getIngestJobStartResult().getModuleErrors().isEmpty()) {
312  for (IngestModuleError error : stream.getIngestJobStartResult().getModuleErrors()) {
313  logger.log(Level.SEVERE, String.format("%s ingest module startup error for %s", error.getModuleDisplayName(), dataSource.getName()), error.getThrowable());
314  }
315  throw new TskCoreException("Error starting ingest modules");
316  } else {
317  throw new TskCoreException("Error starting ingest modules", stream.getIngestJobStartResult().getStartupException());
318  }
319  }
320 
329  }
330 
337  public void queueIngestJob(Collection<Content> dataSources, IngestJobSettings settings) {
338  if (caseIsOpen) {
339  IngestJob job = new IngestJob(dataSources, settings);
340  if (job.hasIngestPipeline()) {
341  long taskId = nextIngestManagerTaskId.incrementAndGet();
342  Future<Void> task = startIngestJobsExecutor.submit(new StartIngestJobTask(taskId, job));
343  synchronized (startIngestJobFutures) {
344  startIngestJobFutures.put(taskId, task);
345  }
346  }
347  }
348  }
349 
358  public void queueIngestJob(Content dataSource, List<AbstractFile> files, IngestJobSettings settings) {
359  if (caseIsOpen) {
360  IngestJob job = new IngestJob(dataSource, files, settings);
361  if (job.hasIngestPipeline()) {
362  long taskId = nextIngestManagerTaskId.incrementAndGet();
363  Future<Void> task = startIngestJobsExecutor.submit(new StartIngestJobTask(taskId, job));
364  synchronized (startIngestJobFutures) {
365  startIngestJobFutures.put(taskId, task);
366  }
367  }
368  }
369  }
370 
380  public IngestJobStartResult beginIngestJob(Collection<Content> dataSources, IngestJobSettings settings) {
381  if (caseIsOpen) {
382  IngestJob job = new IngestJob(dataSources, settings);
383  if (job.hasIngestPipeline()) {
384  return startIngestJob(job);
385  }
386  return new IngestJobStartResult(null, new IngestManagerException("No ingest pipeline created, likely due to no ingest modules being enabled"), null); //NON-NLS
387  }
388  return new IngestJobStartResult(null, new IngestManagerException("No case open"), null); //NON-NLS
389  }
390 
399  @NbBundle.Messages({
400  "IngestManager.startupErr.dlgTitle=Ingest Module Startup Failure",
401  "IngestManager.startupErr.dlgMsg=Unable to start up one or more ingest modules, ingest cancelled.",
402  "IngestManager.startupErr.dlgSolution=Please disable the failed modules or fix the errors before restarting ingest.",
403  "IngestManager.startupErr.dlgErrorList=Errors:"
404  })
405  IngestJobStartResult startIngestJob(IngestJob job) {
406 
407  // initialize IngestMessageInbox, if it hasn't been initialized yet. This can't be done in
408  // the constructor because that ends up freezing the UI on startup (JIRA-7345).
409  if (SwingUtilities.isEventDispatchThread()) {
410  initIngestMessageInbox();
411  } else {
412  try {
413  SwingUtilities.invokeAndWait(() -> initIngestMessageInbox());
414  } catch (InterruptedException ex) {
415  // ignore interruptions
416  } catch (InvocationTargetException ex) {
417  logger.log(Level.WARNING, "There was an error starting ingest message inbox", ex);
418  }
419  }
420 
421  List<IngestModuleError> errors = null;
422  Case openCase;
423  try {
424  openCase = Case.getCurrentCaseThrows();
425  } catch (NoCurrentCaseException ex) {
426  return new IngestJobStartResult(null, new IngestManagerException("Exception while getting open case.", ex), Collections.<IngestModuleError>emptyList()); //NON-NLS
427  }
428  if (openCase.getCaseType() == Case.CaseType.MULTI_USER_CASE) {
429  try {
430  if (!servicesMonitor.getServiceStatus(ServicesMonitor.Service.REMOTE_CASE_DATABASE.toString()).equals(ServicesMonitor.ServiceStatus.UP.toString())) {
431  if (RuntimeProperties.runningWithGUI()) {
432  EventQueue.invokeLater(new Runnable() {
433  @Override
434  public void run() {
435  String serviceDisplayName = ServicesMonitor.Service.REMOTE_CASE_DATABASE.getDisplayName();
436  JOptionPane.showMessageDialog(WindowManager.getDefault().getMainWindow(),
437  NbBundle.getMessage(this.getClass(), "IngestManager.cancellingIngest.msgDlg.text"),
438  NbBundle.getMessage(this.getClass(), "IngestManager.serviceIsDown.msgDlg.text", serviceDisplayName),
439  JOptionPane.ERROR_MESSAGE);
440  }
441  });
442  }
443  return new IngestJobStartResult(null, new IngestManagerException("Ingest aborted. Remote database is down"), Collections.<IngestModuleError>emptyList()); //NON-NLS
444  }
445  } catch (ServicesMonitor.ServicesMonitorException ex) {
446  return new IngestJobStartResult(null, new IngestManagerException("Database server is down", ex), Collections.<IngestModuleError>emptyList()); //NON-NLS
447  }
448  }
449 
450  if (!ingestMonitor.isRunning()) {
451  ingestMonitor.start();
452  }
453 
454  synchronized (ingestJobsById) {
455  ingestJobsById.put(job.getId(), job);
456  }
457  IngestManager.logger.log(Level.INFO, "Starting ingest job {0}", job.getId()); //NON-NLS
458  errors = job.start();
459  if (errors.isEmpty()) {
460  this.fireIngestJobStarted(job.getId());
461  } else {
462  synchronized (ingestJobsById) {
463  this.ingestJobsById.remove(job.getId());
464  }
465  for (IngestModuleError error : errors) {
466  logger.log(Level.SEVERE, String.format("Error starting %s ingest module for job %d", error.getModuleDisplayName(), job.getId()), error.getThrowable()); //NON-NLS
467  }
468  IngestManager.logger.log(Level.SEVERE, "Ingest job {0} could not be started", job.getId()); //NON-NLS
469  if (RuntimeProperties.runningWithGUI()) {
470  final StringBuilder message = new StringBuilder(1024);
471  message.append(Bundle.IngestManager_startupErr_dlgMsg()).append("\n"); //NON-NLS
472  message.append(Bundle.IngestManager_startupErr_dlgSolution()).append("\n\n"); //NON-NLS
473  message.append(Bundle.IngestManager_startupErr_dlgErrorList()).append("\n"); //NON-NLS
474  for (IngestModuleError error : errors) {
475  String moduleName = error.getModuleDisplayName();
476  String errorMessage = error.getThrowable().getLocalizedMessage();
477  message.append(moduleName).append(": ").append(errorMessage).append("\n"); //NON-NLS
478  }
479  message.append("\n\n");
480  EventQueue.invokeLater(() -> {
481  JOptionPane.showMessageDialog(WindowManager.getDefault().getMainWindow(), message, Bundle.IngestManager_startupErr_dlgTitle(), JOptionPane.ERROR_MESSAGE);
482  });
483  }
484  return new IngestJobStartResult(null, new IngestManagerException("Errors occurred while starting ingest"), errors); //NON-NLS
485  }
486 
487  return new IngestJobStartResult(job, null, errors);
488  }
489 
495  void finishIngestJob(IngestJob job) {
496  long jobId = job.getId();
497  synchronized (ingestJobsById) {
498  ingestJobsById.remove(jobId);
499  }
500  if (!job.isCancelled()) {
501  IngestManager.logger.log(Level.INFO, "Ingest job {0} completed", jobId); //NON-NLS
502  fireIngestJobCompleted(jobId);
503  } else {
504  IngestManager.logger.log(Level.INFO, "Ingest job {0} cancelled", jobId); //NON-NLS
505  fireIngestJobCancelled(jobId);
506  }
507  }
508 
515  public boolean isIngestRunning() {
516  synchronized (ingestJobsById) {
517  return !ingestJobsById.isEmpty();
518  }
519  }
520 
527  synchronized (startIngestJobFutures) {
528  startIngestJobFutures.values().forEach((handle) -> {
529  handle.cancel(true);
530  });
531  }
532  synchronized (ingestJobsById) {
533  this.ingestJobsById.values().forEach((job) -> {
534  job.cancel(reason);
535  });
536  }
537  }
538 
544  public void addIngestJobEventListener(final PropertyChangeListener listener) {
545  jobEventPublisher.addSubscriber(INGEST_JOB_EVENT_NAMES, listener);
546  }
547 
555  public void addIngestJobEventListener(Set<IngestJobEvent> eventTypes, final PropertyChangeListener listener) {
556  eventTypes.forEach((IngestJobEvent event) -> {
557  jobEventPublisher.addSubscriber(event.toString(), listener);
558  });
559  }
560 
566  public void removeIngestJobEventListener(final PropertyChangeListener listener) {
567  jobEventPublisher.removeSubscriber(INGEST_JOB_EVENT_NAMES, listener);
568  }
569 
576  public void removeIngestJobEventListener(Set<IngestJobEvent> eventTypes, final PropertyChangeListener listener) {
577  eventTypes.forEach((IngestJobEvent event) -> {
578  jobEventPublisher.removeSubscriber(event.toString(), listener);
579  });
580  }
581 
587  public void addIngestModuleEventListener(final PropertyChangeListener listener) {
588  moduleEventPublisher.addSubscriber(INGEST_MODULE_EVENT_NAMES, listener);
589  }
590 
598  public void addIngestModuleEventListener(Set<IngestModuleEvent> eventTypes, final PropertyChangeListener listener) {
599  eventTypes.forEach((IngestModuleEvent event) -> {
600  moduleEventPublisher.addSubscriber(event.toString(), listener);
601  });
602  }
603 
609  public void removeIngestModuleEventListener(final PropertyChangeListener listener) {
610  moduleEventPublisher.removeSubscriber(INGEST_MODULE_EVENT_NAMES, listener);
611  }
612 
619  public void removeIngestModuleEventListener(Set<IngestModuleEvent> eventTypes, final PropertyChangeListener listener) {
620  moduleEventPublisher.removeSubscriber(INGEST_MODULE_EVENT_NAMES, listener);
621  }
622 
628  void fireIngestJobStarted(long ingestJobId) {
629  AutopsyEvent event = new AutopsyEvent(IngestJobEvent.STARTED.toString(), ingestJobId, null);
630  eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
631  }
632 
638  void fireIngestJobCompleted(long ingestJobId) {
639  AutopsyEvent event = new AutopsyEvent(IngestJobEvent.COMPLETED.toString(), ingestJobId, null);
640  eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
641  }
642 
648  void fireIngestJobCancelled(long ingestJobId) {
649  AutopsyEvent event = new AutopsyEvent(IngestJobEvent.CANCELLED.toString(), ingestJobId, null);
650  eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
651  }
652 
661  void fireDataSourceAnalysisStarted(long ingestJobId, long dataSourceIngestJobId, Content dataSource) {
662  AutopsyEvent event = new DataSourceAnalysisStartedEvent(ingestJobId, dataSourceIngestJobId, dataSource);
663  eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
664  }
665 
674  void fireDataSourceAnalysisCompleted(long ingestJobId, long dataSourceIngestJobId, Content dataSource) {
675  AutopsyEvent event = new DataSourceAnalysisCompletedEvent(ingestJobId, dataSourceIngestJobId, dataSource, DataSourceAnalysisCompletedEvent.Reason.ANALYSIS_COMPLETED);
676  eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
677  }
678 
687  void fireDataSourceAnalysisCancelled(long ingestJobId, long dataSourceIngestJobId, Content dataSource) {
688  AutopsyEvent event = new DataSourceAnalysisCompletedEvent(ingestJobId, dataSourceIngestJobId, dataSource, DataSourceAnalysisCompletedEvent.Reason.ANALYSIS_CANCELLED);
689  eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
690  }
691 
698  void fireFileIngestDone(AbstractFile file) {
699  AutopsyEvent event = new FileAnalyzedEvent(file);
700  eventPublishingExecutor.submit(new PublishEventTask(event, moduleEventPublisher));
701  }
702 
710  void fireIngestModuleDataEvent(ModuleDataEvent moduleDataEvent) {
711  AutopsyEvent event = new BlackboardPostEvent(moduleDataEvent);
712  eventPublishingExecutor.submit(new PublishEventTask(event, moduleEventPublisher));
713  }
714 
722  void fireIngestModuleContentEvent(ModuleContentEvent moduleContentEvent) {
723  AutopsyEvent event = new ContentChangedEvent(moduleContentEvent);
724  eventPublishingExecutor.submit(new PublishEventTask(event, moduleEventPublisher));
725  }
726 
735  void initIngestMessageInbox() {
736  synchronized (this.ingestMessageBoxLock) {
737  ingestMessageBox = IngestMessageTopComponent.findInstance();
738  }
739  }
740 
746  void postIngestMessage(IngestMessage message) {
747  synchronized (this.ingestMessageBoxLock) {
748  if (ingestMessageBox != null && RuntimeProperties.runningWithGUI()) {
749  if (message.getMessageType() != IngestMessage.MessageType.ERROR && message.getMessageType() != IngestMessage.MessageType.WARNING) {
750  ingestMessageBox.displayMessage(message);
751  } else {
752  long errorPosts = ingestErrorMessagePosts.incrementAndGet();
753  if (errorPosts <= MAX_ERROR_MESSAGE_POSTS) {
754  ingestMessageBox.displayMessage(message);
755  } else if (errorPosts == MAX_ERROR_MESSAGE_POSTS + 1) {
756  IngestMessage errorMessageLimitReachedMessage = IngestMessage.createErrorMessage(
757  NbBundle.getMessage(this.getClass(), "IngestManager.IngestMessage.ErrorMessageLimitReached.title"),
758  NbBundle.getMessage(this.getClass(), "IngestManager.IngestMessage.ErrorMessageLimitReached.subject"),
759  NbBundle.getMessage(this.getClass(), "IngestManager.IngestMessage.ErrorMessageLimitReached.msg", MAX_ERROR_MESSAGE_POSTS));
760  ingestMessageBox.displayMessage(errorMessageLimitReachedMessage);
761  }
762  }
763  }
764  }
765  }
766 
767  /*
768  * Clears the ingest messages inbox.
769  */
770  private void clearIngestMessageBox() {
771  synchronized (this.ingestMessageBoxLock) {
772  if (null != ingestMessageBox) {
773  ingestMessageBox.clearMessages();
774  }
776  }
777  }
778 
787  void setIngestTaskProgress(DataSourceIngestTask task, String currentModuleName) {
788  IngestThreadActivitySnapshot prevSnap = ingestThreadActivitySnapshots.get(task.getThreadId());
789  IngestThreadActivitySnapshot newSnap = new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJobPipeline().getId(), currentModuleName, task.getDataSource());
790  ingestThreadActivitySnapshots.put(task.getThreadId(), newSnap);
791 
792  /*
793  * Update the total run time for the PREVIOUS ingest module in the
794  * pipeline, which has now finished its processing for the task.
795  */
796  incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
797  }
798 
807  void setIngestTaskProgress(FileIngestTask task, String currentModuleName) {
808  IngestThreadActivitySnapshot prevSnap = ingestThreadActivitySnapshots.get(task.getThreadId());
809  IngestThreadActivitySnapshot newSnap;
810  try {
811  newSnap = new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJobPipeline().getId(), currentModuleName, task.getDataSource(), task.getFile());
812  } catch (TskCoreException ex) {
813  logger.log(Level.SEVERE, "Error getting file from file ingest task", ex);
814  newSnap = new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJobPipeline().getId(), currentModuleName, task.getDataSource());
815  }
816  ingestThreadActivitySnapshots.put(task.getThreadId(), newSnap);
817 
818  /*
819  * Update the total run time for the PREVIOUS ingest module in the
820  * pipeline, which has now finished its processing for the task.
821  */
822  incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
823  }
824 
830  void setIngestTaskProgressCompleted(IngestTask task) {
831  IngestThreadActivitySnapshot prevSnap = ingestThreadActivitySnapshots.get(task.getThreadId());
832  IngestThreadActivitySnapshot newSnap = new IngestThreadActivitySnapshot(task.getThreadId());
833  ingestThreadActivitySnapshots.put(task.getThreadId(), newSnap);
834 
835  /*
836  * Update the total run time for the LAST ingest module in the pipeline,
837  * which has now finished its processing for the task.
838  */
839  incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
840  }
841 
848  void incrementModuleRunTime(String moduleDisplayName, Long duration) {
849  if (moduleDisplayName.equals("IDLE")) { //NON-NLS
850  return;
851  }
852 
853  synchronized (ingestModuleRunTimes) {
854  Long prevTimeL = ingestModuleRunTimes.get(moduleDisplayName);
855  long prevTime = 0;
856  if (prevTimeL != null) {
857  prevTime = prevTimeL;
858  }
859  prevTime += duration;
860  ingestModuleRunTimes.put(moduleDisplayName, prevTime);
861  }
862  }
863 
869  @Override
870  public Map<String, Long> getModuleRunTimes() {
871  synchronized (ingestModuleRunTimes) {
872  Map<String, Long> times = new HashMap<>(ingestModuleRunTimes);
873  return times;
874  }
875  }
876 
883  @Override
884  public List<IngestThreadActivitySnapshot> getIngestThreadActivitySnapshots() {
885  return new ArrayList<>(ingestThreadActivitySnapshots.values());
886  }
887 
893  @Override
894  public List<Snapshot> getIngestJobSnapshots() {
895  List<Snapshot> snapShots = new ArrayList<>();
896  synchronized (ingestJobsById) {
897  ingestJobsById.values().forEach((job) -> {
898  snapShots.addAll(job.getDataSourceIngestJobSnapshots());
899  });
900  }
901  return snapShots;
902  }
903 
910  long getFreeDiskSpace() {
911  if (ingestMonitor != null) {
912  return ingestMonitor.getFreeSpace();
913  } else {
914  return -1;
915  }
916  }
917 
921  private final class StartIngestJobTask implements Callable<Void> {
922 
923  private final long threadId;
924  private final IngestJob job;
925  private ProgressHandle progress;
926 
927  StartIngestJobTask(long threadId, IngestJob job) {
928  this.threadId = threadId;
929  this.job = job;
930  }
931 
932  @Override
933  public Void call() {
934  try {
935  if (Thread.currentThread().isInterrupted()) {
936  synchronized (ingestJobsById) {
937  ingestJobsById.remove(job.getId());
938  }
939  return null;
940  }
941 
943  final String displayName = NbBundle.getMessage(this.getClass(), "IngestManager.StartIngestJobsTask.run.displayName");
944  this.progress = ProgressHandle.createHandle(displayName, new Cancellable() {
945  @Override
946  public boolean cancel() {
947  if (progress != null) {
948  progress.setDisplayName(NbBundle.getMessage(this.getClass(), "IngestManager.StartIngestJobsTask.run.cancelling", displayName));
949  }
950  synchronized (startIngestJobFutures) {
951  Future<?> handle = startIngestJobFutures.remove(threadId);
952  handle.cancel(true);
953  }
954  return true;
955  }
956  });
957  progress.start();
958  }
959 
960  startIngestJob(job);
961  return null;
962 
963  } finally {
964  if (null != progress) {
965  progress.finish();
966  }
967  synchronized (startIngestJobFutures) {
968  startIngestJobFutures.remove(threadId);
969  }
970  }
971  }
972 
973  }
974 
978  private final class ExecuteIngestJobTasksTask implements Runnable {
979 
980  private final long threadId;
981  private final BlockingIngestTaskQueue tasks;
982 
983  ExecuteIngestJobTasksTask(long threadId, BlockingIngestTaskQueue tasks) {
984  this.threadId = threadId;
985  this.tasks = tasks;
986  }
987 
988  @Override
989  public void run() {
990  while (true) {
991  try {
992  IngestTask task = tasks.getNextTask(); // Blocks.
993  task.execute(threadId);
994  } catch (InterruptedException ex) {
995  break;
996  }
997  if (Thread.currentThread().isInterrupted()) {
998  break;
999  }
1000  }
1001  }
1002  }
1003 
1007  private static final class PublishEventTask implements Runnable {
1008 
1009  private final AutopsyEvent event;
1011 
1020  this.event = event;
1021  this.publisher = publisher;
1022  }
1023 
1024  @Override
1025  public void run() {
1026  publisher.publish(event);
1027  }
1028 
1029  }
1030 
1035  @Immutable
1036  public static final class IngestThreadActivitySnapshot implements Serializable {
1037 
1038  private static final long serialVersionUID = 1L;
1039 
1040  private final long threadId;
1041  private final Date startTime;
1042  private final String activity;
1043  private final String dataSourceName;
1044  private final String fileName;
1045  private final long jobId;
1046 
1054  IngestThreadActivitySnapshot(long threadId) {
1055  this.threadId = threadId;
1056  startTime = new Date();
1057  this.activity = NbBundle.getMessage(this.getClass(), "IngestManager.IngestThreadActivitySnapshot.idleThread");
1058  this.dataSourceName = "";
1059  this.fileName = "";
1060  this.jobId = 0;
1061  }
1062 
1073  IngestThreadActivitySnapshot(long threadId, long jobId, String activity, Content dataSource) {
1074  this.threadId = threadId;
1075  this.jobId = jobId;
1076  startTime = new Date();
1077  this.activity = activity;
1078  this.dataSourceName = dataSource.getName();
1079  this.fileName = "";
1080  }
1081 
1094  IngestThreadActivitySnapshot(long threadId, long jobId, String activity, Content dataSource, AbstractFile file) {
1095  this.threadId = threadId;
1096  this.jobId = jobId;
1097  startTime = new Date();
1098  this.activity = activity;
1099  this.dataSourceName = dataSource.getName();
1100  this.fileName = file.getName();
1101  }
1102 
1108  long getIngestJobId() {
1109  return jobId;
1110  }
1111 
1117  long getThreadId() {
1118  return threadId;
1119  }
1120 
1126  Date getStartTime() {
1127  return startTime;
1128  }
1129 
1135  String getActivity() {
1136  return activity;
1137  }
1138 
1146  String getDataSourceName() {
1147  return dataSourceName;
1148  }
1149 
1155  String getFileName() {
1156  return fileName;
1157  }
1158 
1159  }
1160 
1164  public enum IngestJobEvent {
1165 
1202  }
1203 
1207  public enum IngestModuleEvent {
1208 
1230  }
1231 
1235  public final static class IngestManagerException extends Exception {
1236 
1237  private static final long serialVersionUID = 1L;
1238 
1244  private IngestManagerException(String message) {
1245  super(message);
1246  }
1247 
1254  private IngestManagerException(String message, Throwable cause) {
1255  super(message, cause);
1256  }
1257  }
1258 
1267  @Deprecated
1268  public static void addPropertyChangeListener(final PropertyChangeListener listener) {
1271  }
1272 
1281  @Deprecated
1282  public static void removePropertyChangeListener(final PropertyChangeListener listener) {
1285  }
1286 
1297  @Deprecated
1298  public synchronized IngestJob startIngestJob(Collection<Content> dataSources, IngestJobSettings settings) {
1299  return beginIngestJob(dataSources, settings).getJob();
1300  }
1301 
1308  @Deprecated
1309  public void cancelAllIngestJobs() {
1311  }
1312 
1313 }
final ConcurrentHashMap< String, Long > ingestModuleRunTimes
void addIngestModuleEventListener(Set< IngestModuleEvent > eventTypes, final PropertyChangeListener listener)
final Map< Long, Future< Void > > startIngestJobFutures
void removeIngestModuleEventListener(final PropertyChangeListener listener)
IngestStream openIngestStream(DataSource dataSource, IngestJobSettings settings)
List< IngestThreadActivitySnapshot > getIngestThreadActivitySnapshots()
void queueIngestJob(Content dataSource, List< AbstractFile > files, IngestJobSettings settings)
static synchronized IngestManager getInstance()
void removeIngestModuleEventListener(Set< IngestModuleEvent > eventTypes, final PropertyChangeListener listener)
final ExecutorService dataSourceLevelIngestJobTasksExecutor
static void addPropertyChangeListener(final PropertyChangeListener listener)
IngestJobStartResult beginIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
void addSubscriber(Set< String > eventNames, PropertyChangeListener subscriber)
static void removePropertyChangeListener(final PropertyChangeListener listener)
static final Set< String > INGEST_MODULE_EVENT_NAMES
volatile IngestMessageTopComponent ingestMessageBox
void addSubscriber(PropertyChangeListener subscriber)
void removeIngestJobEventListener(final PropertyChangeListener listener)
void addIngestJobEventListener(Set< IngestJobEvent > eventTypes, final PropertyChangeListener listener)
static final Set< String > INGEST_JOB_EVENT_NAMES
final AutopsyEventPublisher moduleEventPublisher
synchronized void openRemoteEventChannel(String channelName)
void addIngestJobEventListener(final PropertyChangeListener listener)
void queueIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
void removeSubscriber(Set< String > eventNames, PropertyChangeListener subscriber)
void addIngestModuleEventListener(final PropertyChangeListener listener)
synchronized static Logger getLogger(String name)
Definition: Logger.java:124
static void addEventTypeSubscriber(Set< Events > eventTypes, PropertyChangeListener subscriber)
Definition: Case.java:676
synchronized IngestJob startIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
final ConcurrentHashMap< Long, IngestThreadActivitySnapshot > ingestThreadActivitySnapshots
void cancelAllIngestJobs(IngestJob.CancellationReason reason)
final ExecutorService fileLevelIngestJobTasksExecutor
final Map< Long, IngestJob > ingestJobsById
void removeIngestJobEventListener(Set< IngestJobEvent > eventTypes, final PropertyChangeListener listener)
final AutopsyEventPublisher jobEventPublisher

Copyright © 2012-2021 Basis Technology. Generated on: Thu Jul 8 2021
This work is licensed under a Creative Commons Attribution-Share Alike 3.0 United States License.