19 package org.sleuthkit.autopsy.ingest;
21 import static java.lang.Thread.sleep;
22 import java.time.DayOfWeek;
23 import java.time.LocalDateTime;
24 import java.time.temporal.ChronoUnit;
25 import java.util.ArrayList;
26 import java.util.Date;
27 import java.util.List;
28 import java.util.Optional;
29 import java.util.concurrent.TimeUnit;
30 import java.util.logging.Level;
31 import org.openide.util.NbBundle;
43 abstract class IngestPipeline<T
extends IngestTask> {
// Logger shared by all instances of this pipeline class.
52 private static final Logger logger = Logger.getLogger(IngestPipeline.class.getName());
// Ingest job executor; polled for cancellation and handed to the modules.
53 private final IngestJobExecutor ingestJobExecutor;
// Templates from which the pipeline's modules are created.
54 private final List<IngestModuleTemplate> moduleTemplates;
// Modules created from the accepted templates, in template order.
55 private final List<PipelineModule<T>> modules;
// Assigned when the modules are started up; null before that.
56 private volatile Date startTime;
// Checked by shutDown(). NOTE(review): no assignment to this flag is visible
// in this view of the file - confirm where it is set and cleared.
57 private volatile boolean running;
// The module handed the current task; cleared to null in performTask().
58 private volatile PipelineModule<T> currentModule;
/**
 * Constructs a pipeline of ingest modules. The modules themselves are not
 * created here; the module list starts out empty.
 *
 * @param ingestJobExecutor The ingest job executor used by this pipeline.
 * @param moduleTemplates   The ingest module templates from which the
 *                          pipeline's modules are created.
 */
IngestPipeline(IngestJobExecutor ingestJobExecutor, List<IngestModuleTemplate> moduleTemplates) {
    this.ingestJobExecutor = ingestJobExecutor;
    this.moduleTemplates = moduleTemplates;
    // Filled later by createIngestModules().
    modules = new ArrayList<>();
}
83 return modules.isEmpty();
/**
 * Starts up this pipeline: creates the ingest modules from the module
 * templates and starts them up, collecting any errors.
 *
 * NOTE(review): this view of the method appears to be missing lines - the
 * condition that chooses between the start up path and the "Pipeline already
 * started" error, the closing braces and the return statement. Confirm
 * against the complete file.
 *
 * @return A list of ingest module errors, presumably empty if start up
 *         succeeded.
 */
103 List<IngestModuleError> startUp() {
104 List<IngestModuleError> errors =
new ArrayList<>();
// Instantiate the modules, then start each one and keep its errors.
112 createIngestModules();
113 errors.addAll(startUpIngestModules());
// Error recorded for a repeated start up (the guarding condition is not
// visible in this view).
115 errors.add(
new IngestModuleError(
"Ingest Task Pipeline",
new IngestPipelineException(
"Pipeline already started")));
124 private void createIngestModules() {
125 if (modules.isEmpty()) {
126 for (IngestModuleTemplate
template : moduleTemplates) {
127 Optional<PipelineModule<T>> module = acceptModuleTemplate(
template);
128 if (module.isPresent()) {
129 modules.add(module.get());
/**
 * Determines whether this pipeline accepts a given ingest module template
 * and, if so, supplies the pipeline module to add for it.
 *
 * @param template The ingest module template.
 *
 * @return An Optional holding the pipeline module; when it is empty, no
 *         module is added to the pipeline for the template.
 */
146 abstract Optional<PipelineModule<T>> acceptModuleTemplate(IngestModuleTemplate
template);
/**
 * Starts up the ingest modules in this pipeline and records the time at
 * which start up began.
 *
 * NOTE(review): this view of the method appears to be missing lines - a line
 * between the start time assignment and the loop, the opening of the try
 * block that pairs with the catch below, the closing braces and the return
 * statement. Confirm against the complete file.
 *
 * @return A list of ingest module errors, presumably empty if every module
 *         started up.
 */
153 private List<IngestModuleError> startUpIngestModules() {
154 List<IngestModuleError> errors =
new ArrayList<>();
// Timestamp later reported (as a copy) by getStartTime().
155 startTime =
new Date();
157 for (PipelineModule<T> module : modules) {
// Each module is given a new job context wrapping the executor.
159 module.startUp(
new IngestJobContext(ingestJobExecutor));
160 }
catch (Throwable ex) {
// Anything thrown by a module is recorded against that module's display
// name instead of being propagated.
167 errors.add(
new IngestModuleError(module.getDisplayName(), ex));
179 Date getStartTime() {
180 Date reportedStartTime = null;
181 if (startTime != null) {
182 reportedStartTime =
new Date(startTime.getTime());
184 return reportedStartTime;
/**
 * Performs an ingest task by preparing for it, passing it to each ingest
 * module in the pipeline, and cleaning up afterwards. Errors are collected
 * rather than thrown.
 *
 * NOTE(review): this view of the method appears to be missing many lines -
 * the outer guard paired with the "Pipeline not started or shut down" error,
 * the bodies of the cancellation checks, the openings of the try blocks,
 * closing braces and the return statement. The comments below describe only
 * the statements that are visible. Confirm against the complete file.
 *
 * @param task The task.
 *
 * @return A list of ingest module errors, presumably empty if the task was
 *         performed without error.
 */
195 List<IngestModuleError> performTask(T task) {
196 List<IngestModuleError> errors =
new ArrayList<>();
198 if (!ingestJobExecutor.isCancelled()) {
// Cancellation is checked a second time before preparing for the task.
200 if (ingestJobExecutor.isCancelled()) {
// Pipeline-specific preparation; a failure is reported under the pipeline's
// own name rather than a module's.
204 prepareForTask(task);
205 }
catch (IngestPipelineException ex) {
206 errors.add(
new IngestModuleError(
"Ingest Task Pipeline", ex));
// Hand the task to each module in pipeline order.
209 for (PipelineModule<T> module : modules) {
211 if (ingestJobExecutor.isCancelled()) {
// Publish the module about to run and stamp its processing start time
// before it processes the task.
215 currentModule = module;
216 currentModule.setProcessingStartTime();
217 module.process(ingestJobExecutor, task);
218 }
catch (Throwable ex) {
// Anything thrown by a module is recorded against that module's display
// name instead of being propagated.
224 errors.add(
new IngestModuleError(module.getDisplayName(), ex));
226 if (ingestJobExecutor.isCancelled()) {
// Pipeline-specific clean up; a failure is reported under the pipeline's
// own name.
232 cleanUpAfterTask(task);
233 }
catch (IngestPipelineException ex) {
234 errors.add(
new IngestModuleError(
"Ingest Task Pipeline", ex));
// Error recorded when the pipeline cannot perform tasks (the guarding
// condition is not visible in this view).
237 errors.add(
new IngestModuleError(
"Ingest Task Pipeline",
new IngestPipelineException(
"Pipeline not started or shut down")));
// No module is processing a task any more.
239 currentModule = null;
/**
 * Pauses the current thread if scheduled ingest pausing is enabled and the
 * current time falls inside the configured pause window. The thread sleeps
 * until the end of the window or until it is interrupted.
 *
 * NOTE(review): this view of the method appears to be missing lines - the
 * opening of the try block that pairs with the catch below and the closing
 * braces. Confirm against the complete file.
 */
249 private void pauseIfScheduled() {
250 if (ScheduledIngestPauseSettings.getPauseEnabled() ==
true) {
// Work out the start of the pause window: begin at the current date and
// time and step back one day at a time until the configured day of the week
// is reached (no step is taken if today is that day).
257 LocalDateTime pauseStart = LocalDateTime.now();
258 DayOfWeek pauseDayOfWeek = ScheduledIngestPauseSettings.getPauseDayOfWeek();
259 while (pauseStart.getDayOfWeek() != pauseDayOfWeek) {
260 pauseStart = pauseStart.minusDays(1);
// Replace the time of day with the configured start hour and minute, at
// zero seconds.
262 pauseStart = pauseStart.withHour(ScheduledIngestPauseSettings.getPauseStartTimeHour());
263 pauseStart = pauseStart.withMinute(ScheduledIngestPauseSettings.getPauseStartTimeMinute());
264 pauseStart = pauseStart.withSecond(0);
// The window ends the configured number of minutes after its start.
269 LocalDateTime pauseEnd = pauseStart.plusMinutes(ScheduledIngestPauseSettings.getPauseDurationMinutes());
// Pause only if the current time is within [pauseStart, pauseEnd).
277 LocalDateTime timeNow = LocalDateTime.now();
278 if ((timeNow.equals(pauseStart) || timeNow.isAfter(pauseStart)) && timeNow.isBefore(pauseEnd)) {
// Register this thread with the executor as paused before sleeping.
279 ingestJobExecutor.registerPausedIngestThread(Thread.currentThread());
// Sleep for whatever is left of the pause window.
281 long timeRemainingMillis = ChronoUnit.MILLIS.between(timeNow, pauseEnd);
282 logger.log(Level.INFO, String.format(
"%s pausing at %s for ~%d minutes", Thread.currentThread().getName(), LocalDateTime.now(), TimeUnit.MILLISECONDS.toMinutes(timeRemainingMillis)));
283 sleep(timeRemainingMillis);
284 logger.log(Level.INFO, String.format(
"%s resuming at %s", Thread.currentThread().getName(), LocalDateTime.now()));
285 }
catch (InterruptedException notLogged) {
// The exception itself is not logged, only a resume message.
// NOTE(review): the thread's interrupt status is not restored in the
// visible code - confirm whether callers rely on the cancellation check
// alone.
286 logger.log(Level.INFO, String.format(
"%s resuming at %s due to sleep interrupt (ingest job canceled)", Thread.currentThread().getName(), LocalDateTime.now()));
// Tell the executor this thread is no longer paused.
288 ingestJobExecutor.unregisterPausedIngestThread(Thread.currentThread());
/**
 * Does any task-type-specific preparation required before the ingest
 * modules in the pipeline process a task. Called by performTask() before
 * the task is passed to the modules.
 *
 * @param task The task.
 *
 * @throws IngestPipelineException If the preparation fails; performTask()
 *                                 records it as a pipeline error.
 */
303 abstract void prepareForTask(T task)
throws IngestPipelineException;
310 PipelineModule<T> getCurrentlyRunningModule() {
311 return currentModule;
/**
 * Shuts down the ingest modules in this pipeline, collecting any errors.
 * The modules are only visited when the pipeline is marked as running.
 *
 * NOTE(review): this view of the method appears to be missing lines - the
 * try block containing the per-module shut down call, any handling of the
 * message between its retrieval and the notification, the closing braces
 * and the return statement. Confirm against the complete file.
 *
 * @return A list of ingest module errors, presumably empty if every module
 *         shut down cleanly.
 */
319 List<IngestModuleError> shutDown() {
320 List<IngestModuleError> errors =
new ArrayList<>();
321 if (running ==
true) {
322 for (PipelineModule<T> module : modules) {
325 }
catch (Throwable ex) {
// Anything thrown by a module is recorded against that module's display
// name instead of being propagated.
326 errors.add(
new IngestModuleError(module.getDisplayName(), ex));
327 String msg = ex.getMessage();
// The user is also shown an error notification titled with the module's
// display name.
336 MessageNotifyUtil.Notify.error(NbBundle.getMessage(
this.getClass(),
"FileIngestPipeline.moduleError.title.text", module.getDisplayName()), msg);
/**
 * Does any task-type-specific clean up required after the ingest modules in
 * the pipeline have processed a task. Called by performTask().
 *
 * @param task The task.
 *
 * @throws IngestPipelineException If the clean up fails; performTask()
 *                                 records it as a pipeline error.
 */
354 abstract void cleanUpAfterTask(T task)
throws IngestPipelineException;
/**
 * An abstract wrapper around an ingest module that pairs the module with a
 * display name and the time at which it last began processing a task.
 * Subclasses supply the task-type-specific process() method.
 *
 * NOTE(review): this view of the class appears to be missing lines - the
 * bodies of getDisplayName() and shutDown() and the closing braces. Confirm
 * against the complete file.
 */
360 static abstract class PipelineModule<T
extends IngestTask> implements IngestModule {
// The wrapped ingest module.
362 private final IngestModule module;
// Name used when reporting this module, e.g. in ingest module errors.
363 private final String displayName;
// When this module last began processing; initially the construction time.
364 private volatile Date processingStartTime;
/**
 * Constructs a wrapper for an ingest module.
 *
 * @param module      The ingest module to wrap.
 * @param displayName The display name for the module.
 */
374 PipelineModule(IngestModule module, String displayName) {
375 this.module = module;
376 this.displayName = displayName;
377 processingStartTime =
new Date();
/**
 * Gets the canonical class name of the wrapped ingest module.
 *
 * @return The class name.
 */
385 String getClassName() {
386 return module.getClass().getCanonicalName();
/**
 * Gets the display name of the wrapped ingest module.
 *
 * @return The display name (the method body is not visible in this view).
 */
394 String getDisplayName() {
/**
 * Sets the processing start time for this module to the current time.
 */
402 void setProcessingStartTime() {
403 processingStartTime =
new Date();
/**
 * Gets the time this module last began processing.
 *
 * @return A defensive copy of the processing start time.
 */
412 Date getProcessingStartTime() {
413 return new Date(processingStartTime.getTime());
/**
 * Starts up the wrapped ingest module by delegating to it.
 *
 * @param context The ingest job context passed through to the module.
 *
 * @throws IngestModuleException If the wrapped module fails to start up.
 */
417 public void startUp(IngestJobContext context)
throws IngestModuleException {
418 module.startUp(context);
/**
 * Performs an ingest task using the wrapped ingest module.
 *
 * @param ingestJobExecutor The ingest job executor for the task.
 * @param task              The task to process.
 *
 * @throws IngestModuleException If processing the task fails.
 */
432 abstract void process(IngestJobExecutor ingestJobExecutor, T task)
throws IngestModuleException;
/**
 * Shuts down this module (the method body is not visible in this view;
 * presumably it delegates to the wrapped module - confirm).
 */
435 public void shutDown() {
/**
 * A checked exception used for errors raised by the pipeline itself, as
 * opposed to errors raised by its ingest modules.
 *
 * NOTE(review): this view of the class appears to be missing lines - the
 * body of the single-argument constructor and the closing braces. Confirm
 * against the complete file.
 */
444 static class IngestPipelineException
extends Exception {
446 private static final long serialVersionUID = 1L;
/**
 * Constructs an exception with a message (the constructor body is not
 * visible in this view).
 *
 * @param message The exception message.
 */
453 IngestPipelineException(String message) {
/**
 * Constructs an exception with a message and an underlying cause.
 *
 * @param message The exception message.
 * @param cause   The exception cause.
 */
463 IngestPipelineException(String message, Throwable cause) {
464 super(message, cause);