19 package com.basistech.df.cybertriage.autopsy.malwarescan;
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.concurrent.BlockingQueue;
24 import java.util.concurrent.CompletableFuture;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.Future;
29 import java.util.concurrent.LinkedBlockingQueue;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.TimeoutException;
32 import java.util.function.Consumer;
/**
 * Constructs a batch processor.
 *
 * The visible part of the body creates the batching queue as a
 * {@link LinkedBlockingQueue} whose capacity is {@code batchSize}. The rest
 * of the constructor body is not shown in this excerpt.
 *
 * @param batchSize      capacity given to the batching queue
 * @param secondsTimeout NOTE(review): presumably stored in the
 *                       {@code secondsTimeout} field that is later passed to
 *                       {@code awaitTermination} -- assignment not visible
 *                       here, confirm
 * @param itemsConsumer  NOTE(review): presumably stored in the
 *                       {@code itemsConsumer} field that receives each
 *                       drained batch -- assignment not visible here, confirm
 */
48 public BatchProcessor(
int batchSize,
long secondsTimeout, Consumer<List<T>> itemsConsumer) {
// Bounded queue: its capacity is fixed at batchSize.
49 this.batchingQueue =
new LinkedBlockingQueue<>(
batchSize);
/**
 * Adds an item to the batching queue while holding this instance's monitor.
 *
 * Only the first statement of the body is visible in this excerpt.
 *
 * @param item the item to enqueue
 * @throws InterruptedException declared by the signature; the statement
 *                              visible here does not itself throw it
 */
55 public synchronized void add(T item)
throws InterruptedException {
// Collection-style add(): on a capacity-bounded queue this throws
// IllegalStateException when the queue is full instead of blocking.
56 batchingQueue.add(item);
// NOTE(review): the enclosing method header is not visible in this excerpt;
// these statements presumably belong to flushAndReset() -- confirm.
// Stop accepting new tasks; tasks that were already submitted still run.
67 processingExecutorService.shutdown();
// Wait up to secondsTimeout seconds for the submitted tasks to finish.
// The boolean result (false when the wait times out) is discarded.
70 processingExecutorService.awaitTermination(secondsTimeout, TimeUnit.SECONDS);
// A shut-down executor rejects further submissions, so it is replaced with a
// fresh single-thread executor.
73 processingExecutorService = Executors.newSingleThreadExecutor();
// NOTE(review): the enclosing method header is not visible in this excerpt;
// these statements presumably belong to asyncProcessBatch() -- confirm.
// Skip the submission entirely when nothing is queued.
77 if (!batchingQueue.isEmpty()) {
78 final List<T> processingList =
new ArrayList<>();
// Move every currently queued item into the local list, removing them from
// the queue.
81 batchingQueue.drainTo(processingList);
// Hand the drained batch to the consumer on the executor. The Future
// returned by submit() is not used by this statement.
84 processingExecutorService.submit(() -> itemsConsumer.accept(processingList));
synchronized void asyncProcessBatch()
BatchProcessor(int batchSize, long secondsTimeout, Consumer< List< T >> itemsConsumer)
final Consumer< List< T > > itemsConsumer
synchronized void flushAndReset()
synchronized void add(T item)
ExecutorService processingExecutorService
final BlockingQueue< T > batchingQueue
final long secondsTimeout