package org.sleuthkit.autopsy.keywordsearch;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import javax.swing.SwingUtilities;
import javax.swing.SwingWorker;
import org.netbeans.api.progress.aggregate.AggregateProgressFactory;
import org.netbeans.api.progress.aggregate.AggregateProgressHandle;
import org.netbeans.api.progress.aggregate.ProgressContributor;
import org.openide.util.Cancellable;
import org.openide.util.NbBundle;
import org.openide.util.NbBundle.Messages;
import org.sleuthkit.autopsy.coreutils.Logger;
import org.sleuthkit.autopsy.coreutils.StopWatch;
import org.sleuthkit.autopsy.ingest.IngestJobContext;
import org.sleuthkit.autopsy.ingest.IngestMessage;
import org.sleuthkit.autopsy.ingest.IngestServices;
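/**
 * Singleton keyword search manager: launches a search thread for each ingest
 * job and commits the index, both on timed intervals.
 */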
final class IngestSearchRunner {

    private static final Logger logger = Logger.getLogger(IngestSearchRunner.class.getName());
    private static IngestSearchRunner instance = null;
    private IngestServices services = IngestServices.getInstance();
    private Ingester ingester = null;
    private long currentUpdateIntervalMs;
    private volatile boolean periodicSearchTaskRunning = false;
    private Future<?> jobProcessingTaskFuture;
    private final ScheduledThreadPoolExecutor jobProcessingExecutor;
    private static final int NUM_SEARCH_SCHEDULING_THREADS = 1;
    private static final String SEARCH_SCHEDULER_THREAD_NAME = "periodic-search-scheduler-%d";

    private Map<Long, SearchJobInfo> jobs = new ConcurrentHashMap<>();
    private IngestSearchRunner() {
        currentUpdateIntervalMs = ((long) KeywordSearchSettings.getUpdateFrequency().getTime()) * 60 * 1000;
        ingester = Ingester.getDefault();
        jobProcessingExecutor = new ScheduledThreadPoolExecutor(NUM_SEARCH_SCHEDULING_THREADS,
                new ThreadFactoryBuilder().setNameFormat(SEARCH_SCHEDULER_THREAD_NAME).build());
    }
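    /**
     * Gets the singleton instance of this ingest search runner.
     *
     * @return The instance, created on first use.
     */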
    public static synchronized IngestSearchRunner getInstance() {
        if (instance == null) {
            instance = new IngestSearchRunner();
        }
        return instance;
    }
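    /**
     * Registers a keyword search job for the given ingest job and, if it is
     * not already running, schedules the periodic search task.
     *
     * @param jobContext       The ingest job context.
     * @param keywordListNames The names of the keyword lists to search.
     */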
    public synchronized void startJob(IngestJobContext jobContext, List<String> keywordListNames) {
        long jobId = jobContext.getJobId();
        if (!jobs.containsKey(jobId)) {
            logger.log(Level.INFO, "Adding job {0}", jobId);
            SearchJobInfo jobData = new SearchJobInfo(jobContext, keywordListNames);
            jobs.put(jobId, jobData);
        }

        // keep track of how many module instances in this job have requested the search
        jobs.get(jobId).incrementModuleReferenceCount();

        // start the periodic search task, if it is not already running
        if (!jobs.isEmpty() && !periodicSearchTaskRunning) {
            // reset the periodic search interval to the user-configured default
            logger.log(Level.INFO, "Resetting periodic search time out to default value");
            currentUpdateIntervalMs = ((long) KeywordSearchSettings.getUpdateFrequency().getTime()) * 60 * 1000;
            jobProcessingTaskFuture = jobProcessingExecutor.schedule(
                    new PeriodicSearchTask(), currentUpdateIntervalMs, MILLISECONDS);
            periodicSearchTaskRunning = true;
        }
    }
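    /**
     * Performs the final keyword search for an ingest job when the last module
     * instance for that job has finished, then removes the job.
     *
     * @param jobId The ingest job ID.
     */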
    public synchronized void endJob(long jobId) {
        boolean readyForFinalSearch = false;
        SearchJobInfo job = jobs.get(jobId);
        if (job == null) {
            return;
        }

        // only do the final search if this is the last module in this job to call endJob()
        if (job.decrementModuleReferenceCount() == 0) {
            jobs.remove(jobId);
            readyForFinalSearch = true;
        }

        if (readyForFinalSearch) {
            logger.log(Level.INFO, "Committing search index before final search for search job {0}", job.getJobId());
            commit();
            doFinalSearch(job); // blocks until the final search is done

            // new jobs could have been added while the final search was running
            if (jobs.isEmpty()) {
                // no more jobs left; stop the periodic search task.
                // A new one will be scheduled for future jobs.
                logger.log(Level.INFO, "No more search jobs. Stopping periodic search task");
                periodicSearchTaskRunning = false;
                jobProcessingTaskFuture.cancel(true);
            }
        }
    }
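    /**
     * Stops the keyword search for an ingest job, cancelling any searcher that
     * is currently running, and removes the job.
     *
     * @param jobId The ingest job ID.
     */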
    public synchronized void stopJob(long jobId) {
        logger.log(Level.INFO, "Stopping search job {0}", jobId);
        commit();

        SearchJobInfo job = jobs.get(jobId);
        if (job == null) {
            return;
        }

        // cancel the searcher for this job, if one is still running
        IngestSearchRunner.Searcher currentSearcher = job.getCurrentSearcher();
        if ((currentSearcher != null) && (!currentSearcher.isDone())) {
            logger.log(Level.INFO, "Cancelling search job {0}", jobId);
            currentSearcher.cancel(true);
        }

        jobs.remove(jobId);

        if (jobs.isEmpty()) {
            // no more jobs left; stop the periodic search task.
            // A new one will be scheduled for future jobs.
            logger.log(Level.INFO, "No more search jobs. Stopping periodic search task");
            periodicSearchTaskRunning = false;
            jobProcessingTaskFuture.cancel(true);
        }
    }
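    /**
     * Adds the given keyword lists to every job currently being searched.
     *
     * @param keywordListNames The names of the keyword lists to add.
     */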
    public synchronized void addKeywordListsToAllJobs(List<String> keywordListNames) {
        for (String listName : keywordListNames) {
            logger.log(Level.INFO, "Adding keyword list {0} to all jobs", listName);
            for (SearchJobInfo j : jobs.values()) {
                j.addKeywordListName(listName);
            }
        }
    }
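    /**
     * Commits the Solr index and fires an event reporting the number of
     * indexed files.
     */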
    private void commit() {
        ingester.commit();

        // signal a potential change in the number of text-indexed files
        try {
            final int numIndexedFiles = KeywordSearch.getServer().queryNumIndexedFiles();
            KeywordSearch.fireNumIndexedFilesChange(null, numIndexedFiles);
        } catch (NoOpenCoreException | KeywordSearchModuleException ex) {
            logger.log(Level.SEVERE, "Error executing Solr query to check number of indexed files", ex);
        }
    }
    private void doFinalSearch(SearchJobInfo job) {
        // run one last search, since there are probably new files committed
        logger.log(Level.INFO, "Starting final search for search job {0}", job.getJobId());
        if (!job.getKeywordListNames().isEmpty()) {
            try {
                // in case this job still has a worker running, wait for it to finish
                logger.log(Level.INFO, "Checking for previous search for search job {0} before executing final search", job.getJobId());
                job.waitForCurrentWorker();

                IngestSearchRunner.Searcher finalSearcher = new IngestSearchRunner.Searcher(job, true);
                job.setCurrentSearcher(finalSearcher);
                logger.log(Level.INFO, "Kicking off final search for search job {0}", job.getJobId());
                finalSearcher.execute();

                // block until the final search is done
                logger.log(Level.INFO, "Waiting for final search for search job {0}", job.getJobId());
                finalSearcher.get();
                logger.log(Level.INFO, "Final search for search job {0} completed", job.getJobId());
            } catch (InterruptedException | CancellationException ex) {
                logger.log(Level.INFO, "Final search for search job {0} interrupted or cancelled", job.getJobId());
            } catch (ExecutionException ex) {
                logger.log(Level.SEVERE, String.format("Final search for search job %d failed", job.getJobId()), ex);
            }
        }
    }
    private final class PeriodicSearchTask implements Runnable {

        @Override
        public void run() {
            // if there are no jobs, cancel this task; a new one is scheduled when jobs arrive
            if (jobs.isEmpty() || jobProcessingTaskFuture.isCancelled()) {
                logger.log(Level.INFO, "Exiting periodic search task");
                periodicSearchTaskRunning = false;
                return;
            }

            commit();

            logger.log(Level.INFO, "Starting periodic searches");
            final StopWatch stopWatch = new StopWatch();
            stopWatch.start();
            // NOTE: the "jobs" map can be modified by stopJob() and endJob() while we are in this loop
            for (Iterator<Entry<Long, SearchJobInfo>> iterator = jobs.entrySet().iterator(); iterator.hasNext();) {
                SearchJobInfo job = iterator.next().getValue();

                if (jobProcessingTaskFuture.isCancelled()) {
                    logger.log(Level.INFO, "Search has been cancelled. Exiting periodic search task.");
                    periodicSearchTaskRunning = false;
                    return;
                }

                // spawn a searcher for the job unless one is already running or it has no keyword lists
                if (!job.getKeywordListNames().isEmpty() && !job.isWorkerRunning()) {
                    logger.log(Level.INFO, "Executing periodic search for search job {0}", job.getJobId());
                    Searcher searcher = new Searcher(job, false);
                    job.setCurrentSearcher(searcher);
                    searcher.execute();
                    job.setWorkerRunning(true);
                    try {
                        // wait for the searcher to finish
                        searcher.get();
                    } catch (InterruptedException | ExecutionException ex) {
                        logger.log(Level.SEVERE, "Error performing keyword search: {0}", ex.getMessage());
                        services.postMessage(IngestMessage.createErrorMessage(KeywordSearchModuleFactory.getModuleName(),
                                NbBundle.getMessage(this.getClass(),
                                        "SearchRunner.Searcher.done.err.msg"), ex.getMessage()));
                    } catch (java.util.concurrent.CancellationException ex) {
                        // cancellation is expected when a job is stopped; nothing to do
                    }
                }
            }
            stopWatch.stop();
            logger.log(Level.INFO, "All periodic searches cumulatively took {0} secs", stopWatch.getElapsedTimeSecs());

            // adjust the search interval based on how long the searches took
            recalculateUpdateIntervalTime(stopWatch.getElapsedTimeSecs());

            // schedule the next periodic search task
            jobProcessingTaskFuture = jobProcessingExecutor.schedule(
                    new PeriodicSearchTask(), currentUpdateIntervalMs, MILLISECONDS);
        }

        private void recalculateUpdateIntervalTime(long lastSearchTimeSec) {
            // if the last periodic search took less than 1/4 of the current interval, keep the interval
            if (lastSearchTimeSec * 1000 < currentUpdateIntervalMs / 4) {
                return;
            }
            // otherwise double the interval
            currentUpdateIntervalMs = currentUpdateIntervalMs * 2;
            logger.log(Level.WARNING, "Last periodic search took {0} sec. Increasing search interval to {1} sec",
                    new Object[]{lastSearchTimeSec, currentUpdateIntervalMs / 1000});
        }
    }
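    /**
     * Data structure that tracks the keyword lists, current results, and
     * searcher running status for a single ingest job.
     */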
    private class SearchJobInfo {

        private SearchJobInfo(IngestJobContext jobContext, List<String> keywordListNames) {
            this.jobContext = jobContext;
            this.keywordListNames = new ArrayList<>(keywordListNames);
            currentResults = new HashMap<>();
            workerRunning = false;
        }

        private synchronized void addKeywordListName(String keywordListName) {
            if (!keywordListNames.contains(keywordListName)) {
                keywordListNames.add(keywordListName);
            }
        }

        private synchronized Set<Long> currentKeywordResults(Keyword k) {
            return currentResults.get(k);
        }

        private synchronized void addKeywordResults(Keyword k, Set<Long> resultsIDs) {
            currentResults.put(k, resultsIDs);
        }

        private void setWorkerRunning(boolean flag) {
            workerRunning = flag;
        }

        private void incrementModuleReferenceCount() {
            moduleReferenceCount.incrementAndGet();
        }

        private long decrementModuleReferenceCount() {
            return moduleReferenceCount.decrementAndGet();
        }

        private void waitForCurrentWorker() throws InterruptedException {
            synchronized (finalSearchLock) {
                while (workerRunning) {
                    logger.log(Level.INFO, "Waiting for previous worker to finish");
                    finalSearchLock.wait();
                    logger.log(Level.INFO, "Notified previous worker finished");
                }
            }
        }

        private void searchNotify() {
            synchronized (finalSearchLock) {
                logger.log(Level.INFO, "Notifying after finishing search");
                workerRunning = false;
                finalSearchLock.notify();
            }
        }
    }
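    /**
     * SwingWorker that searches the current index for a job's keywords and
     * posts only the new results, tracking hits already seen in earlier
     * periodic searches.
     */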
    private final class Searcher extends SwingWorker<Object, Void> {

        Searcher(SearchJobInfo job, boolean finalRun) {
            this.job = job;
            this.finalRun = finalRun;
            keywordListNames = job.getKeywordListNames();
            keywords = new ArrayList<>();
            keywordToList = new HashMap<>();
            keywordLists = new ArrayList<>();
        }
        @Messages("SearchRunner.query.exception.msg=Error performing query:")
        @Override
        protected Object doInBackground() throws Exception {
            final String displayName = NbBundle.getMessage(this.getClass(), "KeywordSearchIngestModule.doInBackGround.displayName")
                    + (finalRun ? (" - " + NbBundle.getMessage(this.getClass(), "KeywordSearchIngestModule.doInBackGround.finalizeMsg")) : "");
            final String pgDisplayName = displayName + (" (" + NbBundle.getMessage(this.getClass(), "KeywordSearchIngestModule.doInBackGround.pendingMsg") + ")");
            progressGroup = AggregateProgressFactory.createSystemHandle(pgDisplayName, null, new Cancellable() {
                @Override
                public boolean cancel() {
                    logger.log(Level.INFO, "Cancelling the searcher by user.");
                    if (progressGroup != null) {
                        progressGroup.setDisplayName(displayName + " " + NbBundle.getMessage(this.getClass(), "SearchRunner.doInBackGround.cancelMsg"));
                    }
                    progressGroup.finish();
                    return IngestSearchRunner.Searcher.this.cancel(true);
                }
            }, null);

            updateKeywords();

            ProgressContributor[] subProgresses = new ProgressContributor[keywords.size()];
            int i = 0;
            for (Keyword keywordQuery : keywords) {
                subProgresses[i] = AggregateProgressFactory.createProgressContributor(keywordQuery.getSearchTerm());
                progressGroup.addContributor(subProgresses[i]);
                i++;
            }

            progressGroup.start();
            final StopWatch stopWatch = new StopWatch();
            stopWatch.start();
            try {
                progressGroup.setDisplayName(displayName);

                int keywordsSearched = 0;

                for (Keyword keyword : keywords) {
                    if (this.isCancelled() || this.job.getJobContext().fileIngestIsCancelled()) {
                        logger.log(Level.INFO, "Cancel detected, bailing before new keyword processed: {0}", keyword.getSearchTerm());
                        return null;
                    }

                    final KeywordList keywordList = keywordToList.get(keyword);

                    // the new subProgress becomes active after the initial query,
                    // when the number of hits to start() with is known
                    if (keywordsSearched > 0) {
                        subProgresses[keywordsSearched - 1].finish();
                    }

                    KeywordSearchQuery keywordSearchQuery = KeywordSearchUtil.getQueryForKeyword(keyword, keywordList);

                    // limit the search to the job's data source
                    final KeywordQueryFilter dataSourceFilter = new KeywordQueryFilter(KeywordQueryFilter.FilterType.DATA_SOURCE, job.getDataSourceId());
                    keywordSearchQuery.addFilter(dataSourceFilter);

                    QueryResults queryResults;

                    // do the actual search
                    try {
                        queryResults = keywordSearchQuery.performQuery();
                    } catch (KeywordSearchModuleException | NoOpenCoreException ex) {
                        logger.log(Level.SEVERE, "Error performing query: " + keyword.getSearchTerm(), ex);
                        return null;
                    } catch (CancellationException e) {
                        logger.log(Level.INFO, "Cancel detected, bailing during keyword query: {0}", keyword.getSearchTerm());
                        return null;
                    }

                    // reduce the results to only the hits that have not been seen before
                    QueryResults newResults = filterResults(queryResults);

                    if (!newResults.getKeywords().isEmpty()) {
                        // scale the progress bar to per-result sub-progress within each keyword
                        int totalUnits = newResults.getKeywords().size();
                        subProgresses[keywordsSearched].start(totalUnits);
                        int unitProgress = 0;
                        String queryDisplayStr = keyword.getSearchTerm();
                        if (queryDisplayStr.length() > 50) {
                            queryDisplayStr = queryDisplayStr.substring(0, 49) + "...";
                        }
                        subProgresses[keywordsSearched].progress(keywordList.getName() + ": " + queryDisplayStr, unitProgress);

                        // write the new results to the blackboard and post ingest messages
                        newResults.process(null, subProgresses[keywordsSearched], this, keywordList.getIngestMessages(), true);
                    }

                    // reset the status text before it goes away
                    subProgresses[keywordsSearched].progress("");

                    ++keywordsSearched;
                }
            } catch (Exception ex) {
                logger.log(Level.WARNING, "searcher exception occurred", ex);
            } finally {
                finalizeSearcher();
                stopWatch.stop();
                logger.log(Level.INFO, "Searcher took {0} secs to run (final = {1})",
                        new Object[]{stopWatch.getElapsedTimeSecs(), this.finalRun});
            }

            return null;
        }
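        /**
         * Synchronizes the keywords, keyword lists, and keyword-to-list map
         * with the lists currently configured for this job.
         */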
        private void updateKeywords() {
            XmlKeywordSearchList loader = XmlKeywordSearchList.getCurrent();

            keywords.clear();
            keywordToList.clear();
            keywordLists.clear();

            for (String name : keywordListNames) {
                KeywordList list = loader.getList(name);
                keywordLists.add(list);
                for (Keyword k : list.getKeywords()) {
                    keywords.add(k);
                    keywordToList.put(k, list);
                }
            }
        }
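        /**
         * Performs the cleanup needed right after doInBackground() returns:
         * finishes the progress handle on the EDT.
         */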
        private void finalizeSearcher() {
            SwingUtilities.invokeLater(new Runnable() {
                @Override
                public void run() {
                    progressGroup.finish();
                }
            });
        }
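        /**
         * Filters out hits already found by earlier periodic searches for this
         * job and returns only the new, unique hits.
         *
         * @param queryResult The results of the current search.
         *
         * @return The subset of results not seen before.
         */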
        private QueryResults filterResults(QueryResults queryResult) {
            // holds only the hits found by the most recent search
            QueryResults newResults = new QueryResults(queryResult.getQuery());

            for (Keyword keyword : queryResult.getKeywords()) {
                // all hits for this keyword, possibly including hits seen in earlier periodic searches
                List<KeywordHit> queryTermResults = queryResult.getResults(keyword);

                // sort so that the hit for the lowest chunk is always kept
                Collections.sort(queryTermResults);

                // hits not seen before for this keyword
                List<KeywordHit> newUniqueHits = new ArrayList<>();

                // object ids already seen for this keyword in earlier searches
                Set<Long> curTermResults = job.currentKeywordResults(keyword);
                if (curTermResults == null) {
                    curTermResults = new HashSet<>();
                }

                for (KeywordHit hit : queryTermResults) {
                    if (curTermResults.contains(hit.getSolrObjectId())) {
                        // skip hits in objects already seen for this keyword
                        continue;
                    }
                    newUniqueHits.add(hit);
                    curTermResults.add(hit.getSolrObjectId());
                }

                // remember what was seen for this keyword and record the new hits
                job.addKeywordResults(keyword, curTermResults);
                newResults.addResult(keyword, newUniqueHits);
            }

            return newResults;
        }
    }
}