19 package com.basistech.df.cybertriage.autopsy.malwarescan;
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.concurrent.BlockingQueue;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.LinkedBlockingQueue;
27 import java.util.concurrent.TimeUnit;
28 import java.util.function.Consumer;
35 class BatchProcessor<T> {
37 private ExecutorService processingExecutorService = Executors.newSingleThreadExecutor();
39 private final BlockingQueue<T> batchingQueue;
40 private final int batchSize;
41 private final Consumer<List<T>> itemsConsumer;
42 private final long secondsTimeout;
44 public BatchProcessor(
int batchSize,
long secondsTimeout, Consumer<List<T>> itemsConsumer) {
45 this.batchingQueue =
new LinkedBlockingQueue<>(batchSize);
46 this.batchSize = batchSize;
47 this.itemsConsumer = itemsConsumer;
48 this.secondsTimeout = secondsTimeout;
51 public synchronized void add(T item)
throws InterruptedException {
52 batchingQueue.add(item);
53 if (batchingQueue.size() >= batchSize) {
58 public synchronized void flushAndReset()
throws InterruptedException {
63 processingExecutorService.shutdown();
66 processingExecutorService.awaitTermination(secondsTimeout, TimeUnit.SECONDS);
69 processingExecutorService = Executors.newSingleThreadExecutor();
72 private synchronized void asyncProcessBatch()
throws InterruptedException {
73 if (!batchingQueue.isEmpty()) {
74 final List<T> processingList =
new ArrayList<>();
77 batchingQueue.drainTo(processingList);
80 processingExecutorService.submit(() -> itemsConsumer.accept(processingList));