19 package org.sleuthkit.autopsy.ingest;
21 import static java.lang.Thread.sleep;
22 import java.time.DayOfWeek;
23 import java.time.LocalDateTime;
24 import java.time.temporal.ChronoUnit;
25 import java.util.ArrayList;
26 import java.util.Date;
27 import java.util.List;
28 import java.util.Optional;
29 import java.util.concurrent.TimeUnit;
30 import java.util.logging.Level;
31 import org.openide.util.NbBundle;
/**
 * Abstract pipeline, generic in the kind of IngestTask it handles, that keeps
 * a list of PipelineModule wrappers created from ingest module templates and
 * runs tasks through them.
 *
 * @param <T> The type of ingest task this pipeline executes.
 */
47 abstract class IngestTaskPipeline<T
extends IngestTask> {
// Logger named after this class.
49 private static final Logger logger = Logger.getLogger(IngestTaskPipeline.class.getName());
// Supplied to the constructor; consulted for cancellation, used to build
// module contexts, and used to register/unregister paused ingest threads.
50 private final IngestJobPipeline ingestJobPipeline;
// Templates supplied to the constructor; createIngestModules() offers each
// one to acceptModuleTemplate().
51 private final List<IngestModuleTemplate> moduleTemplates;
// Modules accepted by acceptModuleTemplate(); iterated by the start up,
// execute, and shut down code.
52 private final List<PipelineModule<T>> modules;
// Assigned in startUpIngestModules(); getStartTime() treats null as "not set".
53 private volatile Date startTime;
// Tested by shutDown(). NOTE(review): the code that sets this flag is not
// visible in this excerpt.
54 private volatile boolean running;
// Set by executeTask() to the module it is about to run and reset to null
// there; exposed through getCurrentlyRunningModule().
55 private volatile PipelineModule<T> currentModule;
/**
 * Constructs the pipeline: stores the ingest job pipeline and the module
 * templates and creates an empty module list. No modules are created here;
 * that is done by createIngestModules().
 *
 * @param ingestPipeline  The ingest job pipeline stored in ingestJobPipeline.
 * @param moduleTemplates The ingest module templates stored for later use.
 */
66 IngestTaskPipeline(IngestJobPipeline ingestPipeline, List<IngestModuleTemplate> moduleTemplates) {
67 this.ingestJobPipeline = ingestPipeline;
74 this.moduleTemplates = moduleTemplates;
75 modules =
new ArrayList<>();
// NOTE(review): the statement below belongs to a separate method whose
// signature is not visible in this excerpt; it reports whether the module
// list is empty.
85 return modules.isEmpty();
/**
 * Starts up the pipeline by creating the ingest modules from the stored
 * templates and then starting them up, collecting any start up errors.
 *
 * @return A list of ingest module errors; it contains a "Pipeline already
 *         started" error on the alternative path.
 */
104 List<IngestModuleError> startUp() {
105 List<IngestModuleError> errors =
new ArrayList<>();
// NOTE(review): the condition choosing between the two paths below is not
// visible in this excerpt; presumably it tests the running flag - confirm.
113 createIngestModules(moduleTemplates);
114 errors.addAll(startUpIngestModules());
// Alternative path: report the repeated start up as an error instead of
// throwing.
116 errors.add(
new IngestModuleError(
"Ingest Task Pipeline",
new IngestTaskPipelineException(
"Pipeline already started")));
/**
 * Populates the module list from the given templates. Each template is
 * offered to acceptModuleTemplate(); only templates for which a module is
 * returned contribute to the list.
 *
 * @param moduleTemplates The ingest module templates to offer.
 */
127 private void createIngestModules(List<IngestModuleTemplate> moduleTemplates) {
// Only build the list when it is still empty, so a repeated call does not
// add the modules a second time.
128 if (modules.isEmpty()) {
129 for (IngestModuleTemplate
template : moduleTemplates) {
130 Optional<PipelineModule<T>> module = acceptModuleTemplate(
template);
// An empty Optional means the template was not accepted.
131 if (module.isPresent()) {
132 modules.add(module.get());
/**
 * Decides whether a module template yields a module for this pipeline.
 * createIngestModules() adds the returned module to the pipeline when the
 * Optional is present and ignores the template otherwise.
 *
 * @param template The ingest module template to consider.
 *
 * @return An Optional holding the pipeline module, or an empty Optional.
 */
149 abstract Optional<PipelineModule<T>> acceptModuleTemplate(IngestModuleTemplate
template);
/**
 * Records the pipeline start time and starts up every module in the module
 * list, each with a new IngestJobContext built from the ingest job pipeline.
 *
 * @return A list of ingest module errors. NOTE(review): the return statement
 *         is not visible in this excerpt; presumably the errors list - confirm.
 */
156 private List<IngestModuleError> startUpIngestModules() {
157 List<IngestModuleError> errors =
new ArrayList<>();
// The start time is taken before any module is started.
158 startTime =
new Date();
160 for (PipelineModule<T> module : modules) {
162 module.startUp(
new IngestJobContext(ingestJobPipeline));
163 }
// Throwable is caught so that a failure in one module is recorded against
// that module's display name rather than propagating out of the loop.
catch (Throwable ex) {
170 errors.add(
new IngestModuleError(module.getDisplayName(), ex));
/**
 * Gets the pipeline start time.
 *
 * @return A copy of the start time, or null if the start time has not been
 *         set.
 */
182 Date getStartTime() {
183 Date reportedStartTime = null;
184 if (startTime != null) {
// Hand out a copy so callers cannot mutate the stored Date.
185 reportedStartTime =
new Date(startTime.getTime());
187 return reportedStartTime;
/**
 * Executes an ingest task: calls prepareForTask(), runs the task through
 * each module in the module list, then calls cleanUpAfterTask(). Failures
 * are collected as ingest module errors rather than thrown.
 *
 * NOTE(review): several lines of this method (try keywords, branch bodies,
 * closing braces, the return) are not visible in this excerpt, so the exact
 * nesting of the statements below should be confirmed against the full file.
 *
 * @param task The ingest task to execute.
 *
 * @return A list of ingest module errors (presumably the errors list;
 *         the return statement is not visible here).
 */
198 List<IngestModuleError> executeTask(T task) {
199 List<IngestModuleError> errors =
new ArrayList<>();
// Cancellation of the ingest job is checked before doing any work...
201 if (!ingestJobPipeline.isCancelled()) {
// ...and checked again; the body of this second check is not visible.
203 if (ingestJobPipeline.isCancelled()) {
207 prepareForTask(task);
208 }
// Pipeline-level failures are reported under the fixed name
// "Ingest Task Pipeline" instead of a module display name.
catch (IngestTaskPipelineException ex) {
209 errors.add(
new IngestModuleError(
"Ingest Task Pipeline", ex));
212 for (PipelineModule<T> module : modules) {
// Cancellation is checked before each module; the body is not visible.
214 if (ingestJobPipeline.isCancelled()) {
// Publish the module about to run and stamp its processing start time so
// getCurrentlyRunningModule() reflects it.
218 currentModule = module;
219 currentModule.setProcessingStartTime();
220 module.executeTask(ingestJobPipeline, task);
221 }
// Any Throwable from a module is recorded against that module.
catch (Throwable ex) {
228 errors.add(
new IngestModuleError(module.getDisplayName(), ex));
230 if (ingestJobPipeline.isCancelled()) {
236 cleanUpAfterTask(task);
237 }
catch (IngestTaskPipelineException ex) {
238 errors.add(
new IngestModuleError(
"Ingest Task Pipeline", ex));
// NOTE(review): the condition guarding this error is not visible;
// presumably the else branch of a check on the running flag - confirm.
241 errors.add(
new IngestModuleError(
"Ingest Task Pipeline",
new IngestTaskPipelineException(
"Pipeline not started or shut down")));
// No module is reported as running once the task is finished.
243 currentModule = null;
/**
 * If scheduled ingest pauses are enabled and the current time falls inside
 * the configured pause window, registers the current thread as a paused
 * ingest thread and sleeps until the end of the window.
 *
 * NOTE(review): the try keyword and closing braces are not visible in this
 * excerpt; confirm the nesting against the full file.
 */
251 private void pauseIfScheduled() {
252 if (ScheduledIngestPauseSettings.getPauseEnabled() ==
true) {
// Derive the pause start: begin with the current date/time, step back one
// day at a time until the day of week matches the configured pause day,
// then overwrite hour and minute with the configured values and zero the
// seconds.
// NOTE(review): withSecond(0) does not clear the nanosecond field, so
// pauseStart keeps the nanoseconds of the LocalDateTime.now() call.
259 LocalDateTime pauseStart = LocalDateTime.now();
260 DayOfWeek pauseDayOfWeek = ScheduledIngestPauseSettings.getPauseDayOfWeek();
261 while (pauseStart.getDayOfWeek() != pauseDayOfWeek) {
262 pauseStart = pauseStart.minusDays(1);
264 pauseStart = pauseStart.withHour(ScheduledIngestPauseSettings.getPauseStartTimeHour());
265 pauseStart = pauseStart.withMinute(ScheduledIngestPauseSettings.getPauseStartTimeMinute());
266 pauseStart = pauseStart.withSecond(0);
// The pause end is the start plus the configured duration in minutes.
271 LocalDateTime pauseEnd = pauseStart.plusMinutes(ScheduledIngestPauseSettings.getPauseDurationMinutes());
// Pause only when pauseStart <= now < pauseEnd.
279 LocalDateTime timeNow = LocalDateTime.now();
280 if ((timeNow.equals(pauseStart) || timeNow.isAfter(pauseStart)) && timeNow.isBefore(pauseEnd)) {
// Register this thread with the ingest job pipeline before sleeping; the
// log message in the catch below indicates the sleep is interrupted when
// the ingest job is canceled.
281 ingestJobPipeline.registerPausedIngestThread(Thread.currentThread());
// Sleep for whatever remains of the pause window, measured from timeNow.
283 long timeRemainingMillis = ChronoUnit.MILLIS.between(timeNow, pauseEnd);
284 logger.log(Level.INFO, String.format(
"%s pausing at %s for ~%d minutes", Thread.currentThread().getName(), LocalDateTime.now(), TimeUnit.MILLISECONDS.toMinutes(timeRemainingMillis)));
// sleep is the statically imported Thread.sleep.
285 sleep(timeRemainingMillis);
286 logger.log(Level.INFO, String.format(
"%s resuming at %s", Thread.currentThread().getName(), LocalDateTime.now()));
287 }
// The interrupt is logged at INFO level only.
// NOTE(review): the thread's interrupt status is not restored in the
// visible code - confirm that callers rely solely on isCancelled().
catch (InterruptedException notLogged) {
288 logger.log(Level.INFO, String.format(
"%s resuming at %s due to sleep interrupt (ingest job canceled)", Thread.currentThread().getName(), LocalDateTime.now()));
// NOTE(review): presumably inside a finally block (not visible here) so the
// thread is unregistered on both the normal and the interrupted path.
290 ingestJobPipeline.unregisterPausedIngestThread(Thread.currentThread());
/**
 * Hook invoked by executeTask() before the task is handed to the modules.
 * An IngestTaskPipelineException thrown here is recorded by executeTask()
 * as an "Ingest Task Pipeline" error.
 *
 * @param task The ingest task about to be executed.
 *
 * @throws IngestTaskPipelineException If the preparation fails.
 */
305 abstract void prepareForTask(T task)
throws IngestTaskPipelineException;
/**
 * Gets the module most recently published by executeTask() as the one it is
 * running.
 *
 * @return The current module; may be null, since executeTask() resets the
 *         field to null.
 */
312 PipelineModule<T> getCurrentlyRunningModule() {
313 return currentModule;
/**
 * Shuts down the pipeline when the running flag is set, iterating over the
 * modules and collecting any failures as ingest module errors.
 *
 * NOTE(review): the try block (presumably the per-module shutDown() call),
 * the handling of msg between its assignment and its use, and the return
 * statement are not visible in this excerpt.
 *
 * @return A list of ingest module errors (presumably the errors list).
 */
321 List<IngestModuleError> shutDown() {
322 List<IngestModuleError> errors =
new ArrayList<>();
323 if (running ==
true) {
324 for (PipelineModule<T> module : modules) {
327 }
// Any Throwable is recorded against the module's display name and also
// surfaced to the user as an error notification.
catch (Throwable ex) {
328 errors.add(
new IngestModuleError(module.getDisplayName(), ex));
// NOTE(review): getMessage() may return null; check whether the lines not
// visible here guard against that before msg is displayed.
329 String msg = ex.getMessage();
// The notification title comes from the
// "FileIngestPipeline.moduleError.title.text" bundle entry, with the
// module display name as its argument.
338 MessageNotifyUtil.Notify.error(NbBundle.getMessage(
this.getClass(),
"FileIngestPipeline.moduleError.title.text", module.getDisplayName()), msg);
/**
 * Hook invoked by executeTask() after the module loop. An
 * IngestTaskPipelineException thrown here is recorded by executeTask() as an
 * "Ingest Task Pipeline" error.
 *
 * @param task The ingest task that was executed.
 *
 * @throws IngestTaskPipelineException If the clean up fails.
 */
356 abstract void cleanUpAfterTask(T task)
throws IngestTaskPipelineException;
/**
 * Abstract wrapper around an IngestModule that adds a display name and a
 * processing start time and declares the task execution method used by the
 * pipeline.
 *
 * NOTE(review): the bodies of getDisplayName() and shutDown(), and all
 * closing braces, are not visible in this excerpt.
 *
 * @param <T> The type of ingest task the module executes.
 */
365 static abstract class PipelineModule<T
extends IngestTask> implements IngestModule {
// The wrapped ingest module; startUp() delegates to it.
367 private final IngestModule module;
368 private final String displayName;
// Initialized at construction and refreshed by setProcessingStartTime().
369 private volatile Date processingStartTime;
/**
 * Constructs the wrapper and initializes the processing start time to the
 * time of construction.
 *
 * @param module      The ingest module to wrap.
 * @param displayName The display name to store for the module.
 */
378 PipelineModule(IngestModule module, String displayName) {
379 this.module = module;
380 this.displayName = displayName;
381 this.processingStartTime =
new Date();
/**
 * Gets the canonical class name of the wrapped module, not of this wrapper.
 *
 * @return The canonical name reported by the wrapped module's class.
 */
389 String getClassName() {
390 return module.getClass().getCanonicalName();
/**
 * Gets the display name. NOTE(review): body not visible; presumably returns
 * the displayName field - confirm.
 */
398 String getDisplayName() {
/**
 * Sets the processing start time to the current time.
 */
406 void setProcessingStartTime() {
407 processingStartTime =
new Date();
/**
 * Gets the processing start time.
 *
 * @return A copy of the stored time, so callers cannot mutate it.
 */
416 Date getProcessingStartTime() {
417 return new Date(processingStartTime.getTime());
/**
 * Starts up the wrapped module by delegating to its startUp() method.
 *
 * @param context The ingest job context passed through to the module.
 *
 * @throws IngestModuleException Declared for failures of the wrapped
 *                               module's start up.
 */
421 public void startUp(IngestJobContext context)
throws IngestModuleException {
422 module.startUp(context);
/**
 * Executes an ingest task with this module.
 *
 * @param ingestJobPipeline The ingest job pipeline the task belongs to.
 * @param task              The task to execute.
 *
 * @throws IngestModuleException If the module fails while executing the task.
 */
436 abstract void executeTask(IngestJobPipeline ingestJobPipeline, T task)
throws IngestModuleException;
/**
 * Shuts down the module. NOTE(review): body not visible; presumably
 * delegates to the wrapped module - confirm.
 */
439 public void shutDown() {
/**
 * Checked exception type used by the ingest task pipeline; it is declared by
 * prepareForTask() and cleanUpAfterTask() and wrapped into ingest module
 * errors by the pipeline.
 *
 * NOTE(review): the members below are garbled in this excerpt (out of order
 * and without bodies or initializers); consult the full file for the actual
 * declarations.
 */
448 public static class IngestTaskPipelineException
extends Exception {
// Passes the message and the cause to the Exception superclass constructor.
468 super(message, cause);
// Signature of the constructor taking a message and a cause.
IngestTaskPipelineException(String message, Throwable cause)
// Serialization version field; its value is not visible here.
static final long serialVersionUID
// Signature of the constructor taking only a message.
IngestTaskPipelineException(String message)