Autopsy  4.13.0
Graphical digital forensics platform for The Sleuth Kit and other tools.
IngestManager.java
Go to the documentation of this file.
1 /*
2  * Autopsy Forensic Browser
3  *
4  * Copyright 2012-2019 Basis Technology Corp.
5  * Contact: carrier <at> sleuthkit <dot> org
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  * http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19 package org.sleuthkit.autopsy.ingest;
20 
21 import com.google.common.util.concurrent.ThreadFactoryBuilder;
22 import java.awt.EventQueue;
23 import java.beans.PropertyChangeEvent;
24 import java.beans.PropertyChangeListener;
25 import java.io.Serializable;
26 import java.util.ArrayList;
27 import java.util.Collection;
28 import java.util.Collections;
29 import java.util.Date;
30 import java.util.EnumSet;
31 import java.util.HashMap;
32 import java.util.HashSet;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.Set;
36 import java.util.concurrent.Callable;
37 import java.util.concurrent.ConcurrentHashMap;
38 import java.util.concurrent.ExecutorService;
39 import java.util.concurrent.Executors;
40 import java.util.concurrent.Future;
41 import java.util.concurrent.atomic.AtomicLong;
42 import java.util.logging.Level;
43 import java.util.stream.Collectors;
44 import java.util.stream.Stream;
45 import javax.annotation.concurrent.GuardedBy;
46 import javax.annotation.concurrent.Immutable;
47 import javax.annotation.concurrent.ThreadSafe;
48 import javax.swing.JOptionPane;
49 import org.netbeans.api.progress.ProgressHandle;
50 import org.openide.util.Cancellable;
51 import org.openide.util.NbBundle;
52 import org.openide.windows.WindowManager;
68 import org.sleuthkit.datamodel.AbstractFile;
69 import org.sleuthkit.datamodel.Content;
70 
110 @ThreadSafe
112 
113  private final static Logger logger = Logger.getLogger(IngestManager.class.getName());
114  private final static String INGEST_JOB_EVENT_CHANNEL_NAME = "%s-Ingest-Job-Events"; //NON-NLS
115  private final static Set<String> INGEST_JOB_EVENT_NAMES = Stream.of(IngestJobEvent.values()).map(IngestJobEvent::toString).collect(Collectors.toSet());
116  private final static String INGEST_MODULE_EVENT_CHANNEL_NAME = "%s-Ingest-Module-Events"; //NON-NLS
117  private final static Set<String> INGEST_MODULE_EVENT_NAMES = Stream.of(IngestModuleEvent.values()).map(IngestModuleEvent::toString).collect(Collectors.toSet());
118  private final static int MAX_ERROR_MESSAGE_POSTS = 200;
119  @GuardedBy("IngestManager.class")
120  private static IngestManager instance;
121  private final int numberOfFileIngestThreads;
122  private final AtomicLong nextIngestManagerTaskId = new AtomicLong(0L);
123  private final ExecutorService startIngestJobsExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-start-ingest-jobs-%d").build()); //NON-NLS;
124  private final Map<Long, Future<Void>> startIngestJobFutures = new ConcurrentHashMap<>();
125  private final Map<Long, IngestJob> ingestJobsById = new HashMap<>();
126  private final ExecutorService dataSourceLevelIngestJobTasksExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-data-source-ingest-%d").build()); //NON-NLS;
127  private final ExecutorService fileLevelIngestJobTasksExecutor;
128  private final ExecutorService eventPublishingExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-ingest-events-%d").build()); //NON-NLS;
132  private final AutopsyEventPublisher moduleEventPublisher = new AutopsyEventPublisher();
133  private final Object ingestMessageBoxLock = new Object();
134  private final AtomicLong ingestErrorMessagePosts = new AtomicLong(0L);
135  private final ConcurrentHashMap<Long, IngestThreadActivitySnapshot> ingestThreadActivitySnapshots = new ConcurrentHashMap<>();
136  private final ConcurrentHashMap<String, Long> ingestModuleRunTimes = new ConcurrentHashMap<>();
137  private volatile IngestMessageTopComponent ingestMessageBox;
138  private volatile boolean caseIsOpen;
139 
146  public synchronized static IngestManager getInstance() {
147  if (null == instance) {
148  instance = new IngestManager();
149  instance.subscribeToServiceMonitorEvents();
150  instance.subscribeToCaseEvents();
151  }
152  return instance;
153  }
154 
159  private IngestManager() {
160  /*
161  * Submit a single Runnable ingest manager task for processing data
162  * source level ingest job tasks to the data source level ingest job
163  * tasks executor.
164  */
165  long threadId = nextIngestManagerTaskId.incrementAndGet();
166  dataSourceLevelIngestJobTasksExecutor.submit(new ExecuteIngestJobTasksTask(threadId, IngestTasksScheduler.getInstance().getDataSourceIngestTaskQueue()));
167  ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId));
168 
169  /*
170  * Submit a configurable number of Runnable ingest manager tasks for
171  * processing file level ingest job tasks to the file level ingest job
172  * tasks executor.
173  */
175  fileLevelIngestJobTasksExecutor = Executors.newFixedThreadPool(numberOfFileIngestThreads, new ThreadFactoryBuilder().setNameFormat("IM-file-ingest-%d").build()); //NON-NLS
176  for (int i = 0; i < numberOfFileIngestThreads; ++i) {
177  threadId = nextIngestManagerTaskId.incrementAndGet();
178  fileLevelIngestJobTasksExecutor.submit(new ExecuteIngestJobTasksTask(threadId, IngestTasksScheduler.getInstance().getFileIngestTaskQueue()));
179  ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId));
180  }
181  }
182 
188  PropertyChangeListener propChangeListener = (PropertyChangeEvent evt) -> {
189  if (evt.getNewValue().equals(ServicesMonitor.ServiceStatus.DOWN.toString())) {
190  /*
191  * The application services considered to be key services are
192  * only necessary for multi-user cases.
193  */
194  try {
196  return;
197  }
198  } catch (NoCurrentCaseException noCaseOpenException) {
199  return;
200  }
201 
202  String serviceDisplayName = ServicesMonitor.Service.valueOf(evt.getPropertyName()).getDisplayName();
203  logger.log(Level.SEVERE, "Service {0} is down, cancelling all running ingest jobs", serviceDisplayName); //NON-NLS
205  EventQueue.invokeLater(new Runnable() {
206  @Override
207  public void run() {
208  JOptionPane.showMessageDialog(WindowManager.getDefault().getMainWindow(),
209  NbBundle.getMessage(this.getClass(), "IngestManager.cancellingIngest.msgDlg.text"),
210  NbBundle.getMessage(this.getClass(), "IngestManager.serviceIsDown.msgDlg.text", serviceDisplayName),
211  JOptionPane.ERROR_MESSAGE);
212  }
213  });
214  }
216  }
217  };
218 
219  /*
220  * The key services for multi-user cases are currently the case database
221  * server and the Solr server. The Solr server is a key service not
222  * because search is essential, but because the coordination service
223  * (ZooKeeper) is running embedded within the Solr server.
224  */
225  Set<String> servicesList = new HashSet<>();
226  servicesList.add(ServicesMonitor.Service.REMOTE_CASE_DATABASE.toString());
227  servicesList.add(ServicesMonitor.Service.REMOTE_KEYWORD_SEARCH.toString());
228  this.servicesMonitor.addSubscriber(servicesList, propChangeListener);
229  }
230 
235  private void subscribeToCaseEvents() {
236  Case.addEventTypeSubscriber(EnumSet.of(Case.Events.CURRENT_CASE), (PropertyChangeEvent event) -> {
237  if (event.getNewValue() != null) {
238  handleCaseOpened();
239  } else {
240  handleCaseClosed();
241  }
242  });
243  }
244 
245  /*
246  * Handles a current case opened event by clearing the ingest messages inbox
247  * and opening a remote event channel for the current case.
248  *
249  * Note that current case change events are published in a strictly
250  * serialized manner, i.e., one event at a time, synchronously.
251  */
252  void handleCaseOpened() {
253  caseIsOpen = true;
255  try {
256  Case openedCase = Case.getCurrentCaseThrows();
257  String channelPrefix = openedCase.getName();
258  if (Case.CaseType.MULTI_USER_CASE == openedCase.getCaseType()) {
259  jobEventPublisher.openRemoteEventChannel(String.format(INGEST_JOB_EVENT_CHANNEL_NAME, channelPrefix));
260  moduleEventPublisher.openRemoteEventChannel(String.format(INGEST_MODULE_EVENT_CHANNEL_NAME, channelPrefix));
261  }
262  } catch (NoCurrentCaseException | AutopsyEventException ex) {
263  logger.log(Level.SEVERE, "Failed to open remote events channel", ex); //NON-NLS
264  MessageNotifyUtil.Notify.error(NbBundle.getMessage(IngestManager.class, "IngestManager.OpenEventChannel.Fail.Title"),
265  NbBundle.getMessage(IngestManager.class, "IngestManager.OpenEventChannel.Fail.ErrMsg"));
266  }
267  }
268 
269  /*
270  * Handles a current case closed event by cancelling all ingest jobs for the
271  * case, closing the remote event channel for the case, and clearing the
272  * ingest messages inbox.
273  *
274  * Note that current case change events are published in a strictly
275  * serialized manner, i.e., one event at a time, synchronously.
276  */
277  void handleCaseClosed() {
278  /*
279  * TODO (JIRA-2227): IngestManager should wait for cancelled ingest jobs
280  * to complete when a case is closed.
281  */
282  this.cancelAllIngestJobs(IngestJob.CancellationReason.CASE_CLOSED);
285  caseIsOpen = false;
287  }
288 
297  }
298 
305  public void queueIngestJob(Collection<Content> dataSources, IngestJobSettings settings) {
306  if (caseIsOpen) {
307  IngestJob job = new IngestJob(dataSources, settings, RuntimeProperties.runningWithGUI());
308  if (job.hasIngestPipeline()) {
309  long taskId = nextIngestManagerTaskId.incrementAndGet();
310  Future<Void> task = startIngestJobsExecutor.submit(new StartIngestJobTask(taskId, job));
311  startIngestJobFutures.put(taskId, task);
312  }
313  }
314  }
315 
324  public void queueIngestJob(Content dataSource, List<AbstractFile> files, IngestJobSettings settings) {
325  if (caseIsOpen) {
326  IngestJob job = new IngestJob(dataSource, files, settings, RuntimeProperties.runningWithGUI());
327  if (job.hasIngestPipeline()) {
328  long taskId = nextIngestManagerTaskId.incrementAndGet();
329  Future<Void> task = startIngestJobsExecutor.submit(new StartIngestJobTask(taskId, job));
330  startIngestJobFutures.put(taskId, task);
331  }
332  }
333  }
334 
344  public IngestJobStartResult beginIngestJob(Collection<Content> dataSources, IngestJobSettings settings) {
345  if (caseIsOpen) {
346  IngestJob job = new IngestJob(dataSources, settings, RuntimeProperties.runningWithGUI());
347  if (job.hasIngestPipeline()) {
348  return startIngestJob(job);
349  }
350  return new IngestJobStartResult(null, new IngestManagerException("No ingest pipeline created, likely due to no ingest modules being enabled"), null); //NON-NLS
351  }
352  return new IngestJobStartResult(null, new IngestManagerException("No case open"), null); //NON-NLS
353  }
354 
363  @NbBundle.Messages({
364  "IngestManager.startupErr.dlgTitle=Ingest Module Startup Failure",
365  "IngestManager.startupErr.dlgMsg=Unable to start up one or more ingest modules, ingest cancelled.",
366  "IngestManager.startupErr.dlgSolution=Please disable the failed modules or fix the errors before restarting ingest.",
367  "IngestManager.startupErr.dlgErrorList=Errors:"
368  })
370  List<IngestModuleError> errors = null;
371  Case openCase;
372  try {
373  openCase = Case.getCurrentCaseThrows();
374  } catch (NoCurrentCaseException ex) {
375  return new IngestJobStartResult(null, new IngestManagerException("Exception while getting open case.", ex), Collections.<IngestModuleError>emptyList()); //NON-NLS
376  }
377  if (openCase.getCaseType() == Case.CaseType.MULTI_USER_CASE) {
378  try {
381  EventQueue.invokeLater(new Runnable() {
382  @Override
383  public void run() {
384  String serviceDisplayName = ServicesMonitor.Service.REMOTE_CASE_DATABASE.getDisplayName();
385  JOptionPane.showMessageDialog(WindowManager.getDefault().getMainWindow(),
386  NbBundle.getMessage(this.getClass(), "IngestManager.cancellingIngest.msgDlg.text"),
387  NbBundle.getMessage(this.getClass(), "IngestManager.serviceIsDown.msgDlg.text", serviceDisplayName),
388  JOptionPane.ERROR_MESSAGE);
389  }
390  });
391  }
392  return new IngestJobStartResult(null, new IngestManagerException("Ingest aborted. Remote database is down"), Collections.<IngestModuleError>emptyList()); //NON-NLS
393  }
395  return new IngestJobStartResult(null, new IngestManagerException("Database server is down", ex), Collections.<IngestModuleError>emptyList()); //NON-NLS
396  }
397  }
398 
399  if (!ingestMonitor.isRunning()) {
400  ingestMonitor.start();
401  }
402 
403  synchronized (ingestJobsById) {
404  ingestJobsById.put(job.getId(), job);
405  }
406  IngestManager.logger.log(Level.INFO, "Starting ingest job {0}", job.getId()); //NON-NLS
407  errors = job.start();
408  if (errors.isEmpty()) {
409  this.fireIngestJobStarted(job.getId());
410  } else {
411  synchronized (ingestJobsById) {
412  this.ingestJobsById.remove(job.getId());
413  }
414  for (IngestModuleError error : errors) {
415  logger.log(Level.SEVERE, String.format("Error starting %s ingest module for job %d", error.getModuleDisplayName(), job.getId()), error.getThrowable()); //NON-NLS
416  }
417  IngestManager.logger.log(Level.SEVERE, "Ingest job {0} could not be started", job.getId()); //NON-NLS
419  final StringBuilder message = new StringBuilder(1024);
420  message.append(Bundle.IngestManager_startupErr_dlgMsg()).append("\n"); //NON-NLS
421  message.append(Bundle.IngestManager_startupErr_dlgSolution()).append("\n\n"); //NON-NLS
422  message.append(Bundle.IngestManager_startupErr_dlgErrorList()).append("\n"); //NON-NLS
423  for (IngestModuleError error : errors) {
424  String moduleName = error.getModuleDisplayName();
425  String errorMessage = error.getThrowable().getLocalizedMessage();
426  message.append(moduleName).append(": ").append(errorMessage).append("\n"); //NON-NLS
427  }
428  message.append("\n\n");
429  EventQueue.invokeLater(() -> {
430  JOptionPane.showMessageDialog(WindowManager.getDefault().getMainWindow(), message, Bundle.IngestManager_startupErr_dlgTitle(), JOptionPane.ERROR_MESSAGE);
431  });
432  }
433  return new IngestJobStartResult(null, new IngestManagerException("Errors occurred while starting ingest"), errors); //NON-NLS
434  }
435 
436  return new IngestJobStartResult(job, null, errors);
437  }
438 
444  void finishIngestJob(IngestJob job) {
445  long jobId = job.getId();
446  synchronized (ingestJobsById) {
447  ingestJobsById.remove(jobId);
448  }
449  if (!job.isCancelled()) {
450  IngestManager.logger.log(Level.INFO, "Ingest job {0} completed", jobId); //NON-NLS
451  fireIngestJobCompleted(jobId);
452  } else {
453  IngestManager.logger.log(Level.INFO, "Ingest job {0} cancelled", jobId); //NON-NLS
454  fireIngestJobCancelled(jobId);
455  }
456  }
457 
464  public boolean isIngestRunning() {
465  synchronized (ingestJobsById) {
466  return !ingestJobsById.isEmpty();
467  }
468  }
469 
476  startIngestJobFutures.values().forEach((handle) -> {
477  handle.cancel(true);
478  });
479  synchronized (ingestJobsById) {
480  this.ingestJobsById.values().forEach((job) -> {
481  job.cancel(reason);
482  });
483  }
484  }
485 
491  public void addIngestJobEventListener(final PropertyChangeListener listener) {
492  jobEventPublisher.addSubscriber(INGEST_JOB_EVENT_NAMES, listener);
493  }
494 
501  public void addIngestJobEventListener(Set<IngestJobEvent> eventTypes, final PropertyChangeListener listener) {
502  eventTypes.forEach((IngestJobEvent event) -> {
503  jobEventPublisher.addSubscriber(event.toString(), listener);
504  });
505  }
506 
512  public void removeIngestJobEventListener(final PropertyChangeListener listener) {
513  jobEventPublisher.removeSubscriber(INGEST_JOB_EVENT_NAMES, listener);
514  }
515 
522  public void removeIngestJobEventListener(Set<IngestJobEvent> eventTypes, final PropertyChangeListener listener) {
523  eventTypes.forEach((IngestJobEvent event) -> {
524  jobEventPublisher.removeSubscriber(event.toString(), listener);
525  });
526  }
527 
533  public void addIngestModuleEventListener(final PropertyChangeListener listener) {
534  moduleEventPublisher.addSubscriber(INGEST_MODULE_EVENT_NAMES, listener);
535  }
536 
543  public void addIngestModuleEventListener(Set<IngestModuleEvent> eventTypes, final PropertyChangeListener listener) {
544  eventTypes.forEach((IngestModuleEvent event) -> {
545  moduleEventPublisher.addSubscriber(event.toString(), listener);
546  });
547  }
548 
554  public void removeIngestModuleEventListener(final PropertyChangeListener listener) {
555  moduleEventPublisher.removeSubscriber(INGEST_MODULE_EVENT_NAMES, listener);
556  }
557 
564  public void removeIngestModuleEventListener(Set<IngestModuleEvent> eventTypes, final PropertyChangeListener listener) {
565  moduleEventPublisher.removeSubscriber(INGEST_MODULE_EVENT_NAMES, listener);
566  }
567 
573  void fireIngestJobStarted(long ingestJobId) {
574  AutopsyEvent event = new AutopsyEvent(IngestJobEvent.STARTED.toString(), ingestJobId, null);
575  eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
576  }
577 
583  void fireIngestJobCompleted(long ingestJobId) {
584  AutopsyEvent event = new AutopsyEvent(IngestJobEvent.COMPLETED.toString(), ingestJobId, null);
585  eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
586  }
587 
593  void fireIngestJobCancelled(long ingestJobId) {
594  AutopsyEvent event = new AutopsyEvent(IngestJobEvent.CANCELLED.toString(), ingestJobId, null);
595  eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
596  }
597 
606  void fireDataSourceAnalysisStarted(long ingestJobId, long dataSourceIngestJobId, Content dataSource) {
607  AutopsyEvent event = new DataSourceAnalysisStartedEvent(ingestJobId, dataSourceIngestJobId, dataSource);
608  eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
609  }
610 
619  void fireDataSourceAnalysisCompleted(long ingestJobId, long dataSourceIngestJobId, Content dataSource) {
620  AutopsyEvent event = new DataSourceAnalysisCompletedEvent(ingestJobId, dataSourceIngestJobId, dataSource, DataSourceAnalysisCompletedEvent.Reason.ANALYSIS_COMPLETED);
621  eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
622  }
623 
632  void fireDataSourceAnalysisCancelled(long ingestJobId, long dataSourceIngestJobId, Content dataSource) {
633  AutopsyEvent event = new DataSourceAnalysisCompletedEvent(ingestJobId, dataSourceIngestJobId, dataSource, DataSourceAnalysisCompletedEvent.Reason.ANALYSIS_CANCELLED);
634  eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
635  }
636 
643  void fireFileIngestDone(AbstractFile file) {
644  AutopsyEvent event = new FileAnalyzedEvent(file);
645  eventPublishingExecutor.submit(new PublishEventTask(event, moduleEventPublisher));
646  }
647 
655  void fireIngestModuleDataEvent(ModuleDataEvent moduleDataEvent) {
656  AutopsyEvent event = new BlackboardPostEvent(moduleDataEvent);
657  eventPublishingExecutor.submit(new PublishEventTask(event, moduleEventPublisher));
658  }
659 
667  void fireIngestModuleContentEvent(ModuleContentEvent moduleContentEvent) {
668  AutopsyEvent event = new ContentChangedEvent(moduleContentEvent);
669  eventPublishingExecutor.submit(new PublishEventTask(event, moduleEventPublisher));
670  }
671 
677  void initIngestMessageInbox() {
678  synchronized (this.ingestMessageBoxLock) {
679  ingestMessageBox = IngestMessageTopComponent.findInstance();
680  }
681  }
682 
688  void postIngestMessage(IngestMessage message) {
689  synchronized (this.ingestMessageBoxLock) {
690  if (ingestMessageBox != null && RuntimeProperties.runningWithGUI()) {
691  if (message.getMessageType() != IngestMessage.MessageType.ERROR && message.getMessageType() != IngestMessage.MessageType.WARNING) {
692  ingestMessageBox.displayMessage(message);
693  } else {
694  long errorPosts = ingestErrorMessagePosts.incrementAndGet();
695  if (errorPosts <= MAX_ERROR_MESSAGE_POSTS) {
696  ingestMessageBox.displayMessage(message);
697  } else if (errorPosts == MAX_ERROR_MESSAGE_POSTS + 1) {
698  IngestMessage errorMessageLimitReachedMessage = IngestMessage.createErrorMessage(
699  NbBundle.getMessage(this.getClass(), "IngestManager.IngestMessage.ErrorMessageLimitReached.title"),
700  NbBundle.getMessage(this.getClass(), "IngestManager.IngestMessage.ErrorMessageLimitReached.subject"),
701  NbBundle.getMessage(this.getClass(), "IngestManager.IngestMessage.ErrorMessageLimitReached.msg", MAX_ERROR_MESSAGE_POSTS));
702  ingestMessageBox.displayMessage(errorMessageLimitReachedMessage);
703  }
704  }
705  }
706  }
707  }
708 
709  /*
710  * Clears the ingest messages inbox.
711  */
712  private void clearIngestMessageBox() {
713  synchronized (this.ingestMessageBoxLock) {
714  if (null != ingestMessageBox) {
715  ingestMessageBox.clearMessages();
716  }
718  }
719  }
720 
732  void setIngestTaskProgress(DataSourceIngestTask task, String ingestModuleDisplayName) {
733  ingestThreadActivitySnapshots.put(task.getThreadId(), new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJob().getId(), ingestModuleDisplayName, task.getDataSource()));
734  }
735 
747  void setIngestTaskProgress(FileIngestTask task, String ingestModuleDisplayName) {
748  IngestThreadActivitySnapshot prevSnap = ingestThreadActivitySnapshots.get(task.getThreadId());
749  IngestThreadActivitySnapshot newSnap = new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJob().getId(), ingestModuleDisplayName, task.getDataSource(), task.getFile());
750  ingestThreadActivitySnapshots.put(task.getThreadId(), newSnap);
751  incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
752  }
753 
761  void setIngestTaskProgressCompleted(DataSourceIngestTask task) {
762  ingestThreadActivitySnapshots.put(task.getThreadId(), new IngestThreadActivitySnapshot(task.getThreadId()));
763  }
764 
772  void setIngestTaskProgressCompleted(FileIngestTask task) {
773  IngestThreadActivitySnapshot prevSnap = ingestThreadActivitySnapshots.get(task.getThreadId());
774  IngestThreadActivitySnapshot newSnap = new IngestThreadActivitySnapshot(task.getThreadId());
775  ingestThreadActivitySnapshots.put(task.getThreadId(), newSnap);
776  incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
777  }
778 
785  private void incrementModuleRunTime(String moduleDisplayName, Long duration) {
786  if (moduleDisplayName.equals("IDLE")) { //NON-NLS
787  return;
788  }
789 
790  synchronized (ingestModuleRunTimes) {
791  Long prevTimeL = ingestModuleRunTimes.get(moduleDisplayName);
792  long prevTime = 0;
793  if (prevTimeL != null) {
794  prevTime = prevTimeL;
795  }
796  prevTime += duration;
797  ingestModuleRunTimes.put(moduleDisplayName, prevTime);
798  }
799  }
800 
806  @Override
807  public Map<String, Long> getModuleRunTimes() {
808  synchronized (ingestModuleRunTimes) {
809  Map<String, Long> times = new HashMap<>(ingestModuleRunTimes);
810  return times;
811  }
812  }
813 
820  @Override
821  public List<IngestThreadActivitySnapshot> getIngestThreadActivitySnapshots() {
822  return new ArrayList<>(ingestThreadActivitySnapshots.values());
823  }
824 
830  @Override
832  List<DataSourceIngestJob.Snapshot> snapShots = new ArrayList<>();
833  synchronized (ingestJobsById) {
834  ingestJobsById.values().forEach((job) -> {
835  snapShots.addAll(job.getDataSourceIngestJobSnapshots());
836  });
837  }
838  return snapShots;
839  }
840 
847  long getFreeDiskSpace() {
848  if (ingestMonitor != null) {
849  return ingestMonitor.getFreeSpace();
850  } else {
851  return -1;
852  }
853  }
854 
858  private final class StartIngestJobTask implements Callable<Void> {
859 
860  private final long threadId;
861  private final IngestJob job;
862  private ProgressHandle progress;
863 
864  StartIngestJobTask(long threadId, IngestJob job) {
865  this.threadId = threadId;
866  this.job = job;
867  }
868 
869  @Override
870  public Void call() {
871  try {
872  if (Thread.currentThread().isInterrupted()) {
873  synchronized (ingestJobsById) {
874  ingestJobsById.remove(job.getId());
875  }
876  return null;
877  }
878 
880  final String displayName = NbBundle.getMessage(this.getClass(), "IngestManager.StartIngestJobsTask.run.displayName");
881  this.progress = ProgressHandle.createHandle(displayName, new Cancellable() {
882  @Override
883  public boolean cancel() {
884  if (progress != null) {
885  progress.setDisplayName(NbBundle.getMessage(this.getClass(), "IngestManager.StartIngestJobsTask.run.cancelling", displayName));
886  }
887  Future<?> handle = startIngestJobFutures.remove(threadId);
888  handle.cancel(true);
889  return true;
890  }
891  });
892  progress.start();
893  }
894 
895  startIngestJob(job);
896  return null;
897 
898  } finally {
899  if (null != progress) {
900  progress.finish();
901  }
902  startIngestJobFutures.remove(threadId);
903  }
904  }
905 
906  }
907 
911  private final class ExecuteIngestJobTasksTask implements Runnable {
912 
913  private final long threadId;
914  private final BlockingIngestTaskQueue tasks;
915 
916  ExecuteIngestJobTasksTask(long threadId, BlockingIngestTaskQueue tasks) {
917  this.threadId = threadId;
918  this.tasks = tasks;
919  }
920 
921  @Override
922  public void run() {
923  while (true) {
924  try {
925  IngestTask task = tasks.getNextTask(); // Blocks.
926  task.execute(threadId);
927  } catch (InterruptedException ex) {
928  break;
929  }
930  if (Thread.currentThread().isInterrupted()) {
931  break;
932  }
933  }
934  }
935  }
936 
940  private static final class PublishEventTask implements Runnable {
941 
942  private final AutopsyEvent event;
944 
953  this.event = event;
954  this.publisher = publisher;
955  }
956 
957  @Override
958  public void run() {
959  publisher.publish(event);
960  }
961 
962  }
963 
968  @Immutable
969  public static final class IngestThreadActivitySnapshot implements Serializable {
970 
971  private static final long serialVersionUID = 1L;
972 
973  private final long threadId;
974  private final Date startTime;
975  private final String activity;
976  private final String dataSourceName;
977  private final String fileName;
978  private final long jobId;
979 
987  IngestThreadActivitySnapshot(long threadId) {
988  this.threadId = threadId;
989  startTime = new Date();
990  this.activity = NbBundle.getMessage(this.getClass(), "IngestManager.IngestThreadActivitySnapshot.idleThread");
991  this.dataSourceName = "";
992  this.fileName = "";
993  this.jobId = 0;
994  }
995 
1006  IngestThreadActivitySnapshot(long threadId, long jobId, String activity, Content dataSource) {
1007  this.threadId = threadId;
1008  this.jobId = jobId;
1009  startTime = new Date();
1010  this.activity = activity;
1011  this.dataSourceName = dataSource.getName();
1012  this.fileName = "";
1013  }
1014 
1027  IngestThreadActivitySnapshot(long threadId, long jobId, String activity, Content dataSource, AbstractFile file) {
1028  this.threadId = threadId;
1029  this.jobId = jobId;
1030  startTime = new Date();
1031  this.activity = activity;
1032  this.dataSourceName = dataSource.getName();
1033  this.fileName = file.getName();
1034  }
1035 
1041  long getIngestJobId() {
1042  return jobId;
1043  }
1044 
1050  long getThreadId() {
1051  return threadId;
1052  }
1053 
1059  Date getStartTime() {
1060  return startTime;
1061  }
1062 
1068  String getActivity() {
1069  return activity;
1070  }
1071 
1079  String getDataSourceName() {
1080  return dataSourceName;
1081  }
1082 
1088  String getFileName() {
1089  return fileName;
1090  }
1091 
1092  }
1093 
1097  public enum IngestJobEvent {
1098 
1135  }
1136 
1140  public enum IngestModuleEvent {
1141 
1163  }
1164 
1168  public final static class IngestManagerException extends Exception {
1169 
1170  private static final long serialVersionUID = 1L;
1171 
1177  private IngestManagerException(String message) {
1178  super(message);
1179  }
1180 
1187  private IngestManagerException(String message, Throwable cause) {
1188  super(message, cause);
1189  }
1190  }
1191 
1200  @Deprecated
1201  public static void addPropertyChangeListener(final PropertyChangeListener listener) {
1204  }
1205 
1214  @Deprecated
1215  public static void removePropertyChangeListener(final PropertyChangeListener listener) {
1218  }
1219 
1230  @Deprecated
1231  public synchronized IngestJob startIngestJob(Collection<Content> dataSources, IngestJobSettings settings) {
1232  return beginIngestJob(dataSources, settings).getJob();
1233  }
1234 
1241  @Deprecated
1242  public void cancelAllIngestJobs() {
1244  }
1245 
1246 }
final ConcurrentHashMap< String, Long > ingestModuleRunTimes
void addIngestModuleEventListener(Set< IngestModuleEvent > eventTypes, final PropertyChangeListener listener)
final Map< Long, Future< Void > > startIngestJobFutures
void removeIngestModuleEventListener(final PropertyChangeListener listener)
List< IngestThreadActivitySnapshot > getIngestThreadActivitySnapshots()
void queueIngestJob(Content dataSource, List< AbstractFile > files, IngestJobSettings settings)
static synchronized IngestManager getInstance()
IngestJobStartResult startIngestJob(IngestJob job)
void removeIngestModuleEventListener(Set< IngestModuleEvent > eventTypes, final PropertyChangeListener listener)
final ExecutorService dataSourceLevelIngestJobTasksExecutor
static void addPropertyChangeListener(final PropertyChangeListener listener)
IngestJobStartResult beginIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
void addSubscriber(Set< String > eventNames, PropertyChangeListener subscriber)
static void removePropertyChangeListener(final PropertyChangeListener listener)
static final Set< String > INGEST_MODULE_EVENT_NAMES
volatile IngestMessageTopComponent ingestMessageBox
void addSubscriber(PropertyChangeListener subscriber)
void removeIngestJobEventListener(final PropertyChangeListener listener)
void addIngestJobEventListener(Set< IngestJobEvent > eventTypes, final PropertyChangeListener listener)
static final Set< String > INGEST_JOB_EVENT_NAMES
final AutopsyEventPublisher moduleEventPublisher
List< DataSourceIngestJob.Snapshot > getIngestJobSnapshots()
synchronized void openRemoteEventChannel(String channelName)
void addIngestJobEventListener(final PropertyChangeListener listener)
void queueIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
void removeSubscriber(Set< String > eventNames, PropertyChangeListener subscriber)
void addIngestModuleEventListener(final PropertyChangeListener listener)
synchronized static Logger getLogger(String name)
Definition: Logger.java:124
void incrementModuleRunTime(String moduleDisplayName, Long duration)
static void addEventTypeSubscriber(Set< Events > eventTypes, PropertyChangeListener subscriber)
Definition: Case.java:486
synchronized IngestJob startIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
final ConcurrentHashMap< Long, IngestThreadActivitySnapshot > ingestThreadActivitySnapshots
void cancelAllIngestJobs(IngestJob.CancellationReason reason)
final ExecutorService fileLevelIngestJobTasksExecutor
final Map< Long, IngestJob > ingestJobsById
void removeIngestJobEventListener(Set< IngestJobEvent > eventTypes, final PropertyChangeListener listener)
final AutopsyEventPublisher jobEventPublisher

Copyright © 2012-2019 Basis Technology. Generated on: Tue Jan 7 2020
This work is licensed under a Creative Commons Attribution-Share Alike 3.0 United States License.