Autopsy  4.20.0
Graphical digital forensics platform for The Sleuth Kit and other tools.
BatchProcessor.java
Go to the documentation of this file.
1 /*
2  * Autopsy Forensic Browser
3  *
4  * Copyright 2023 Basis Technology Corp.
5  * Contact: carrier <at> sleuthkit <dot> org
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  * http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19 package com.basistech.df.cybertriage.autopsy.malwarescan;
20 
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.concurrent.BlockingQueue;
24 import java.util.concurrent.CompletableFuture;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.Future;
29 import java.util.concurrent.LinkedBlockingQueue;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.TimeoutException;
32 import java.util.function.Consumer;
33 
/**
 * Collects items into batches and hands each batch to a consumer on a single
 * background thread.
 *
 * Items are buffered in a bounded queue whose capacity equals the batch size.
 * As soon as the buffer holds a full batch it is drained and submitted to the
 * processing executor. {@link #flushAndReset()} submits any partial batch,
 * waits for outstanding work and prepares the processor for reuse.
 *
 * All public methods are synchronized on this instance.
 *
 * @param <T> The type of item being batched.
 */
public class BatchProcessor<T> {

    // Not final: replaced with a fresh executor on every flushAndReset().
    private ExecutorService processingExecutorService = Executors.newSingleThreadExecutor();

    private final BlockingQueue<T> batchingQueue;
    private final int batchSize;
    private final Consumer<List<T>> itemsConsumer;
    private final long secondsTimeout;

    /**
     * Main constructor.
     *
     * @param batchSize      The number of items that make up a full batch;
     *                       also the capacity of the internal buffer.
     * @param secondsTimeout The maximum number of seconds
     *                       {@link #flushAndReset()} waits for submitted
     *                       batches to finish.
     * @param itemsConsumer  The callback that receives each batch. It is
     *                       invoked on the processing executor's thread.
     */
    public BatchProcessor(int batchSize, long secondsTimeout, Consumer<List<T>> itemsConsumer) {
        this.batchingQueue = new LinkedBlockingQueue<>(batchSize);
        this.batchSize = batchSize;
        this.itemsConsumer = itemsConsumer;
        this.secondsTimeout = secondsTimeout;
    }

    /**
     * Adds an item to the current batch and, if that completes the batch,
     * submits the batch for asynchronous processing.
     *
     * @param item The item to add.
     *
     * @throws InterruptedException Declared for callers; propagated from batch
     *                              submission.
     */
    public synchronized void add(T item) throws InterruptedException {
        batchingQueue.add(item);
        if (batchingQueue.size() >= batchSize) {
            // The queue is bounded at batchSize, so it must be drained here;
            // otherwise the next add() would fail with "Queue full".
            asyncProcessBatch();
        }
    }

    /**
     * Submits any remaining buffered items, waits up to the configured timeout
     * for all submitted batches to complete, and then installs a fresh
     * executor so this processor can be used again.
     *
     * @throws InterruptedException If interrupted while waiting for the
     *                              executor to terminate.
     */
    public synchronized void flushAndReset() throws InterruptedException {
        // get any remaining
        asyncProcessBatch();

        // don't accept any new additions
        processingExecutorService.shutdown();

        // await termination; batches still running when the timeout elapses
        // keep running on the old (shut down) executor
        processingExecutorService.awaitTermination(secondsTimeout, TimeUnit.SECONDS);

        // get new (not shut down executor)
        processingExecutorService = Executors.newSingleThreadExecutor();
    }

    /**
     * Drains the buffered items into a new list and submits that list to the
     * processing executor. Does nothing if the buffer is empty.
     *
     * @throws InterruptedException Declared to match the callers' signatures.
     */
    private synchronized void asyncProcessBatch() throws InterruptedException {
        if (!batchingQueue.isEmpty()) {
            final List<T> processingList = new ArrayList<>(batchingQueue.size());

            // transfer batching queue to processing queue
            batchingQueue.drainTo(processingList);

            // submit to be processed
            processingExecutorService.submit(() -> itemsConsumer.accept(processingList));
        }
    }

}
BatchProcessor(int batchSize, long secondsTimeout, Consumer< List< T >> itemsConsumer)

Copyright © 2012-2022 Basis Technology. Generated on: Tue Aug 1 2023
This work is licensed under a Creative Commons Attribution-Share Alike 3.0 United States License.