Autopsy  4.18.0
Graphical digital forensics platform for The Sleuth Kit and other tools.
IngestTaskPipeline.java
Go to the documentation of this file.
1 /*
2  * Autopsy Forensic Browser
3  *
4  * Copyright 2021 Basis Technology Corp.
5  * Contact: carrier <at> sleuthkit <dot> org
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  * http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19 package org.sleuthkit.autopsy.ingest;
20 
21 import static java.lang.Thread.sleep;
22 import java.time.DayOfWeek;
23 import java.time.LocalDateTime;
24 import java.time.temporal.ChronoUnit;
25 import java.util.ArrayList;
26 import java.util.Date;
27 import java.util.List;
28 import java.util.Optional;
29 import java.util.concurrent.TimeUnit;
30 import java.util.logging.Level;
31 import org.openide.util.NbBundle;
34 
44 abstract class IngestTaskPipeline<T extends IngestTask> {
45 
// Logger named after this class; used to record scheduled pause/resume events.
46  private static final Logger logger = Logger.getLogger(IngestTaskPipeline.class.getName());
// Parent job pipeline supplied to the constructor; consulted for cancellation
// and used to register/unregister ingest threads that are paused.
47  private final IngestJobPipeline ingestJobPipeline;
// Module templates supplied to the constructor; turned into modules by startUp().
48  private final List<IngestModuleTemplate> moduleTemplates;
// Modules accepted from the templates; empty until startUp() is called.
49  private final List<PipelineModule<T>> modules;
// Set when the modules are started; null until then.
50  private volatile Date startTime;
// Set to true when the modules are started and to false by shutDown().
51  private volatile boolean running;
// Module currently handling a task in performTask(); reset to null when the
// task has been completed.
52  private volatile PipelineModule<T> currentModule;
53 
/**
 * Constructs a task pipeline that starts out with no ingest modules. The
 * modules are created from the given templates later, when startUp() is
 * called.
 *
 * @param ingestJobPipeline The ingest job pipeline this task pipeline
 *                          works for.
 * @param moduleTemplates   The templates from which the pipeline's ingest
 *                          modules are created. The list is stored as
 *                          given, not copied.
 */
IngestTaskPipeline(IngestJobPipeline ingestJobPipeline, List<IngestModuleTemplate> moduleTemplates) {
    this.modules = new ArrayList<>();
    this.moduleTemplates = moduleTemplates;
    this.ingestJobPipeline = ingestJobPipeline;
}
71 
77  boolean isEmpty() {
78  return modules.isEmpty();
79  }
80 
87  boolean isRunning() {
88  return running;
89  }
90 
96  List<IngestModuleError> startUp() {
97  createIngestModules(moduleTemplates);
98  return startUpIngestModules();
99  }
100 
107  private void createIngestModules(List<IngestModuleTemplate> moduleTemplates) {
108  for (IngestModuleTemplate template : moduleTemplates) {
109  Optional<PipelineModule<T>> module = acceptModuleTemplate(template);
110  if (module.isPresent()) {
111  modules.add(module.get());
112  }
113  }
114  }
115 
/**
 * Decides whether a module template contributes an ingest module to this
 * pipeline. Called once per template while the pipeline's modules are
 * being created; a returned module is appended to the pipeline's module
 * list, and an empty Optional means the template is passed over.
 *
 * @param ingestModuleTemplate The module template to consider.
 *
 * @return The pipeline module created for the template, or an empty
 *         Optional if the template yields no module for this pipeline.
 */
127  abstract Optional<PipelineModule<T>> acceptModuleTemplate(IngestModuleTemplate ingestModuleTemplate);
128 
134  private List<IngestModuleError> startUpIngestModules() {
135  startTime = new Date();
136  running = true;
137  List<IngestModuleError> errors = new ArrayList<>();
138  for (PipelineModule<T> module : modules) {
139  try {
140  module.startUp(new IngestJobContext(ingestJobPipeline));
141  } catch (Throwable ex) { // Catch-all exception firewall
142  errors.add(new IngestModuleError(module.getDisplayName(), ex));
143  }
144  }
145  return errors;
146  }
147 
154  Date getStartTime() {
155  Date reportedStartTime = null;
156  if (startTime != null) {
157  reportedStartTime = new Date(startTime.getTime());
158  }
159  return reportedStartTime;
160  }
161 
/**
 * Hook invoked by performTask() before any ingest module is given the
 * task. If it throws, performTask() records the exception as an error and
 * returns without running any module on the task.
 *
 * @param task The task about to be processed.
 *
 * @throws IngestTaskPipelineException If the task cannot be prepared.
 */
170  abstract void prepareTask(T task) throws IngestTaskPipelineException;
171 
179  List<IngestModuleError> performTask(T task) {
180  List<IngestModuleError> errors = new ArrayList<>();
181  if (!this.ingestJobPipeline.isCancelled()) {
182  pauseIfScheduled();
183  if (ingestJobPipeline.isCancelled()) {
184  return errors;
185  }
186  try {
187  prepareTask(task);
188  } catch (IngestTaskPipelineException ex) {
189  errors.add(new IngestModuleError("Ingest Task Pipeline", ex)); //NON-NLS
190  return errors;
191  }
192  for (PipelineModule<T> module : modules) {
193  pauseIfScheduled();
194  if (ingestJobPipeline.isCancelled()) {
195  break;
196  }
197  try {
198  currentModule = module;
199  currentModule.setProcessingStartTime();
200  module.performTask(ingestJobPipeline, task);
201  } catch (Throwable ex) { // Catch-all exception firewall
202  errors.add(new IngestModuleError(module.getDisplayName(), ex));
203  }
204  if (ingestJobPipeline.isCancelled()) {
205  break;
206  }
207  }
208  }
209  try {
210  completeTask(task);
211  } catch (IngestTaskPipelineException ex) {
212  errors.add(new IngestModuleError("Ingest Task Pipeline", ex)); //NON-NLS
213  }
214  currentModule = null;
215  return errors;
216  }
217 
222  private void pauseIfScheduled() {
223  if (ScheduledIngestPauseSettings.getPauseEnabled() == true) {
224  /*
225  * Calculate the date/time for the scheduled pause start by
226  * "normalizing" the day of week to the current week and then
227  * adjusting the hour and minute to match the scheduled hour and
228  * minute.
229  */
230  LocalDateTime pauseStart = LocalDateTime.now();
231  DayOfWeek pauseDayOfWeek = ScheduledIngestPauseSettings.getPauseDayOfWeek();
232  while (pauseStart.getDayOfWeek() != pauseDayOfWeek) {
233  pauseStart = pauseStart.minusDays(1);
234  }
235  pauseStart = pauseStart.withHour(ScheduledIngestPauseSettings.getPauseStartTimeHour());
236  pauseStart = pauseStart.withMinute(ScheduledIngestPauseSettings.getPauseStartTimeMinute());
237  pauseStart = pauseStart.withSecond(0);
238 
239  /*
240  * Calculate the pause end date/time.
241  */
242  LocalDateTime pauseEnd = pauseStart.plusMinutes(ScheduledIngestPauseSettings.getPauseDurationMinutes());
243 
244  /*
245  * Check whether the current date/time is in the pause interval. If
246  * it is, register the ingest thread this code is running in so it
247  * can be interrupted if the job is canceled, and sleep until
248  * whatever time remains in the pause interval has expired.
249  */
250  LocalDateTime timeNow = LocalDateTime.now();
251  if ((timeNow.equals(pauseStart) || timeNow.isAfter(pauseStart)) && timeNow.isBefore(pauseEnd)) {
252  ingestJobPipeline.registerPausedIngestThread(Thread.currentThread());
253  try {
254  long timeRemainingMillis = ChronoUnit.MILLIS.between(timeNow, pauseEnd);
255  logger.log(Level.INFO, String.format("%s pausing at %s for ~%d minutes", Thread.currentThread().getName(), LocalDateTime.now(), TimeUnit.MILLISECONDS.toMinutes(timeRemainingMillis)));
256  sleep(timeRemainingMillis);
257  logger.log(Level.INFO, String.format("%s resuming at %s", Thread.currentThread().getName(), LocalDateTime.now()));
258  } catch (InterruptedException notLogged) {
259  logger.log(Level.INFO, String.format("%s resuming at %s due to sleep interrupt (ingest job canceled)", Thread.currentThread().getName(), LocalDateTime.now()));
260  } finally {
261  ingestJobPipeline.unregisterPausedIngestThread(Thread.currentThread());
262  }
263  }
264  }
265  }
266 
272  PipelineModule<T> getCurrentlyRunningModule() {
273  return currentModule;
274  }
275 
/**
 * Hook invoked by performTask() once the ingest modules have been run on
 * the task, or skipped because the job was cancelled. If it throws,
 * performTask() records the exception as an error.
 *
 * @param task The task that was performed.
 *
 * @throws IngestTaskPipelineException If the task cannot be completed.
 */
284  abstract void completeTask(T task) throws IngestTaskPipelineException;
285 
291  List<IngestModuleError> shutDown() {
292  List<IngestModuleError> errors = new ArrayList<>();
293  if (running == true) {
294  for (PipelineModule<T> module : modules) {
295  try {
296  module.shutDown();
297  } catch (Throwable ex) { // Catch-all exception firewall
298  errors.add(new IngestModuleError(module.getDisplayName(), ex));
299  String msg = ex.getMessage();
300  if (msg == null) {
301  /*
302  * Jython run-time errors don't seem to have a message,
303  * but have details in the string returned by
304  * toString().
305  */
306  msg = ex.toString();
307  }
308  MessageNotifyUtil.Notify.error(NbBundle.getMessage(this.getClass(), "FileIngestPipeline.moduleError.title.text", module.getDisplayName()), msg);
309  }
310  }
311  }
312  running = false;
313  return errors;
314 
315  }
316 
321  static abstract class PipelineModule<T extends IngestTask> implements IngestModule {
322 
323  private final IngestModule module;
324  private final String displayName;
325  private volatile Date processingStartTime;
326 
334  PipelineModule(IngestModule module, String displayName) {
335  this.module = module;
336  this.displayName = displayName;
337  this.processingStartTime = new Date();
338  }
339 
345  String getClassName() {
346  return module.getClass().getCanonicalName();
347  }
348 
354  String getDisplayName() {
355  return displayName;
356  }
357 
362  void setProcessingStartTime() {
363  processingStartTime = new Date();
364  }
365 
372  Date getProcessingStartTime() {
373  return new Date(processingStartTime.getTime());
374  }
375 
376  @Override
377  public void startUp(IngestJobContext context) throws IngestModuleException {
378  module.startUp(context);
379  }
380 
391  abstract void performTask(IngestJobPipeline ingestJobPipeline, T task) throws IngestModuleException;
392 
393  @Override
394  public void shutDown() {
395  module.shutDown();
396  }
397 
398  }
399 
/**
 * Checked exception declared by the prepareTask() and completeTask() hooks
 * of an ingest task pipeline; performTask() catches it and records it as
 * an error.
 */
403  public static class IngestTaskPipelineException extends Exception {
404 
405  private static final long serialVersionUID = 1L;
406 
/**
 * Constructs an exception with a message.
 *
 * @param message The exception message.
 */
407  public IngestTaskPipelineException(String message) {
408  super(message);
409  }
410 
/**
 * Constructs an exception with a message and an underlying cause.
 *
 * @param message The exception message.
 * @param cause   The exception that caused this exception.
 */
411  public IngestTaskPipelineException(String message, Throwable cause) {
412  super(message, cause);
413  }
414  }
415 
416 }

Copyright © 2012-2021 Basis Technology. Generated on: Thu Jul 8 2021
This work is licensed under a Creative Commons Attribution-Share Alike 3.0 United States License.