Autopsy  4.21.0
Graphical digital forensics platform for The Sleuth Kit and other tools.
BatchProcessor.java
Go to the documentation of this file.
1 /*
2  * Autopsy Forensic Browser
3  *
4  * Copyright 2023 Basis Technology Corp.
5  * Contact: carrier <at> sleuthkit <dot> org
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  * http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  */
19 package com.basistech.df.cybertriage.autopsy.malwarescan;
20 
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.concurrent.BlockingQueue;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.LinkedBlockingQueue;
27 import java.util.concurrent.TimeUnit;
28 import java.util.function.Consumer;
29 
/**
 * Accumulates items and hands them to a consumer in batches on a background
 * single-thread executor.
 *
 * A batch is dispatched as soon as {@code batchSize} items have been added, or
 * when {@link #flushAndReset()} is called with items still pending. All methods
 * that touch the queue or the executor are {@code synchronized} on this
 * instance.
 *
 * Note: batches are dispatched with {@code ExecutorService.submit} and the
 * returned {@code Future} is discarded, so an exception thrown by the consumer
 * is not surfaced by this class.
 *
 * @param <T> The type of item being batched.
 */
class BatchProcessor<T> {

    // Replaced with a fresh executor every time flushAndReset() runs.
    private ExecutorService processingExecutorService = Executors.newSingleThreadExecutor();

    private final BlockingQueue<T> batchingQueue;
    private final int batchSize;
    private final Consumer<List<T>> itemsConsumer;
    private final long secondsTimeout;

    /**
     * Main constructor.
     *
     * @param batchSize      The number of queued items that triggers a batch;
     *                       also used as the capacity of the internal queue.
     * @param secondsTimeout The maximum number of seconds
     *                       {@link #flushAndReset()} waits for submitted
     *                       batches to finish.
     * @param itemsConsumer  Receives each batch on the executor thread.
     */
    public BatchProcessor(int batchSize, long secondsTimeout, Consumer<List<T>> itemsConsumer) {
        this.batchingQueue = new LinkedBlockingQueue<>(batchSize);
        this.batchSize = batchSize;
        this.itemsConsumer = itemsConsumer;
        this.secondsTimeout = secondsTimeout;
    }

    /**
     * Queues an item and dispatches a batch once {@code batchSize} items are
     * pending.
     *
     * @param item The item to queue.
     *
     * @throws InterruptedException Declared for callers; kept for interface
     *                              compatibility.
     */
    public synchronized void add(T item) throws InterruptedException {
        batchingQueue.add(item);
        if (batchingQueue.size() >= batchSize) {
            asyncProcessBatch();
        }
    }

    /**
     * Dispatches any pending items, waits up to {@code secondsTimeout} seconds
     * for all submitted batches to finish, and then installs a fresh executor
     * so this processor can be reused.
     *
     * The fresh executor is installed even if the wait is interrupted;
     * otherwise the processor would be left holding a shut-down executor and
     * every later batch submission would be rejected.
     *
     * @throws InterruptedException If the calling thread is interrupted while
     *                              waiting for batches to finish.
     */
    public synchronized void flushAndReset() throws InterruptedException {
        // get any remaining
        asyncProcessBatch();

        // don't accept any new additions
        processingExecutorService.shutdown();

        try {
            // await termination
            processingExecutorService.awaitTermination(secondsTimeout, TimeUnit.SECONDS);
        } finally {
            // always get a new (not shut down) executor, including when the
            // wait above is interrupted, so later calls are not rejected
            processingExecutorService = Executors.newSingleThreadExecutor();
        }
    }

    /**
     * Drains everything currently queued into a new list and submits that list
     * to the executor for consumption. Does nothing when the queue is empty.
     *
     * @throws InterruptedException Declared for callers; kept for interface
     *                              compatibility.
     */
    private synchronized void asyncProcessBatch() throws InterruptedException {
        if (!batchingQueue.isEmpty()) {
            final List<T> processingList = new ArrayList<>();

            // transfer batching queue to processing queue
            batchingQueue.drainTo(processingList);

            // submit to be processed
            processingExecutorService.submit(() -> itemsConsumer.accept(processingList));
        }
    }

}

Copyright © 2012-2022 Basis Technology. Generated on: Tue Feb 6 2024
This work is licensed under a Creative Commons Attribution-Share Alike 3.0 United States License.