Autopsy 3.1
Graphical digital forensics platform for The Sleuth Kit and other tools.
Ingester.java
/*
 * Autopsy Forensic Browser
 *
 * Copyright 2011 Basis Technology Corp.
 * Contact: carrier <at> sleuthkit <dot> org
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.sleuthkit.autopsy.keywordsearch;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import org.apache.solr.client.solrj.SolrRequest.METHOD;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.util.ContentStream;
import org.openide.util.Exceptions;
import org.openide.util.NbBundle;
import org.sleuthkit.autopsy.coreutils.Logger;
import org.sleuthkit.autopsy.datamodel.ContentUtils;
import org.sleuthkit.datamodel.AbstractContent;
import org.sleuthkit.datamodel.AbstractFile;
import org.sleuthkit.datamodel.Content;
import org.sleuthkit.datamodel.ContentVisitor;
import org.sleuthkit.datamodel.DerivedFile;
import org.sleuthkit.datamodel.Directory;
import org.sleuthkit.datamodel.File;
import org.sleuthkit.datamodel.LayoutFile;
import org.sleuthkit.datamodel.LocalFile;
import org.sleuthkit.datamodel.ReadContentInputStream;
import org.sleuthkit.datamodel.TskCoreException;

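/**
 * Handles indexing files and file chunks on a Solr core.
 */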
class Ingester {

    private static final Logger logger = Logger.getLogger(Ingester.class.getName());
    private volatile boolean uncommittedIngests = false;
    private final ExecutorService upRequestExecutor = Executors.newSingleThreadExecutor();
    private final Server solrServer = KeywordSearch.getServer();
    private final GetContentFieldsV getContentFieldsV = new GetContentFieldsV();
    private static Ingester instance;

    // For ingesting a chunk as a SolrInputDocument (non-content-streaming, bypasses Tika).
    // TODO: use a streaming way to add content to the /update handler.
    private static final int MAX_DOC_CHUNK_SIZE = 1024 * 1024;
    private static final String docContentEncoding = "UTF-8"; //NON-NLS

    private Ingester() {
    }

    public static synchronized Ingester getDefault() {
        if (instance == null) {
            instance = new Ingester();
        }
        return instance;
    }

    @Override
    @SuppressWarnings("FinalizeDeclaration")
    protected void finalize() throws Throwable {
        super.finalize();

        // Warn if files might have been left uncommitted.
        if (uncommittedIngests) {
            logger.warning("Ingester was used to add files that it never committed."); //NON-NLS
        }
    }

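    /**
     * Sends the string content of a file to Solr for indexing.
     *
     * @param afscs string content stream, with its source content attached
     * @throws IngesterException if there was an error posting to Solr
     */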
    void ingest(AbstractFileStringContentStream afscs) throws IngesterException {
        Map<String, String> params = getContentFields(afscs.getSourceContent());
        ingest(afscs, params, afscs.getSourceContent().getSize());
    }

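    /**
     * Records the source file of a text extractor in Solr. Only metadata and
     * the number of extracted chunks are indexed; the chunk content itself is
     * posted separately, chunk by chunk.
     *
     * @param fe extractor whose source file should be recorded
     * @throws IngesterException if there was an error posting to Solr
     */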
    void ingest(TextExtractor fe) throws IngesterException {
        Map<String, String> params = getContentFields(fe.getSourceFile());

        params.put(Server.Schema.NUM_CHUNKS.toString(), Integer.toString(fe.getNumChunks()));

        ingest(new NullContentStream(fe.getSourceFile()), params, 0);
    }

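    /**
     * Sends one chunk of an extracted file to Solr. The chunk gets its own
     * document ID, derived from the source content ID and the chunk ID.
     *
     * @param fec  chunk descriptor within the source file
     * @param bcs  stream carrying the chunk's bytes and its source content
     * @param size content size of the chunk, in bytes
     * @throws IngesterException if there was an error posting to Solr
     */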
    void ingest(AbstractFileChunk fec, ByteContentStream bcs, int size) throws IngesterException {
        AbstractContent sourceContent = bcs.getSourceContent();
        Map<String, String> params = getContentFields(sourceContent);

        // Overwrite the document id with the chunk id.
        params.put(Server.Schema.ID.toString(),
                Server.getChunkIdString(sourceContent.getId(), fec.getChunkId()));

        ingest(bcs, params, size);
    }

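    /**
     * Sends an AbstractFile to Solr for indexing.
     *
     * @param file          file to index
     * @param ingestContent if true, index the file's content as well as its
     *                      metadata; directories are always metadata-only
     * @throws IngesterException if there was an error posting to Solr
     */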
    void ingest(AbstractFile file, boolean ingestContent) throws IngesterException {
        if (!ingestContent || file.isDir()) {
            ingest(new NullContentStream(file), getContentFields(file), 0);
        } else {
            ingest(new FscContentStream(file), getContentFields(file), file.getSize());
        }
    }

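    /**
     * Builds the Solr field map (metadata) for the given content by applying
     * the content-fields visitor.
     */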
    private Map<String, String> getContentFields(AbstractContent fsc) {
        return fsc.accept(getContentFieldsV);
    }

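    /**
     * Visitor that builds the Solr field map for each supported content type.
     */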
    private class GetContentFieldsV extends ContentVisitor.Default<Map<String, String>> {

        @Override
        protected Map<String, String> defaultVisit(Content cntnt) {
            return new HashMap<String, String>();
        }

        @Override
        public Map<String, String> visit(File f) {
            Map<String, String> params = getCommonFields(f);
            getCommonFileContentFields(params, f);
            return params;
        }

        @Override
        public Map<String, String> visit(DerivedFile df) {
            Map<String, String> params = getCommonFields(df);
            getCommonFileContentFields(params, df);
            return params;
        }

        @Override
        public Map<String, String> visit(Directory d) {
            Map<String, String> params = getCommonFields(d);
            getCommonFileContentFields(params, d);
            return params;
        }

        @Override
        public Map<String, String> visit(LayoutFile lf) {
            // Layout files do not have times.
            return getCommonFields(lf);
        }

        @Override
        public Map<String, String> visit(LocalFile lf) {
            Map<String, String> params = getCommonFields(lf);
            getCommonFileContentFields(params, lf);
            return params;
        }

        // Adds the file's timestamps (change, access, modified, created) in ISO 8601 form.
        private Map<String, String> getCommonFileContentFields(Map<String, String> params, AbstractFile file) {
            params.put(Server.Schema.CTIME.toString(), ContentUtils.getStringTimeISO8601(file.getCtime(), file));
            params.put(Server.Schema.ATIME.toString(), ContentUtils.getStringTimeISO8601(file.getAtime(), file));
            params.put(Server.Schema.MTIME.toString(), ContentUtils.getStringTimeISO8601(file.getMtime(), file));
            params.put(Server.Schema.CRTIME.toString(), ContentUtils.getStringTimeISO8601(file.getCrtime(), file));
            return params;
        }

        // Adds the fields common to all content: object ID, data source (image) ID, and file name.
        private Map<String, String> getCommonFields(AbstractFile af) {
            Map<String, String> params = new HashMap<String, String>();
            params.put(Server.Schema.ID.toString(), Long.toString(af.getId()));
            try {
                long dataSourceId = af.getDataSource().getId();
                params.put(Server.Schema.IMAGE_ID.toString(), Long.toString(dataSourceId));
            } catch (TskCoreException ex) {
                logger.log(Level.SEVERE, "Could not get data source id to properly index the file " + af.getId(), ex); //NON-NLS
                params.put(Server.Schema.IMAGE_ID.toString(), Long.toString(-1));
            }

            params.put(Server.Schema.FILE_NAME.toString(), af.getName());
            return params;
        }
    }

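    /**
     * Indexes a ContentStream and its field map as a single Solr document,
     * bypassing Tika. Reads up to MAX_DOC_CHUNK_SIZE bytes of the stream and
     * posts them as the document's content field.
     *
     * @param cs     stream to get the content from
     * @param fields field map to index with the document
     * @param size   size of the content in bytes; if 0, an empty content
     *               field is indexed
     * @throws IngesterException if there was an error posting to Solr
     */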
    void ingest(ContentStream cs, Map<String, String> fields, final long size) throws IngesterException {

        if (fields.get(Server.Schema.IMAGE_ID.toString()) == null) {
            // Skip the file; the image (data source) id is unknown.
            String msg = NbBundle.getMessage(this.getClass(),
                    "Ingester.ingest.exception.unknownImgId.msg", cs.getName());
            logger.log(Level.SEVERE, msg);
            throw new IngesterException(msg);
        }

        final byte[] docChunkContentBuf = new byte[MAX_DOC_CHUNK_SIZE];
        SolrInputDocument updateDoc = new SolrInputDocument();

        for (Entry<String, String> field : fields.entrySet()) {
            updateDoc.addField(field.getKey(), field.getValue());
        }

        // size is normally a chunk size (up to 1MB); entire files are no
        // longer ingested as one document.
        if (size > 0) {
            InputStream is = null;
            int read = 0;
            try {
                is = cs.getStream();
                read = is.read(docChunkContentBuf);
            } catch (IOException ex) {
                throw new IngesterException(
                        NbBundle.getMessage(this.getClass(), "Ingester.ingest.exception.cantReadStream.msg",
                                cs.getName()), ex);
            } finally {
                if (is != null) {
                    try {
                        is.close();
                    } catch (IOException ex) {
                        logger.log(Level.WARNING, "Could not close input stream after reading content, " + cs.getName(), ex); //NON-NLS
                    }
                }
            }

            if (read > 0) {
                String s = "";
                try {
                    s = new String(docChunkContentBuf, 0, read, docContentEncoding);
                } catch (UnsupportedEncodingException ex) {
                    Exceptions.printStackTrace(ex);
                }
                updateDoc.addField(Server.Schema.CONTENT.toString(), s);
            } else {
                updateDoc.addField(Server.Schema.CONTENT.toString(), "");
            }
        } else {
            // No content, such as when the 0th (metadata-only) chunk is indexed.
            updateDoc.addField(Server.Schema.CONTENT.toString(), "");
        }

        try {
            //TODO consider a timeout thread, or vary the socket timeout based on the size of the indexed content
            solrServer.addDocument(updateDoc);
            uncommittedIngests = true;
        } catch (KeywordSearchModuleException ex) {
            throw new IngesterException(
                    NbBundle.getMessage(this.getClass(), "Ingester.ingest.exception.err.msg", cs.getName()), ex);
        }
    }

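    /**
     * Posts a ContentStream to the Solr /update/extract handler so that Tika
     * extracts its text server-side, attaching the field map as literal
     * parameters. The request runs on a single-threaded executor with a
     * size-based timeout; on timeout, Solr is hard-restarted.
     *
     * @param cs     stream to extract and index
     * @param fields field map to index with the document
     * @param size   content size in bytes, used to pick the timeout
     * @throws IngesterException if the request timed out or failed
     */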
    private void ingestExtract(ContentStream cs, Map<String, String> fields, final long size) throws IngesterException {
        final ContentStreamUpdateRequest up = new ContentStreamUpdateRequest("/update/extract"); //NON-NLS
        up.addContentStream(cs);
        setFields(up, fields);
        up.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);

        final String contentType = cs.getContentType();
        if (contentType != null && !contentType.trim().equals("")) {
            up.setParam("stream.contentType", contentType); //NON-NLS
        }

        // Disable the per-request commit; commits are issued in batch via commit().
        up.setParam("commit", "false"); //NON-NLS

        final Future<?> f = upRequestExecutor.submit(new UpRequestTask(up));

        try {
            f.get(getTimeout(size), TimeUnit.SECONDS);
        } catch (TimeoutException te) {
            logger.log(Level.WARNING, "Solr timeout encountered, trying to restart Solr"); //NON-NLS
            // A restart may be needed to recover from some error conditions.
            hardSolrRestart();
            throw new IngesterException(
                    NbBundle.getMessage(this.getClass(), "Ingester.ingestExtract.exception.solrTimeout.msg",
                            fields.get("id"), fields.get("file_name"))); //NON-NLS
        } catch (Exception e) {
            throw new IngesterException(
                    NbBundle.getMessage(this.getClass(), "Ingester.ingestExtract.exception.probPostToSolr.msg",
                            fields.get("id"), fields.get("file_name")), e); //NON-NLS
        }
        uncommittedIngests = true;
    }

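    /**
     * Forcibly restarts the local Solr instance: closes the core, stops and
     * restarts the server process, then reopens the core. Failures at each
     * step are logged but not rethrown.
     */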
    private void hardSolrRestart() {
        try {
            solrServer.closeCore();
        } catch (KeywordSearchModuleException ex) {
            logger.log(Level.WARNING, "Cannot close core", ex); //NON-NLS
        }

        solrServer.stop();

        try {
            solrServer.start();
        } catch (KeywordSearchModuleException ex) {
            logger.log(Level.WARNING, "Cannot start Solr server", ex); //NON-NLS
        } catch (SolrServerNoPortException ex) {
            logger.log(Level.WARNING, "Cannot start server with this port", ex); //NON-NLS
        }

        try {
            solrServer.openCore();
        } catch (KeywordSearchModuleException ex) {
            logger.log(Level.WARNING, "Cannot open core", ex); //NON-NLS
        }
    }

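    /**
     * Returns the timeout, in seconds, to use for a Solr request, scaled by
     * the size of the content being indexed.
     *
     * @param size content size in bytes
     * @return timeout in seconds
     */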
    static int getTimeout(long size) {
        if (size < 1024 * 1024L) { // < 1MB
            return 60;
        } else if (size < 10 * 1024 * 1024L) { // < 10MB
            return 1200;
        } else if (size < 100 * 1024 * 1024L) { // < 100MB
            return 3600;
        } else {
            return 3 * 3600;
        }
    }

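    /**
     * Runnable that posts a content stream update request to Solr, mapping
     * the various failure modes to RuntimeExceptions.
     */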
    private class UpRequestTask implements Runnable {

        private final ContentStreamUpdateRequest up;

        UpRequestTask(ContentStreamUpdateRequest up) {
            this.up = up;
        }

        @Override
        public void run() {
            try {
                up.setMethod(METHOD.POST);
                solrServer.request(up);
            } catch (NoOpenCoreException ex) {
                throw new RuntimeException(
                        NbBundle.getMessage(this.getClass(), "Ingester.UpReqestTask.run.exception.sorlNotAvail.msg"), ex);
            } catch (IllegalStateException ex) {
                // Problems with the content itself.
                throw new RuntimeException(
                        NbBundle.getMessage(this.getClass(), "Ingester.UpRequestTask.run.exception.probReadFile.msg"), ex);
            } catch (SolrServerException ex) {
                // If there's a problem talking to Solr, something is
                // fundamentally wrong with ingest.
                throw new RuntimeException(
                        NbBundle.getMessage(this.getClass(), "Ingester.UpRequestTask.run.exception.solrProb.msg"), ex);
            } catch (SolrException ex) {
                // Tika problems result in an unchecked SolrException.
                ErrorCode ec = ErrorCode.getErrorCode(ex.code());

                // When Tika has problems with a document, it throws a server
                // error, but it's okay to continue with other documents.
                if (ec.equals(ErrorCode.SERVER_ERROR)) {
                    throw new RuntimeException(NbBundle.getMessage(this.getClass(),
                            "Ingester.UpRequestTask.run.exception.probPostToSolr.msg",
                            ec),
                            ex);
                } else {
                    // We shouldn't get any other error codes.
                    throw ex;
                }
            }
        }
    }

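    /**
     * Commits all pending ingests to the Solr index and clears the
     * uncommitted flag. Commit errors are logged, not rethrown.
     */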
    void commit() {
        try {
            solrServer.commit();
            uncommittedIngests = false;
        } catch (NoOpenCoreException ex) {
            logger.log(Level.WARNING, "Error committing index", ex); //NON-NLS
        } catch (SolrServerException ex) {
            logger.log(Level.WARNING, "Error committing index", ex); //NON-NLS
        }
    }

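    /**
     * Copies the field map onto a content stream update request as
     * "literal.*" parameters.
     */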
    private static void setFields(ContentStreamUpdateRequest up, Map<String, String> fields) {
        for (Entry<String, String> field : fields.entrySet()) {
            up.setParam("literal." + field.getKey(), field.getValue()); //NON-NLS
        }
    }

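    /**
     * ContentStream implementation that wraps an AbstractFile and serves its
     * bytes via a ReadContentInputStream.
     */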
    private static class FscContentStream implements ContentStream {

        private final AbstractFile f;

        FscContentStream(AbstractFile f) {
            this.f = f;
        }

        @Override
        public String getName() {
            return f.getName();
        }

        @Override
        public String getSourceInfo() {
            return NbBundle.getMessage(this.getClass(), "Ingester.FscContentStream.getSrcInfo", f.getId());
        }

        @Override
        public String getContentType() {
            return null;
        }

        @Override
        public Long getSize() {
            return f.getSize();
        }

        @Override
        public InputStream getStream() throws IOException {
            return new ReadContentInputStream(f);
        }

        @Override
        public Reader getReader() throws IOException {
            throw new UnsupportedOperationException(
                    NbBundle.getMessage(this.getClass(), "Ingester.FscContentStream.getReader"));
        }
    }

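    /**
     * ContentStream implementation with no content, used to index metadata
     * only; getStream() returns an empty stream and getSize() returns 0.
     */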
    private static class NullContentStream implements ContentStream {

        private final AbstractContent aContent;

        NullContentStream(AbstractContent aContent) {
            this.aContent = aContent;
        }

        @Override
        public String getName() {
            return aContent.getName();
        }

        @Override
        public String getSourceInfo() {
            return NbBundle.getMessage(this.getClass(), "Ingester.NullContentStream.getSrcInfo.text", aContent.getId());
        }

        @Override
        public String getContentType() {
            return null;
        }

        @Override
        public Long getSize() {
            return 0L;
        }

        @Override
        public InputStream getStream() throws IOException {
            return new ByteArrayInputStream(new byte[0]);
        }

        @Override
        public Reader getReader() throws IOException {
            throw new UnsupportedOperationException(
                    NbBundle.getMessage(this.getClass(), "Ingester.NullContentStream.getReader"));
        }
    }

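    /**
     * Indicates that there was an error ingesting a file or chunk.
     */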
    static class IngesterException extends Exception {

        IngesterException(String message, Throwable ex) {
            super(message, ex);
        }

        IngesterException(String message) {
            super(message);
        }
    }
}