19 package org.sleuthkit.autopsy.ingest;
21 import static java.lang.Thread.sleep;
22 import java.time.DayOfWeek;
23 import java.time.LocalDateTime;
24 import java.time.temporal.ChronoUnit;
25 import java.util.ArrayList;
26 import java.util.Date;
27 import java.util.List;
28 import java.util.Optional;
29 import java.util.concurrent.TimeUnit;
30 import java.util.logging.Level;
31 import org.openide.util.NbBundle;
44 abstract class IngestTaskPipeline<T
extends IngestTask> {
46 private static final Logger logger = Logger.getLogger(IngestTaskPipeline.class.getName());
47 private final IngestJobPipeline ingestJobPipeline;
48 private final List<IngestModuleTemplate> moduleTemplates;
49 private final List<PipelineModule<T>> modules;
50 private volatile Date startTime;
51 private volatile boolean running;
52 private volatile PipelineModule<T> currentModule;
66 IngestTaskPipeline(IngestJobPipeline ingestJobPipeline, List<IngestModuleTemplate> moduleTemplates) {
67 this.ingestJobPipeline = ingestJobPipeline;
68 this.moduleTemplates = moduleTemplates;
69 modules =
new ArrayList<>();
78 return modules.isEmpty();
96 List<IngestModuleError> startUp() {
97 createIngestModules(moduleTemplates);
98 return startUpIngestModules();
107 private void createIngestModules(List<IngestModuleTemplate> moduleTemplates) {
108 for (IngestModuleTemplate
template : moduleTemplates) {
109 Optional<PipelineModule<T>> module = acceptModuleTemplate(
template);
110 if (module.isPresent()) {
111 modules.add(module.get());
127 abstract Optional<PipelineModule<T>> acceptModuleTemplate(IngestModuleTemplate ingestModuleTemplate);
134 private List<IngestModuleError> startUpIngestModules() {
135 startTime =
new Date();
137 List<IngestModuleError> errors =
new ArrayList<>();
138 for (PipelineModule<T> module : modules) {
140 module.startUp(
new IngestJobContext(ingestJobPipeline));
141 }
catch (Throwable ex) {
142 errors.add(
new IngestModuleError(module.getDisplayName(), ex));
154 Date getStartTime() {
155 Date reportedStartTime = null;
156 if (startTime != null) {
157 reportedStartTime =
new Date(startTime.getTime());
159 return reportedStartTime;
170 abstract void prepareTask(T task)
throws IngestTaskPipelineException;
179 List<IngestModuleError> performTask(T task) {
180 List<IngestModuleError> errors =
new ArrayList<>();
181 if (!this.ingestJobPipeline.isCancelled()) {
183 if (ingestJobPipeline.isCancelled()) {
188 }
catch (IngestTaskPipelineException ex) {
189 errors.add(
new IngestModuleError(
"Ingest Task Pipeline", ex));
192 for (PipelineModule<T> module : modules) {
194 if (ingestJobPipeline.isCancelled()) {
198 currentModule = module;
199 currentModule.setProcessingStartTime();
200 module.performTask(ingestJobPipeline, task);
201 }
catch (Throwable ex) {
202 errors.add(
new IngestModuleError(module.getDisplayName(), ex));
204 if (ingestJobPipeline.isCancelled()) {
211 }
catch (IngestTaskPipelineException ex) {
212 errors.add(
new IngestModuleError(
"Ingest Task Pipeline", ex));
214 currentModule = null;
222 private void pauseIfScheduled() {
223 if (ScheduledIngestPauseSettings.getPauseEnabled() ==
true) {
230 LocalDateTime pauseStart = LocalDateTime.now();
231 DayOfWeek pauseDayOfWeek = ScheduledIngestPauseSettings.getPauseDayOfWeek();
232 while (pauseStart.getDayOfWeek() != pauseDayOfWeek) {
233 pauseStart = pauseStart.minusDays(1);
235 pauseStart = pauseStart.withHour(ScheduledIngestPauseSettings.getPauseStartTimeHour());
236 pauseStart = pauseStart.withMinute(ScheduledIngestPauseSettings.getPauseStartTimeMinute());
237 pauseStart = pauseStart.withSecond(0);
242 LocalDateTime pauseEnd = pauseStart.plusMinutes(ScheduledIngestPauseSettings.getPauseDurationMinutes());
250 LocalDateTime timeNow = LocalDateTime.now();
251 if ((timeNow.equals(pauseStart) || timeNow.isAfter(pauseStart)) && timeNow.isBefore(pauseEnd)) {
252 ingestJobPipeline.registerPausedIngestThread(Thread.currentThread());
254 long timeRemainingMillis = ChronoUnit.MILLIS.between(timeNow, pauseEnd);
255 logger.log(Level.INFO, String.format(
"%s pausing at %s for ~%d minutes", Thread.currentThread().getName(), LocalDateTime.now(), TimeUnit.MILLISECONDS.toMinutes(timeRemainingMillis)));
256 sleep(timeRemainingMillis);
257 logger.log(Level.INFO, String.format(
"%s resuming at %s", Thread.currentThread().getName(), LocalDateTime.now()));
258 }
catch (InterruptedException notLogged) {
259 logger.log(Level.INFO, String.format(
"%s resuming at %s due to sleep interrupt (ingest job canceled)", Thread.currentThread().getName(), LocalDateTime.now()));
261 ingestJobPipeline.unregisterPausedIngestThread(Thread.currentThread());
272 PipelineModule<T> getCurrentlyRunningModule() {
273 return currentModule;
284 abstract void completeTask(T task)
throws IngestTaskPipelineException;
291 List<IngestModuleError> shutDown() {
292 List<IngestModuleError> errors =
new ArrayList<>();
293 if (running ==
true) {
294 for (PipelineModule<T> module : modules) {
297 }
catch (Throwable ex) {
298 errors.add(
new IngestModuleError(module.getDisplayName(), ex));
299 String msg = ex.getMessage();
308 MessageNotifyUtil.Notify.error(NbBundle.getMessage(
this.getClass(),
"FileIngestPipeline.moduleError.title.text", module.getDisplayName()), msg);
321 static abstract class PipelineModule<T
extends IngestTask> implements IngestModule {
323 private final IngestModule module;
324 private final String displayName;
325 private volatile Date processingStartTime;
334 PipelineModule(IngestModule module, String displayName) {
335 this.module = module;
336 this.displayName = displayName;
337 this.processingStartTime =
new Date();
345 String getClassName() {
346 return module.getClass().getCanonicalName();
354 String getDisplayName() {
362 void setProcessingStartTime() {
363 processingStartTime =
new Date();
372 Date getProcessingStartTime() {
373 return new Date(processingStartTime.getTime());
377 public void startUp(IngestJobContext context)
throws IngestModuleException {
378 module.startUp(context);
391 abstract void performTask(IngestJobPipeline ingestJobPipeline, T task)
throws IngestModuleException;
394 public void shutDown() {
403 public static class IngestTaskPipelineException
extends Exception {
412 super(message, cause);
IngestTaskPipelineException(String message, Throwable cause)
static final long serialVersionUID
IngestTaskPipelineException(String message)