Autopsy  4.17.0
Graphical digital forensics platform for The Sleuth Kit and other tools.
IngestManager.java
Go to the documentation of this file.
1 /*
2  * Autopsy Forensic Browser
3  *
4  * Copyright 2012-2019 Basis Technology Corp.
5  * Contact: carrier <at> sleuthkit <dot> org
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  * http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19 package org.sleuthkit.autopsy.ingest;
20 
21 import com.google.common.util.concurrent.ThreadFactoryBuilder;
22 import java.awt.EventQueue;
23 import java.beans.PropertyChangeEvent;
24 import java.beans.PropertyChangeListener;
25 import java.io.Serializable;
26 import java.util.ArrayList;
27 import java.util.Collection;
28 import java.util.Collections;
29 import java.util.Date;
30 import java.util.EnumSet;
31 import java.util.HashMap;
32 import java.util.HashSet;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.Set;
36 import java.util.concurrent.Callable;
37 import java.util.concurrent.ConcurrentHashMap;
38 import java.util.concurrent.ExecutorService;
39 import java.util.concurrent.Executors;
40 import java.util.concurrent.Future;
41 import java.util.concurrent.atomic.AtomicLong;
42 import java.util.logging.Level;
43 import java.util.stream.Collectors;
44 import java.util.stream.Stream;
45 import javax.annotation.concurrent.GuardedBy;
46 import javax.annotation.concurrent.Immutable;
47 import javax.annotation.concurrent.ThreadSafe;
48 import javax.swing.JOptionPane;
49 import org.netbeans.api.progress.ProgressHandle;
50 import org.openide.util.Cancellable;
51 import org.openide.util.NbBundle;
52 import org.openide.windows.WindowManager;
68 import org.sleuthkit.datamodel.AbstractFile;
69 import org.sleuthkit.datamodel.Content;
70 import org.sleuthkit.datamodel.DataSource;
71 import org.sleuthkit.datamodel.TskCoreException;
72 
112 @ThreadSafe
114 
115  private final static Logger logger = Logger.getLogger(IngestManager.class.getName());
116  private final static String INGEST_JOB_EVENT_CHANNEL_NAME = "%s-Ingest-Job-Events"; //NON-NLS
117  private final static Set<String> INGEST_JOB_EVENT_NAMES = Stream.of(IngestJobEvent.values()).map(IngestJobEvent::toString).collect(Collectors.toSet());
118  private final static String INGEST_MODULE_EVENT_CHANNEL_NAME = "%s-Ingest-Module-Events"; //NON-NLS
119  private final static Set<String> INGEST_MODULE_EVENT_NAMES = Stream.of(IngestModuleEvent.values()).map(IngestModuleEvent::toString).collect(Collectors.toSet());
120  private final static int MAX_ERROR_MESSAGE_POSTS = 200;
121  @GuardedBy("IngestManager.class")
122  private static IngestManager instance;
123  private final int numberOfFileIngestThreads;
124  private final AtomicLong nextIngestManagerTaskId = new AtomicLong(0L);
125  private final ExecutorService startIngestJobsExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-start-ingest-jobs-%d").build()); //NON-NLS;
126  private final Map<Long, Future<Void>> startIngestJobFutures = new ConcurrentHashMap<>();
127  private final Map<Long, IngestJob> ingestJobsById = new HashMap<>();
128  private final ExecutorService dataSourceLevelIngestJobTasksExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-data-source-ingest-%d").build()); //NON-NLS;
129  private final ExecutorService fileLevelIngestJobTasksExecutor;
130  private final ExecutorService eventPublishingExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-ingest-events-%d").build()); //NON-NLS;
134  private final AutopsyEventPublisher moduleEventPublisher = new AutopsyEventPublisher();
135  private final Object ingestMessageBoxLock = new Object();
136  private final AtomicLong ingestErrorMessagePosts = new AtomicLong(0L);
137  private final ConcurrentHashMap<Long, IngestThreadActivitySnapshot> ingestThreadActivitySnapshots = new ConcurrentHashMap<>();
138  private final ConcurrentHashMap<String, Long> ingestModuleRunTimes = new ConcurrentHashMap<>();
139  private volatile IngestMessageTopComponent ingestMessageBox;
140  private volatile boolean caseIsOpen;
141 
148  public synchronized static IngestManager getInstance() {
149  if (null == instance) {
150  instance = new IngestManager();
151  instance.subscribeToServiceMonitorEvents();
152  instance.subscribeToCaseEvents();
153  }
154  return instance;
155  }
156 
161  private IngestManager() {
162  /*
163  * Submit a single Runnable ingest manager task for processing data
164  * source level ingest job tasks to the data source level ingest job
165  * tasks executor.
166  */
167  long threadId = nextIngestManagerTaskId.incrementAndGet();
168  dataSourceLevelIngestJobTasksExecutor.submit(new ExecuteIngestJobTasksTask(threadId, IngestTasksScheduler.getInstance().getDataSourceIngestTaskQueue()));
169  ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId));
170 
171  /*
172  * Submit a configurable number of Runnable ingest manager tasks for
173  * processing file level ingest job tasks to the file level ingest job
174  * tasks executor.
175  */
177  fileLevelIngestJobTasksExecutor = Executors.newFixedThreadPool(numberOfFileIngestThreads, new ThreadFactoryBuilder().setNameFormat("IM-file-ingest-%d").build()); //NON-NLS
178  for (int i = 0; i < numberOfFileIngestThreads; ++i) {
179  threadId = nextIngestManagerTaskId.incrementAndGet();
180  fileLevelIngestJobTasksExecutor.submit(new ExecuteIngestJobTasksTask(threadId, IngestTasksScheduler.getInstance().getFileIngestTaskQueue()));
181  ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId));
182  }
183  }
184 
190  PropertyChangeListener propChangeListener = (PropertyChangeEvent evt) -> {
191  if (evt.getNewValue().equals(ServicesMonitor.ServiceStatus.DOWN.toString())) {
192  /*
193  * The application services considered to be key services are
194  * only necessary for multi-user cases.
195  */
196  try {
198  return;
199  }
200  } catch (NoCurrentCaseException noCaseOpenException) {
201  return;
202  }
203 
204  String serviceDisplayName = ServicesMonitor.Service.valueOf(evt.getPropertyName()).getDisplayName();
205  logger.log(Level.SEVERE, "Service {0} is down, cancelling all running ingest jobs", serviceDisplayName); //NON-NLS
207  EventQueue.invokeLater(new Runnable() {
208  @Override
209  public void run() {
210  JOptionPane.showMessageDialog(WindowManager.getDefault().getMainWindow(),
211  NbBundle.getMessage(this.getClass(), "IngestManager.cancellingIngest.msgDlg.text"),
212  NbBundle.getMessage(this.getClass(), "IngestManager.serviceIsDown.msgDlg.text", serviceDisplayName),
213  JOptionPane.ERROR_MESSAGE);
214  }
215  });
216  }
218  }
219  };
220 
221  /*
222  * The key services for multi-user cases are currently the case database
223  * server and the Solr server. The Solr server is a key service not
224  * because search is essential, but because the coordination service
225  * (ZooKeeper) is running embedded within the Solr server.
226  */
227  Set<String> servicesList = new HashSet<>();
228  servicesList.add(ServicesMonitor.Service.REMOTE_CASE_DATABASE.toString());
229  servicesList.add(ServicesMonitor.Service.REMOTE_KEYWORD_SEARCH.toString());
230  this.servicesMonitor.addSubscriber(servicesList, propChangeListener);
231  }
232 
237  private void subscribeToCaseEvents() {
238  Case.addEventTypeSubscriber(EnumSet.of(Case.Events.CURRENT_CASE), (PropertyChangeEvent event) -> {
239  if (event.getNewValue() != null) {
240  handleCaseOpened();
241  } else {
242  handleCaseClosed();
243  }
244  });
245  }
246 
247  /*
248  * Handles a current case opened event by clearing the ingest messages inbox
249  * and opening a remote event channel for the current case.
250  *
251  * Note that current case change events are published in a strictly
252  * serialized manner, i.e., one event at a time, synchronously.
253  */
254  void handleCaseOpened() {
255  caseIsOpen = true;
257  try {
258  Case openedCase = Case.getCurrentCaseThrows();
259  String channelPrefix = openedCase.getName();
260  if (Case.CaseType.MULTI_USER_CASE == openedCase.getCaseType()) {
261  jobEventPublisher.openRemoteEventChannel(String.format(INGEST_JOB_EVENT_CHANNEL_NAME, channelPrefix));
262  moduleEventPublisher.openRemoteEventChannel(String.format(INGEST_MODULE_EVENT_CHANNEL_NAME, channelPrefix));
263  }
264  } catch (NoCurrentCaseException | AutopsyEventException ex) {
265  logger.log(Level.SEVERE, "Failed to open remote events channel", ex); //NON-NLS
266  MessageNotifyUtil.Notify.error(NbBundle.getMessage(IngestManager.class, "IngestManager.OpenEventChannel.Fail.Title"),
267  NbBundle.getMessage(IngestManager.class, "IngestManager.OpenEventChannel.Fail.ErrMsg"));
268  }
269  }
270 
271  /*
272  * Handles a current case closed event by cancelling all ingest jobs for the
273  * case, closing the remote event channel for the case, and clearing the
274  * ingest messages inbox.
275  *
276  * Note that current case change events are published in a strictly
277  * serialized manner, i.e., one event at a time, synchronously.
278  */
279  void handleCaseClosed() {
280  /*
281  * TODO (JIRA-2227): IngestManager should wait for cancelled ingest jobs
282  * to complete when a case is closed.
283  */
284  this.cancelAllIngestJobs(IngestJob.CancellationReason.CASE_CLOSED);
287  caseIsOpen = false;
289  }
290 
301  public IngestStream openIngestStream(DataSource dataSource, IngestJobSettings settings) throws TskCoreException {
302  IngestJob job = new IngestJob(dataSource, IngestJob.Mode.STREAMING, settings);
303  IngestJobInputStream stream = new IngestJobInputStream(job);
304  if (stream.getIngestJobStartResult().getJob() != null) {
305  return stream;
306  } else if (stream.getIngestJobStartResult().getModuleErrors().isEmpty()) {
307  for (IngestModuleError error : stream.getIngestJobStartResult().getModuleErrors()) {
308  logger.log(Level.SEVERE, String.format("%s ingest module startup error for %s", error.getModuleDisplayName(), dataSource.getName()), error.getThrowable());
309  }
310  throw new TskCoreException("Error starting ingest modules");
311  } else {
312  throw new TskCoreException("Error starting ingest modules", stream.getIngestJobStartResult().getStartupException());
313  }
314  }
315 
316 
325  }
326 
333  public void queueIngestJob(Collection<Content> dataSources, IngestJobSettings settings) {
334  if (caseIsOpen) {
335  IngestJob job = new IngestJob(dataSources, settings);
336  if (job.hasIngestPipeline()) {
337  long taskId = nextIngestManagerTaskId.incrementAndGet();
338  Future<Void> task = startIngestJobsExecutor.submit(new StartIngestJobTask(taskId, job));
339  startIngestJobFutures.put(taskId, task);
340  }
341  }
342  }
343 
352  public void queueIngestJob(Content dataSource, List<AbstractFile> files, IngestJobSettings settings) {
353  if (caseIsOpen) {
354  IngestJob job = new IngestJob(dataSource, files, settings);
355  if (job.hasIngestPipeline()) {
356  long taskId = nextIngestManagerTaskId.incrementAndGet();
357  Future<Void> task = startIngestJobsExecutor.submit(new StartIngestJobTask(taskId, job));
358  startIngestJobFutures.put(taskId, task);
359  }
360  }
361  }
362 
372  public IngestJobStartResult beginIngestJob(Collection<Content> dataSources, IngestJobSettings settings) {
373  if (caseIsOpen) {
374  IngestJob job = new IngestJob(dataSources, settings);
375  if (job.hasIngestPipeline()) {
376  return startIngestJob(job);
377  }
378  return new IngestJobStartResult(null, new IngestManagerException("No ingest pipeline created, likely due to no ingest modules being enabled"), null); //NON-NLS
379  }
380  return new IngestJobStartResult(null, new IngestManagerException("No case open"), null); //NON-NLS
381  }
382 
391  @NbBundle.Messages({
392  "IngestManager.startupErr.dlgTitle=Ingest Module Startup Failure",
393  "IngestManager.startupErr.dlgMsg=Unable to start up one or more ingest modules, ingest cancelled.",
394  "IngestManager.startupErr.dlgSolution=Please disable the failed modules or fix the errors before restarting ingest.",
395  "IngestManager.startupErr.dlgErrorList=Errors:"
396  })
397  IngestJobStartResult startIngestJob(IngestJob job) {
398  List<IngestModuleError> errors = null;
399  Case openCase;
400  try {
401  openCase = Case.getCurrentCaseThrows();
402  } catch (NoCurrentCaseException ex) {
403  return new IngestJobStartResult(null, new IngestManagerException("Exception while getting open case.", ex), Collections.<IngestModuleError>emptyList()); //NON-NLS
404  }
405  if (openCase.getCaseType() == Case.CaseType.MULTI_USER_CASE) {
406  try {
407  if (!servicesMonitor.getServiceStatus(ServicesMonitor.Service.REMOTE_CASE_DATABASE.toString()).equals(ServicesMonitor.ServiceStatus.UP.toString())) {
408  if (RuntimeProperties.runningWithGUI()) {
409  EventQueue.invokeLater(new Runnable() {
410  @Override
411  public void run() {
412  String serviceDisplayName = ServicesMonitor.Service.REMOTE_CASE_DATABASE.getDisplayName();
413  JOptionPane.showMessageDialog(WindowManager.getDefault().getMainWindow(),
414  NbBundle.getMessage(this.getClass(), "IngestManager.cancellingIngest.msgDlg.text"),
415  NbBundle.getMessage(this.getClass(), "IngestManager.serviceIsDown.msgDlg.text", serviceDisplayName),
416  JOptionPane.ERROR_MESSAGE);
417  }
418  });
419  }
420  return new IngestJobStartResult(null, new IngestManagerException("Ingest aborted. Remote database is down"), Collections.<IngestModuleError>emptyList()); //NON-NLS
421  }
422  } catch (ServicesMonitor.ServicesMonitorException ex) {
423  return new IngestJobStartResult(null, new IngestManagerException("Database server is down", ex), Collections.<IngestModuleError>emptyList()); //NON-NLS
424  }
425  }
426 
427  if (!ingestMonitor.isRunning()) {
428  ingestMonitor.start();
429  }
430 
431  synchronized (ingestJobsById) {
432  ingestJobsById.put(job.getId(), job);
433  }
434  IngestManager.logger.log(Level.INFO, "Starting ingest job {0}", job.getId()); //NON-NLS
435  errors = job.start();
436  if (errors.isEmpty()) {
437  this.fireIngestJobStarted(job.getId());
438  } else {
439  synchronized (ingestJobsById) {
440  this.ingestJobsById.remove(job.getId());
441  }
442  for (IngestModuleError error : errors) {
443  logger.log(Level.SEVERE, String.format("Error starting %s ingest module for job %d", error.getModuleDisplayName(), job.getId()), error.getThrowable()); //NON-NLS
444  }
445  IngestManager.logger.log(Level.SEVERE, "Ingest job {0} could not be started", job.getId()); //NON-NLS
446  if (RuntimeProperties.runningWithGUI()) {
447  final StringBuilder message = new StringBuilder(1024);
448  message.append(Bundle.IngestManager_startupErr_dlgMsg()).append("\n"); //NON-NLS
449  message.append(Bundle.IngestManager_startupErr_dlgSolution()).append("\n\n"); //NON-NLS
450  message.append(Bundle.IngestManager_startupErr_dlgErrorList()).append("\n"); //NON-NLS
451  for (IngestModuleError error : errors) {
452  String moduleName = error.getModuleDisplayName();
453  String errorMessage = error.getThrowable().getLocalizedMessage();
454  message.append(moduleName).append(": ").append(errorMessage).append("\n"); //NON-NLS
455  }
456  message.append("\n\n");
457  EventQueue.invokeLater(() -> {
458  JOptionPane.showMessageDialog(WindowManager.getDefault().getMainWindow(), message, Bundle.IngestManager_startupErr_dlgTitle(), JOptionPane.ERROR_MESSAGE);
459  });
460  }
461  return new IngestJobStartResult(null, new IngestManagerException("Errors occurred while starting ingest"), errors); //NON-NLS
462  }
463 
464  return new IngestJobStartResult(job, null, errors);
465  }
466 
472  void finishIngestJob(IngestJob job) {
473  long jobId = job.getId();
474  synchronized (ingestJobsById) {
475  ingestJobsById.remove(jobId);
476  }
477  if (!job.isCancelled()) {
478  IngestManager.logger.log(Level.INFO, "Ingest job {0} completed", jobId); //NON-NLS
479  fireIngestJobCompleted(jobId);
480  } else {
481  IngestManager.logger.log(Level.INFO, "Ingest job {0} cancelled", jobId); //NON-NLS
482  fireIngestJobCancelled(jobId);
483  }
484  }
485 
492  public boolean isIngestRunning() {
493  synchronized (ingestJobsById) {
494  return !ingestJobsById.isEmpty();
495  }
496  }
497 
504  startIngestJobFutures.values().forEach((handle) -> {
505  handle.cancel(true);
506  });
507  synchronized (ingestJobsById) {
508  this.ingestJobsById.values().forEach((job) -> {
509  job.cancel(reason);
510  });
511  }
512  }
513 
519  public void addIngestJobEventListener(final PropertyChangeListener listener) {
520  jobEventPublisher.addSubscriber(INGEST_JOB_EVENT_NAMES, listener);
521  }
522 
529  public void addIngestJobEventListener(Set<IngestJobEvent> eventTypes, final PropertyChangeListener listener) {
530  eventTypes.forEach((IngestJobEvent event) -> {
531  jobEventPublisher.addSubscriber(event.toString(), listener);
532  });
533  }
534 
540  public void removeIngestJobEventListener(final PropertyChangeListener listener) {
541  jobEventPublisher.removeSubscriber(INGEST_JOB_EVENT_NAMES, listener);
542  }
543 
550  public void removeIngestJobEventListener(Set<IngestJobEvent> eventTypes, final PropertyChangeListener listener) {
551  eventTypes.forEach((IngestJobEvent event) -> {
552  jobEventPublisher.removeSubscriber(event.toString(), listener);
553  });
554  }
555 
561  public void addIngestModuleEventListener(final PropertyChangeListener listener) {
562  moduleEventPublisher.addSubscriber(INGEST_MODULE_EVENT_NAMES, listener);
563  }
564 
571  public void addIngestModuleEventListener(Set<IngestModuleEvent> eventTypes, final PropertyChangeListener listener) {
572  eventTypes.forEach((IngestModuleEvent event) -> {
573  moduleEventPublisher.addSubscriber(event.toString(), listener);
574  });
575  }
576 
582  public void removeIngestModuleEventListener(final PropertyChangeListener listener) {
583  moduleEventPublisher.removeSubscriber(INGEST_MODULE_EVENT_NAMES, listener);
584  }
585 
592  public void removeIngestModuleEventListener(Set<IngestModuleEvent> eventTypes, final PropertyChangeListener listener) {
593  moduleEventPublisher.removeSubscriber(INGEST_MODULE_EVENT_NAMES, listener);
594  }
595 
601  void fireIngestJobStarted(long ingestJobId) {
602  AutopsyEvent event = new AutopsyEvent(IngestJobEvent.STARTED.toString(), ingestJobId, null);
603  eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
604  }
605 
611  void fireIngestJobCompleted(long ingestJobId) {
612  AutopsyEvent event = new AutopsyEvent(IngestJobEvent.COMPLETED.toString(), ingestJobId, null);
613  eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
614  }
615 
621  void fireIngestJobCancelled(long ingestJobId) {
622  AutopsyEvent event = new AutopsyEvent(IngestJobEvent.CANCELLED.toString(), ingestJobId, null);
623  eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
624  }
625 
634  void fireDataSourceAnalysisStarted(long ingestJobId, long dataSourceIngestJobId, Content dataSource) {
635  AutopsyEvent event = new DataSourceAnalysisStartedEvent(ingestJobId, dataSourceIngestJobId, dataSource);
636  eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
637  }
638 
647  void fireDataSourceAnalysisCompleted(long ingestJobId, long dataSourceIngestJobId, Content dataSource) {
648  AutopsyEvent event = new DataSourceAnalysisCompletedEvent(ingestJobId, dataSourceIngestJobId, dataSource, DataSourceAnalysisCompletedEvent.Reason.ANALYSIS_COMPLETED);
649  eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
650  }
651 
660  void fireDataSourceAnalysisCancelled(long ingestJobId, long dataSourceIngestJobId, Content dataSource) {
661  AutopsyEvent event = new DataSourceAnalysisCompletedEvent(ingestJobId, dataSourceIngestJobId, dataSource, DataSourceAnalysisCompletedEvent.Reason.ANALYSIS_CANCELLED);
662  eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
663  }
664 
671  void fireFileIngestDone(AbstractFile file) {
672  AutopsyEvent event = new FileAnalyzedEvent(file);
673  eventPublishingExecutor.submit(new PublishEventTask(event, moduleEventPublisher));
674  }
675 
683  void fireIngestModuleDataEvent(ModuleDataEvent moduleDataEvent) {
684  AutopsyEvent event = new BlackboardPostEvent(moduleDataEvent);
685  eventPublishingExecutor.submit(new PublishEventTask(event, moduleEventPublisher));
686  }
687 
695  void fireIngestModuleContentEvent(ModuleContentEvent moduleContentEvent) {
696  AutopsyEvent event = new ContentChangedEvent(moduleContentEvent);
697  eventPublishingExecutor.submit(new PublishEventTask(event, moduleEventPublisher));
698  }
699 
705  void initIngestMessageInbox() {
706  synchronized (this.ingestMessageBoxLock) {
707  ingestMessageBox = IngestMessageTopComponent.findInstance();
708  }
709  }
710 
716  void postIngestMessage(IngestMessage message) {
717  synchronized (this.ingestMessageBoxLock) {
718  if (ingestMessageBox != null && RuntimeProperties.runningWithGUI()) {
719  if (message.getMessageType() != IngestMessage.MessageType.ERROR && message.getMessageType() != IngestMessage.MessageType.WARNING) {
720  ingestMessageBox.displayMessage(message);
721  } else {
722  long errorPosts = ingestErrorMessagePosts.incrementAndGet();
723  if (errorPosts <= MAX_ERROR_MESSAGE_POSTS) {
724  ingestMessageBox.displayMessage(message);
725  } else if (errorPosts == MAX_ERROR_MESSAGE_POSTS + 1) {
726  IngestMessage errorMessageLimitReachedMessage = IngestMessage.createErrorMessage(
727  NbBundle.getMessage(this.getClass(), "IngestManager.IngestMessage.ErrorMessageLimitReached.title"),
728  NbBundle.getMessage(this.getClass(), "IngestManager.IngestMessage.ErrorMessageLimitReached.subject"),
729  NbBundle.getMessage(this.getClass(), "IngestManager.IngestMessage.ErrorMessageLimitReached.msg", MAX_ERROR_MESSAGE_POSTS));
730  ingestMessageBox.displayMessage(errorMessageLimitReachedMessage);
731  }
732  }
733  }
734  }
735  }
736 
737  /*
738  * Clears the ingest messages inbox.
739  */
740  private void clearIngestMessageBox() {
741  synchronized (this.ingestMessageBoxLock) {
742  if (null != ingestMessageBox) {
743  ingestMessageBox.clearMessages();
744  }
746  }
747  }
748 
760  void setIngestTaskProgress(DataSourceIngestTask task, String ingestModuleDisplayName) {
761  ingestThreadActivitySnapshots.put(task.getThreadId(), new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJobPipeline().getId(), ingestModuleDisplayName, task.getDataSource()));
762  }
763 
775  void setIngestTaskProgress(FileIngestTask task, String ingestModuleDisplayName) {
776  IngestThreadActivitySnapshot prevSnap = ingestThreadActivitySnapshots.get(task.getThreadId());
777  IngestThreadActivitySnapshot newSnap;
778  try {
779  newSnap = new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJobPipeline().getId(), ingestModuleDisplayName, task.getDataSource(), task.getFile());
780  } catch (TskCoreException ex) {
781  // In practice, this task would never have been enqueued or processed since the file
782  // lookup would have failed.
783  newSnap = new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJobPipeline().getId(), ingestModuleDisplayName, task.getDataSource());
784  }
785  ingestThreadActivitySnapshots.put(task.getThreadId(), newSnap);
786  incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
787  }
788 
796  void setIngestTaskProgressCompleted(DataSourceIngestTask task) {
797  ingestThreadActivitySnapshots.put(task.getThreadId(), new IngestThreadActivitySnapshot(task.getThreadId()));
798  }
799 
807  void setIngestTaskProgressCompleted(FileIngestTask task) {
808  IngestThreadActivitySnapshot prevSnap = ingestThreadActivitySnapshots.get(task.getThreadId());
809  IngestThreadActivitySnapshot newSnap = new IngestThreadActivitySnapshot(task.getThreadId());
810  ingestThreadActivitySnapshots.put(task.getThreadId(), newSnap);
811  incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
812  }
813 
820  private void incrementModuleRunTime(String moduleDisplayName, Long duration) {
821  if (moduleDisplayName.equals("IDLE")) { //NON-NLS
822  return;
823  }
824 
825  synchronized (ingestModuleRunTimes) {
826  Long prevTimeL = ingestModuleRunTimes.get(moduleDisplayName);
827  long prevTime = 0;
828  if (prevTimeL != null) {
829  prevTime = prevTimeL;
830  }
831  prevTime += duration;
832  ingestModuleRunTimes.put(moduleDisplayName, prevTime);
833  }
834  }
835 
841  @Override
842  public Map<String, Long> getModuleRunTimes() {
843  synchronized (ingestModuleRunTimes) {
844  Map<String, Long> times = new HashMap<>(ingestModuleRunTimes);
845  return times;
846  }
847  }
848 
855  @Override
856  public List<IngestThreadActivitySnapshot> getIngestThreadActivitySnapshots() {
857  return new ArrayList<>(ingestThreadActivitySnapshots.values());
858  }
859 
865  @Override
866  public List<Snapshot> getIngestJobSnapshots() {
867  List<Snapshot> snapShots = new ArrayList<>();
868  synchronized (ingestJobsById) {
869  ingestJobsById.values().forEach((job) -> {
870  snapShots.addAll(job.getDataSourceIngestJobSnapshots());
871  });
872  }
873  return snapShots;
874  }
875 
882  long getFreeDiskSpace() {
883  if (ingestMonitor != null) {
884  return ingestMonitor.getFreeSpace();
885  } else {
886  return -1;
887  }
888  }
889 
893  private final class StartIngestJobTask implements Callable<Void> {
894 
895  private final long threadId;
896  private final IngestJob job;
897  private ProgressHandle progress;
898 
899  StartIngestJobTask(long threadId, IngestJob job) {
900  this.threadId = threadId;
901  this.job = job;
902  }
903 
904  @Override
905  public Void call() {
906  try {
907  if (Thread.currentThread().isInterrupted()) {
908  synchronized (ingestJobsById) {
909  ingestJobsById.remove(job.getId());
910  }
911  return null;
912  }
913 
915  final String displayName = NbBundle.getMessage(this.getClass(), "IngestManager.StartIngestJobsTask.run.displayName");
916  this.progress = ProgressHandle.createHandle(displayName, new Cancellable() {
917  @Override
918  public boolean cancel() {
919  if (progress != null) {
920  progress.setDisplayName(NbBundle.getMessage(this.getClass(), "IngestManager.StartIngestJobsTask.run.cancelling", displayName));
921  }
922  Future<?> handle = startIngestJobFutures.remove(threadId);
923  handle.cancel(true);
924  return true;
925  }
926  });
927  progress.start();
928  }
929 
930  startIngestJob(job);
931  return null;
932 
933  } finally {
934  if (null != progress) {
935  progress.finish();
936  }
937  startIngestJobFutures.remove(threadId);
938  }
939  }
940 
941  }
942 
946  private final class ExecuteIngestJobTasksTask implements Runnable {
947 
948  private final long threadId;
949  private final BlockingIngestTaskQueue tasks;
950 
951  ExecuteIngestJobTasksTask(long threadId, BlockingIngestTaskQueue tasks) {
952  this.threadId = threadId;
953  this.tasks = tasks;
954  }
955 
956  @Override
957  public void run() {
958  while (true) {
959  try {
960  IngestTask task = tasks.getNextTask(); // Blocks.
961  task.execute(threadId);
962  } catch (InterruptedException ex) {
963  break;
964  }
965  if (Thread.currentThread().isInterrupted()) {
966  break;
967  }
968  }
969  }
970  }
971 
975  private static final class PublishEventTask implements Runnable {
976 
977  private final AutopsyEvent event;
979 
988  this.event = event;
989  this.publisher = publisher;
990  }
991 
992  @Override
993  public void run() {
994  publisher.publish(event);
995  }
996 
997  }
998 
1003  @Immutable
1004  public static final class IngestThreadActivitySnapshot implements Serializable {
1005 
1006  private static final long serialVersionUID = 1L;
1007 
1008  private final long threadId;
1009  private final Date startTime;
1010  private final String activity;
1011  private final String dataSourceName;
1012  private final String fileName;
1013  private final long jobId;
1014 
1022  IngestThreadActivitySnapshot(long threadId) {
1023  this.threadId = threadId;
1024  startTime = new Date();
1025  this.activity = NbBundle.getMessage(this.getClass(), "IngestManager.IngestThreadActivitySnapshot.idleThread");
1026  this.dataSourceName = "";
1027  this.fileName = "";
1028  this.jobId = 0;
1029  }
1030 
1041  IngestThreadActivitySnapshot(long threadId, long jobId, String activity, Content dataSource) {
1042  this.threadId = threadId;
1043  this.jobId = jobId;
1044  startTime = new Date();
1045  this.activity = activity;
1046  this.dataSourceName = dataSource.getName();
1047  this.fileName = "";
1048  }
1049 
1062  IngestThreadActivitySnapshot(long threadId, long jobId, String activity, Content dataSource, AbstractFile file) {
1063  this.threadId = threadId;
1064  this.jobId = jobId;
1065  startTime = new Date();
1066  this.activity = activity;
1067  this.dataSourceName = dataSource.getName();
1068  this.fileName = file.getName();
1069  }
1070 
1076  long getIngestJobId() {
1077  return jobId;
1078  }
1079 
1085  long getThreadId() {
1086  return threadId;
1087  }
1088 
1094  Date getStartTime() {
1095  return startTime;
1096  }
1097 
1103  String getActivity() {
1104  return activity;
1105  }
1106 
1114  String getDataSourceName() {
1115  return dataSourceName;
1116  }
1117 
1123  String getFileName() {
1124  return fileName;
1125  }
1126 
1127  }
1128 
1132  public enum IngestJobEvent {
1133 
1170  }
1171 
1175  public enum IngestModuleEvent {
1176 
1198  }
1199 
1203  public final static class IngestManagerException extends Exception {
1204 
1205  private static final long serialVersionUID = 1L;
1206 
1212  private IngestManagerException(String message) {
1213  super(message);
1214  }
1215 
1222  private IngestManagerException(String message, Throwable cause) {
1223  super(message, cause);
1224  }
1225  }
1226 
1235  @Deprecated
1236  public static void addPropertyChangeListener(final PropertyChangeListener listener) {
1239  }
1240 
1249  @Deprecated
1250  public static void removePropertyChangeListener(final PropertyChangeListener listener) {
1253  }
1254 
1265  @Deprecated
1266  public synchronized IngestJob startIngestJob(Collection<Content> dataSources, IngestJobSettings settings) {
1267  return beginIngestJob(dataSources, settings).getJob();
1268  }
1269 
1276  @Deprecated
1277  public void cancelAllIngestJobs() {
1279  }
1280 
1281 }
final ConcurrentHashMap< String, Long > ingestModuleRunTimes
void addIngestModuleEventListener(Set< IngestModuleEvent > eventTypes, final PropertyChangeListener listener)
final Map< Long, Future< Void > > startIngestJobFutures
void removeIngestModuleEventListener(final PropertyChangeListener listener)
IngestStream openIngestStream(DataSource dataSource, IngestJobSettings settings)
List< IngestThreadActivitySnapshot > getIngestThreadActivitySnapshots()
void queueIngestJob(Content dataSource, List< AbstractFile > files, IngestJobSettings settings)
static synchronized IngestManager getInstance()
void removeIngestModuleEventListener(Set< IngestModuleEvent > eventTypes, final PropertyChangeListener listener)
final ExecutorService dataSourceLevelIngestJobTasksExecutor
static void addPropertyChangeListener(final PropertyChangeListener listener)
IngestJobStartResult beginIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
void addSubscriber(Set< String > eventNames, PropertyChangeListener subscriber)
static void removePropertyChangeListener(final PropertyChangeListener listener)
static final Set< String > INGEST_MODULE_EVENT_NAMES
volatile IngestMessageTopComponent ingestMessageBox
void addSubscriber(PropertyChangeListener subscriber)
void removeIngestJobEventListener(final PropertyChangeListener listener)
void addIngestJobEventListener(Set< IngestJobEvent > eventTypes, final PropertyChangeListener listener)
static final Set< String > INGEST_JOB_EVENT_NAMES
final AutopsyEventPublisher moduleEventPublisher
synchronized void openRemoteEventChannel(String channelName)
void addIngestJobEventListener(final PropertyChangeListener listener)
void queueIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
void removeSubscriber(Set< String > eventNames, PropertyChangeListener subscriber)
void addIngestModuleEventListener(final PropertyChangeListener listener)
synchronized static Logger getLogger(String name)
Definition: Logger.java:124
void incrementModuleRunTime(String moduleDisplayName, Long duration)
static void addEventTypeSubscriber(Set< Events > eventTypes, PropertyChangeListener subscriber)
Definition: Case.java:491
synchronized IngestJob startIngestJob(Collection< Content > dataSources, IngestJobSettings settings)
final ConcurrentHashMap< Long, IngestThreadActivitySnapshot > ingestThreadActivitySnapshots
void cancelAllIngestJobs(IngestJob.CancellationReason reason)
final ExecutorService fileLevelIngestJobTasksExecutor
final Map< Long, IngestJob > ingestJobsById
void removeIngestJobEventListener(Set< IngestJobEvent > eventTypes, final PropertyChangeListener listener)
final AutopsyEventPublisher jobEventPublisher

Copyright © 2012-2021 Basis Technology. Generated on: Tue Jan 19 2021
This work is licensed under a Creative Commons Attribution-Share Alike 3.0 United States License.