Autopsy  4.21.0
Graphical digital forensics platform for The Sleuth Kit and other tools.
IngestPipeline.java
Go to the documentation of this file.
1 /*
2  * Autopsy Forensic Browser
3  *
4  * Copyright 2021 Basis Technology Corp.
5  * Contact: carrier <at> sleuthkit <dot> org
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  * http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19 package org.sleuthkit.autopsy.ingest;
20 
21 import static java.lang.Thread.sleep;
22 import java.time.DayOfWeek;
23 import java.time.LocalDateTime;
24 import java.time.temporal.ChronoUnit;
25 import java.util.ArrayList;
26 import java.util.Date;
27 import java.util.List;
28 import java.util.Optional;
29 import java.util.concurrent.TimeUnit;
30 import java.util.logging.Level;
31 import org.openide.util.NbBundle;
34 
43 abstract class IngestPipeline<T extends IngestTask> {
44 
45  /*
46  * NOTE ON MULTI-THREADING POLICY: This class is primarily designed for use
47  * by one thread at a time. There are a few status fields that are volatile
48  * to ensure visibility to threads making ingest progress snapshots, but
49  * methods such as startUp(), performTask() and shutDown() are not
50  * synchronized.
51  */
52  private static final Logger logger = Logger.getLogger(IngestPipeline.class.getName());
53  private final IngestJobExecutor ingestJobExecutor;
54  private final List<IngestModuleTemplate> moduleTemplates;
55  private final List<PipelineModule<T>> modules;
56  private volatile Date startTime;
57  private volatile boolean running;
58  private volatile PipelineModule<T> currentModule;
59 
70  IngestPipeline(IngestJobExecutor ingestJobExecutor, List<IngestModuleTemplate> moduleTemplates) {
71  this.ingestJobExecutor = ingestJobExecutor;
72  this.moduleTemplates = moduleTemplates;
73  modules = new ArrayList<>();
74  }
75 
82  boolean isEmpty() {
83  return modules.isEmpty();
84  }
85 
93  boolean isRunning() {
94  return running;
95  }
96 
103  List<IngestModuleError> startUp() {
104  List<IngestModuleError> errors = new ArrayList<>();
105  if (!running) {
106  /*
107  * The creation of ingest modules from the ingest module templates
108  * has been deliberately deferred to the startUp() method so that
109  * any and all errors in module construction or start up can be
110  * reported to the client code.
111  */
112  createIngestModules();
113  errors.addAll(startUpIngestModules());
114  } else {
115  errors.add(new IngestModuleError("Ingest Task Pipeline", new IngestPipelineException("Pipeline already started"))); //NON-NLS
116  }
117  return errors;
118  }
119 
124  private void createIngestModules() {
125  if (modules.isEmpty()) {
126  for (IngestModuleTemplate template : moduleTemplates) {
127  Optional<PipelineModule<T>> module = acceptModuleTemplate(template);
128  if (module.isPresent()) {
129  modules.add(module.get());
130  }
131  }
132  }
133  }
134 
146  abstract Optional<PipelineModule<T>> acceptModuleTemplate(IngestModuleTemplate template);
147 
153  private List<IngestModuleError> startUpIngestModules() {
154  List<IngestModuleError> errors = new ArrayList<>();
155  startTime = new Date();
156  running = true;
157  for (PipelineModule<T> module : modules) {
158  try {
159  module.startUp(new IngestJobContext(ingestJobExecutor));
160  } catch (Throwable ex) {
161  /*
162  * A catch-all exception firewall. Start up errors for all of
163  * the ingest modules, whether checked exceptions or runtime
164  * exceptions, are reported to allow correction of all of the
165  * error conditions in one go.
166  */
167  errors.add(new IngestModuleError(module.getDisplayName(), ex));
168  }
169  }
170  return errors;
171  }
172 
179  Date getStartTime() {
180  Date reportedStartTime = null;
181  if (startTime != null) {
182  reportedStartTime = new Date(startTime.getTime());
183  }
184  return reportedStartTime;
185  }
186 
195  List<IngestModuleError> performTask(T task) {
196  List<IngestModuleError> errors = new ArrayList<>();
197  if (running) {
198  if (!ingestJobExecutor.isCancelled()) {
199  pauseIfScheduled();
200  if (ingestJobExecutor.isCancelled()) {
201  return errors;
202  }
203  try {
204  prepareForTask(task);
205  } catch (IngestPipelineException ex) {
206  errors.add(new IngestModuleError("Ingest Task Pipeline", ex)); //NON-NLS
207  return errors;
208  }
209  for (PipelineModule<T> module : modules) {
210  pauseIfScheduled();
211  if (ingestJobExecutor.isCancelled()) {
212  break;
213  }
214  try {
215  currentModule = module;
216  currentModule.setProcessingStartTime();
217  module.process(ingestJobExecutor, task);
218  } catch (Throwable ex) { // Catch-all exception firewall
219  /*
220  * Note that an exception from a module does not stop
221  * processing of the task by the other modules in the
222  * pipeline.
223  */
224  errors.add(new IngestModuleError(module.getDisplayName(), ex));
225  }
226  if (ingestJobExecutor.isCancelled()) {
227  break;
228  }
229  }
230  }
231  try {
232  cleanUpAfterTask(task);
233  } catch (IngestPipelineException ex) {
234  errors.add(new IngestModuleError("Ingest Task Pipeline", ex)); //NON-NLS
235  }
236  } else {
237  errors.add(new IngestModuleError("Ingest Task Pipeline", new IngestPipelineException("Pipeline not started or shut down"))); //NON-NLS
238  }
239  currentModule = null;
240  return errors;
241  }
242 
249  private void pauseIfScheduled() {
250  if (ScheduledIngestPauseSettings.getPauseEnabled() == true) {
251  /*
252  * Calculate the date/time for the scheduled pause start by
253  * "normalizing" the day of week to the current week and then
254  * adjusting the hour and minute to match the scheduled hour and
255  * minute.
256  */
257  LocalDateTime pauseStart = LocalDateTime.now();
258  DayOfWeek pauseDayOfWeek = ScheduledIngestPauseSettings.getPauseDayOfWeek();
259  while (pauseStart.getDayOfWeek() != pauseDayOfWeek) {
260  pauseStart = pauseStart.minusDays(1);
261  }
262  pauseStart = pauseStart.withHour(ScheduledIngestPauseSettings.getPauseStartTimeHour());
263  pauseStart = pauseStart.withMinute(ScheduledIngestPauseSettings.getPauseStartTimeMinute());
264  pauseStart = pauseStart.withSecond(0);
265 
266  /*
267  * Calculate the pause end date/time.
268  */
269  LocalDateTime pauseEnd = pauseStart.plusMinutes(ScheduledIngestPauseSettings.getPauseDurationMinutes());
270 
271  /*
272  * Check whether the current date/time is in the pause interval. If
273  * it is, register the ingest thread this code is running in so it
274  * can be interrupted if the job is canceled, and sleep until
275  * whatever time remains in the pause interval has expired.
276  */
277  LocalDateTime timeNow = LocalDateTime.now();
278  if ((timeNow.equals(pauseStart) || timeNow.isAfter(pauseStart)) && timeNow.isBefore(pauseEnd)) {
279  ingestJobExecutor.registerPausedIngestThread(Thread.currentThread());
280  try {
281  long timeRemainingMillis = ChronoUnit.MILLIS.between(timeNow, pauseEnd);
282  logger.log(Level.INFO, String.format("%s pausing at %s for ~%d minutes", Thread.currentThread().getName(), LocalDateTime.now(), TimeUnit.MILLISECONDS.toMinutes(timeRemainingMillis)));
283  sleep(timeRemainingMillis);
284  logger.log(Level.INFO, String.format("%s resuming at %s", Thread.currentThread().getName(), LocalDateTime.now()));
285  } catch (InterruptedException notLogged) {
286  logger.log(Level.INFO, String.format("%s resuming at %s due to sleep interrupt (ingest job canceled)", Thread.currentThread().getName(), LocalDateTime.now()));
287  } finally {
288  ingestJobExecutor.unregisterPausedIngestThread(Thread.currentThread());
289  }
290  }
291  }
292  }
293 
303  abstract void prepareForTask(T task) throws IngestPipelineException;
304 
310  PipelineModule<T> getCurrentlyRunningModule() {
311  return currentModule;
312  }
313 
319  List<IngestModuleError> shutDown() {
320  List<IngestModuleError> errors = new ArrayList<>();
321  if (running == true) {
322  for (PipelineModule<T> module : modules) {
323  try {
324  module.shutDown();
325  } catch (Throwable ex) { // Catch-all exception firewall
326  errors.add(new IngestModuleError(module.getDisplayName(), ex));
327  String msg = ex.getMessage();
328  if (msg == null) {
329  /*
330  * Jython run-time errors don't seem to have a message,
331  * but have details in the string returned by
332  * toString().
333  */
334  msg = ex.toString();
335  }
336  MessageNotifyUtil.Notify.error(NbBundle.getMessage(this.getClass(), "FileIngestPipeline.moduleError.title.text", module.getDisplayName()), msg);
337  }
338  }
339  }
340  running = false;
341  return errors;
342 
343  }
344 
354  abstract void cleanUpAfterTask(T task) throws IngestPipelineException;
355 
360  static abstract class PipelineModule<T extends IngestTask> implements IngestModule {
361 
362  private final IngestModule module;
363  private final String displayName;
364  private volatile Date processingStartTime;
365 
374  PipelineModule(IngestModule module, String displayName) {
375  this.module = module;
376  this.displayName = displayName;
377  processingStartTime = new Date();
378  }
379 
385  String getClassName() {
386  return module.getClass().getCanonicalName();
387  }
388 
394  String getDisplayName() {
395  return displayName;
396  }
397 
402  void setProcessingStartTime() {
403  processingStartTime = new Date();
404  }
405 
412  Date getProcessingStartTime() {
413  return new Date(processingStartTime.getTime());
414  }
415 
416  @Override
417  public void startUp(IngestJobContext context) throws IngestModuleException {
418  module.startUp(context);
419  }
420 
432  abstract void process(IngestJobExecutor ingestJobExecutor, T task) throws IngestModuleException;
433 
434  @Override
435  public void shutDown() {
436  module.shutDown();
437  }
438 
439  }
440 
444  static class IngestPipelineException extends Exception {
445 
446  private static final long serialVersionUID = 1L;
447 
453  IngestPipelineException(String message) {
454  super(message);
455  }
456 
463  IngestPipelineException(String message, Throwable cause) {
464  super(message, cause);
465  }
466 
467  }
468 
469 }

Copyright © 2012-2022 Basis Technology. Generated on: Tue Feb 6 2024
This work is licensed under a Creative Commons Attribution-Share Alike 3.0 United States License.