Autopsy  4.19.0
Graphical digital forensics platform for The Sleuth Kit and other tools.
IngestTaskPipeline.java
Go to the documentation of this file.
1 /*
2  * Autopsy Forensic Browser
3  *
4  * Copyright 2021 Basis Technology Corp.
5  * Contact: carrier <at> sleuthkit <dot> org
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  * http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19 package org.sleuthkit.autopsy.ingest;
20 
21 import static java.lang.Thread.sleep;
22 import java.time.DayOfWeek;
23 import java.time.LocalDateTime;
24 import java.time.temporal.ChronoUnit;
25 import java.util.ArrayList;
26 import java.util.Date;
27 import java.util.List;
28 import java.util.Optional;
29 import java.util.concurrent.TimeUnit;
30 import java.util.logging.Level;
31 import org.openide.util.NbBundle;
34 
47 abstract class IngestTaskPipeline<T extends IngestTask> {
48 
49  private static final Logger logger = Logger.getLogger(IngestTaskPipeline.class.getName());
50  private final IngestJobPipeline ingestJobPipeline;
51  private final List<IngestModuleTemplate> moduleTemplates;
52  private final List<PipelineModule<T>> modules;
53  private volatile Date startTime;
54  private volatile boolean running;
55  private volatile PipelineModule<T> currentModule;
56 
66  IngestTaskPipeline(IngestJobPipeline ingestPipeline, List<IngestModuleTemplate> moduleTemplates) {
67  this.ingestJobPipeline = ingestPipeline;
68  /*
69  * The creation of ingest modules from the ingest module templates has
70  * been deliberately deferred to the startUp() method so that any and
71  * all errors in module construction or start up can be reported to the
72  * client code.
73  */
74  this.moduleTemplates = moduleTemplates;
75  modules = new ArrayList<>();
76  }
77 
84  boolean isEmpty() {
85  return modules.isEmpty();
86  }
87 
94  boolean isRunning() {
95  return running;
96  }
97 
104  List<IngestModuleError> startUp() {
105  List<IngestModuleError> errors = new ArrayList<>();
106  if (!running) {
107  /*
108  * The creation of ingest modules from the ingest module templates
109  * has been deliberately deferred to the startUp() method so that
110  * any and all errors in module construction or start up can be
111  * reported to the client code.
112  */
113  createIngestModules(moduleTemplates);
114  errors.addAll(startUpIngestModules());
115  } else {
116  errors.add(new IngestModuleError("Ingest Task Pipeline", new IngestTaskPipelineException("Pipeline already started"))); //NON-NLS
117  }
118  return errors;
119  }
120 
127  private void createIngestModules(List<IngestModuleTemplate> moduleTemplates) {
128  if (modules.isEmpty()) {
129  for (IngestModuleTemplate template : moduleTemplates) {
130  Optional<PipelineModule<T>> module = acceptModuleTemplate(template);
131  if (module.isPresent()) {
132  modules.add(module.get());
133  }
134  }
135  }
136  }
137 
149  abstract Optional<PipelineModule<T>> acceptModuleTemplate(IngestModuleTemplate template);
150 
156  private List<IngestModuleError> startUpIngestModules() {
157  List<IngestModuleError> errors = new ArrayList<>();
158  startTime = new Date();
159  running = true;
160  for (PipelineModule<T> module : modules) {
161  try {
162  module.startUp(new IngestJobContext(ingestJobPipeline));
163  } catch (Throwable ex) {
164  /*
165  * A catch-all exception firewall. Start up errors for all of
166  * the ingest modules, whether checked exceptions or runtime
167  * exceptions, are reported to allow correction of all of the
168  * error conditions in one go.
169  */
170  errors.add(new IngestModuleError(module.getDisplayName(), ex));
171  }
172  }
173  return errors;
174  }
175 
182  Date getStartTime() {
183  Date reportedStartTime = null;
184  if (startTime != null) {
185  reportedStartTime = new Date(startTime.getTime());
186  }
187  return reportedStartTime;
188  }
189 
198  List<IngestModuleError> executeTask(T task) {
199  List<IngestModuleError> errors = new ArrayList<>();
200  if (running) {
201  if (!ingestJobPipeline.isCancelled()) {
202  pauseIfScheduled();
203  if (ingestJobPipeline.isCancelled()) {
204  return errors;
205  }
206  try {
207  prepareForTask(task);
208  } catch (IngestTaskPipelineException ex) {
209  errors.add(new IngestModuleError("Ingest Task Pipeline", ex)); //NON-NLS
210  return errors;
211  }
212  for (PipelineModule<T> module : modules) {
213  pauseIfScheduled();
214  if (ingestJobPipeline.isCancelled()) {
215  break;
216  }
217  try {
218  currentModule = module;
219  currentModule.setProcessingStartTime();
220  module.executeTask(ingestJobPipeline, task);
221  } catch (Throwable ex) {
222  /*
223  * A catch-all exception firewall. Note that a runtime
224  * exception from a single module does not stop
225  * processing of the task by the other modules in the
226  * pipeline.
227  */
228  errors.add(new IngestModuleError(module.getDisplayName(), ex));
229  }
230  if (ingestJobPipeline.isCancelled()) {
231  break;
232  }
233  }
234  }
235  try {
236  cleanUpAfterTask(task);
237  } catch (IngestTaskPipelineException ex) {
238  errors.add(new IngestModuleError("Ingest Task Pipeline", ex)); //NON-NLS
239  }
240  } else {
241  errors.add(new IngestModuleError("Ingest Task Pipeline", new IngestTaskPipelineException("Pipeline not started or shut down"))); //NON-NLS
242  }
243  currentModule = null;
244  return errors;
245  }
246 
251  private void pauseIfScheduled() {
252  if (ScheduledIngestPauseSettings.getPauseEnabled() == true) {
253  /*
254  * Calculate the date/time for the scheduled pause start by
255  * "normalizing" the day of week to the current week and then
256  * adjusting the hour and minute to match the scheduled hour and
257  * minute.
258  */
259  LocalDateTime pauseStart = LocalDateTime.now();
260  DayOfWeek pauseDayOfWeek = ScheduledIngestPauseSettings.getPauseDayOfWeek();
261  while (pauseStart.getDayOfWeek() != pauseDayOfWeek) {
262  pauseStart = pauseStart.minusDays(1);
263  }
264  pauseStart = pauseStart.withHour(ScheduledIngestPauseSettings.getPauseStartTimeHour());
265  pauseStart = pauseStart.withMinute(ScheduledIngestPauseSettings.getPauseStartTimeMinute());
266  pauseStart = pauseStart.withSecond(0);
267 
268  /*
269  * Calculate the pause end date/time.
270  */
271  LocalDateTime pauseEnd = pauseStart.plusMinutes(ScheduledIngestPauseSettings.getPauseDurationMinutes());
272 
273  /*
274  * Check whether the current date/time is in the pause interval. If
275  * it is, register the ingest thread this code is running in so it
276  * can be interrupted if the job is canceled, and sleep until
277  * whatever time remains in the pause interval has expired.
278  */
279  LocalDateTime timeNow = LocalDateTime.now();
280  if ((timeNow.equals(pauseStart) || timeNow.isAfter(pauseStart)) && timeNow.isBefore(pauseEnd)) {
281  ingestJobPipeline.registerPausedIngestThread(Thread.currentThread());
282  try {
283  long timeRemainingMillis = ChronoUnit.MILLIS.between(timeNow, pauseEnd);
284  logger.log(Level.INFO, String.format("%s pausing at %s for ~%d minutes", Thread.currentThread().getName(), LocalDateTime.now(), TimeUnit.MILLISECONDS.toMinutes(timeRemainingMillis)));
285  sleep(timeRemainingMillis);
286  logger.log(Level.INFO, String.format("%s resuming at %s", Thread.currentThread().getName(), LocalDateTime.now()));
287  } catch (InterruptedException notLogged) {
288  logger.log(Level.INFO, String.format("%s resuming at %s due to sleep interrupt (ingest job canceled)", Thread.currentThread().getName(), LocalDateTime.now()));
289  } finally {
290  ingestJobPipeline.unregisterPausedIngestThread(Thread.currentThread());
291  }
292  }
293  }
294  }
295 
305  abstract void prepareForTask(T task) throws IngestTaskPipelineException;
306 
312  PipelineModule<T> getCurrentlyRunningModule() {
313  return currentModule;
314  }
315 
321  List<IngestModuleError> shutDown() {
322  List<IngestModuleError> errors = new ArrayList<>();
323  if (running == true) {
324  for (PipelineModule<T> module : modules) {
325  try {
326  module.shutDown();
327  } catch (Throwable ex) { // Catch-all exception firewall
328  errors.add(new IngestModuleError(module.getDisplayName(), ex));
329  String msg = ex.getMessage();
330  if (msg == null) {
331  /*
332  * Jython run-time errors don't seem to have a message,
333  * but have details in the string returned by
334  * toString().
335  */
336  msg = ex.toString();
337  }
338  MessageNotifyUtil.Notify.error(NbBundle.getMessage(this.getClass(), "FileIngestPipeline.moduleError.title.text", module.getDisplayName()), msg);
339  }
340  }
341  }
342  running = false;
343  return errors;
344 
345  }
346 
356  abstract void cleanUpAfterTask(T task) throws IngestTaskPipelineException;
357 
365  static abstract class PipelineModule<T extends IngestTask> implements IngestModule {
366 
367  private final IngestModule module;
368  private final String displayName;
369  private volatile Date processingStartTime;
370 
378  PipelineModule(IngestModule module, String displayName) {
379  this.module = module;
380  this.displayName = displayName;
381  this.processingStartTime = new Date();
382  }
383 
389  String getClassName() {
390  return module.getClass().getCanonicalName();
391  }
392 
398  String getDisplayName() {
399  return displayName;
400  }
401 
406  void setProcessingStartTime() {
407  processingStartTime = new Date();
408  }
409 
416  Date getProcessingStartTime() {
417  return new Date(processingStartTime.getTime());
418  }
419 
420  @Override
421  public void startUp(IngestJobContext context) throws IngestModuleException {
422  module.startUp(context);
423  }
424 
436  abstract void executeTask(IngestJobPipeline ingestJobPipeline, T task) throws IngestModuleException;
437 
438  @Override
439  public void shutDown() {
440  module.shutDown();
441  }
442 
443  }
444 
448  public static class IngestTaskPipelineException extends Exception {
449 
450  private static final long serialVersionUID = 1L;
451 
457  public IngestTaskPipelineException(String message) {
458  super(message);
459  }
460 
467  public IngestTaskPipelineException(String message, Throwable cause) {
468  super(message, cause);
469  }
470 
471  }
472 
473 }

Copyright © 2012-2021 Basis Technology. Generated on: Fri Aug 6 2021
This work is licensed under a Creative Commons Attribution-Share Alike 3.0 United States License.