Tika
snoop.data.analyzers.tika
#
Tasks to run Apache Tika on documents to extract text and metadata.
Module defines helper methods to work with the Tika HTTP server, as well as converting their output format into our set of fields.
The corrected mime type is sent along with the file, since if we don't, Tika will usually fail while running "file" and not being able to use the result.
We keep a hard-coded list of what mime types to send to Tika. We should probably send (almost) everything and let them surprise us instead.
Attributes#
TIKA_EXPECT_FAIL_ABOVE_FILE_SIZE
#
Turn unexpected failures into permanent ones for arguments above this size.
Tika may run out of memory or otherwise fail on very large files, causing the wrong type of error.
TIKA_MIN_SPEED_BPS
#
Minimum reference speed for this task. Saved as 10% of the Average Success
Speed in the Admin UI. The timeout is calculated using this value, the request
file size, and the TIKA_TIMEOUT_BASE
constant.
TIKA_TIMEOUT_BASE
#
Minimum number of seconds to wait for this service.
TIKA_TIMEOUT_MAX
#
Maximum number of seconds to wait for this service. For tika we set 24h.
Functions#
call_tika_server(endpoint, data, content_type, data_size)
#
Executes HTTP PUT request to Tika server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
endpoint |
|
the endpoint to be appended to snoop.defaultsettings.SNOOP_TIKA_URL. |
required |
data |
|
the request object to be added to the PUT request |
required |
content_type |
|
content type detected by our libmagic implementation. If not supplied, Tika will run
its own |
required |
Source code in snoop/data/analyzers/tika.py
@tracer.wrap_function()
def call_tika_server(endpoint, data, content_type, data_size):
    """Executes HTTP PUT request to Tika server.

    Args:
        endpoint: the endpoint to be appended to [snoop.defaultsettings.SNOOP_TIKA_URL][].
        data: the request object to be added to the PUT request
        content_type: content type detected by our libmagic implementation. If not supplied, Tika will run
            its own `libmagic` on it, and if that fails it will stop processing the request.
        data_size: size in bytes of the request body; used to scale the request timeout.

    Raises:
        SnoopTaskBroken: when Tika rejects or permanently fails on the document.
    """
    # Scale the timeout with file size, but clamp between TIKA_TIMEOUT_BASE and TIKA_TIMEOUT_MAX.
    timeout = min(TIKA_TIMEOUT_MAX,
                  int(TIKA_TIMEOUT_BASE + data_size / TIKA_MIN_SPEED_BPS))
    session = requests.Session()
    url = urljoin(settings.SNOOP_TIKA_URL, endpoint)
    resp = session.put(url, data=data, headers={'Content-Type': content_type}, timeout=timeout)
    if resp.status_code == 422:
        raise SnoopTaskBroken("tika returned http 422, corrupt", "tika_http_422")
    if resp.status_code == 415:
        raise SnoopTaskBroken("tika returned http 415, unsupported media type", "tika_http_415")
    # When running OOM, Tika returns 404 (from load balancer after crash), 500, 502 and any other
    # combination of status codes. We mark this as Broken instead of a normal failure to continue normal
    # processing in case of Tika OOM.
    if 400 <= resp.status_code < 600 and data_size > TIKA_EXPECT_FAIL_ABOVE_FILE_SIZE:
        raise SnoopTaskBroken(
            f"tika returned http {resp.status_code} while running on large file",
            "tika_error_on_large_file",
        )
    # Use .get() so a missing Content-Type header raises SnoopTaskBroken rather than KeyError,
    # and accept parameterized media types such as "application/json;charset=UTF-8".
    if (resp.status_code != 200
            or not resp.headers.get('Content-Type', '').startswith('application/json')):
        raise SnoopTaskBroken(
            f"tika returned unexpected response http {resp.status_code}",
            "tika_http_" + str(resp.status_code),
        )
    return resp
can_process(blob)
#
Checks if Tika can process this blob's mime type.
Source code in snoop/data/analyzers/tika.py
def can_process(blob):
    """Return True if this blob's mime type is one Tika is known to handle."""
    return blob.mime_type in ALL_TIKA_MIME_TYPES
convert_for_indexing(rmeta_obj)
#
Convert the dict returned by Tika's `rmeta`
endpoint into a list of `K: V`
strings that we can
directly index into Elasticsearch. Also returns a list of keys present, to be indexed as keywords.
Tika returns over 500 different fields for our test data, and the ES maximum field count is 1000. So we fold them all into one single field.
Because Elasticsearch 6 requires all values in a field to be of a same type, we must convert all the
dict values to a single type (in our case, string). We replace the main text fields if they exist with `None` (keys called `X-TIKA:content`). We also truncate all values to 4K chars to avoid any other
duplication with main text fields and keep this of a lower size.
Source code in snoop/data/analyzers/tika.py
def convert_for_indexing(rmeta_obj):
    """Convert the dict returned by Tika's `rmeta` endpoint into a list of `K: V` strings that we can
    directly index into Elasticsearch. Also returns a list of keys present, to be indexed as keywords.

    Tika returns over 500 different fields for our test data, and the ES maximum field count is 1000.
    So we fold them all into one single field.

    Because Elasticsearch 6 requires all values in a field to be of a same type, we must convert all the
    dict values to a single type (in our case, string). We drop the main text fields (keys called
    `X-TIKA:content`) and raw email headers, as they're handled separately. We also truncate all values
    to 4K chars to avoid any other duplication with main text fields and keep this of a lower size.

    Args:
        rmeta_obj: parsed JSON list returned by Tika's `rmeta` endpoint; only the first item is used.

    Returns:
        dict with key `tika` (list of "path: value" strings) and `tika-key` (unique metadata key paths).
    """
    # Keys whose subtrees are indexed elsewhere (main document text, raw email headers).
    REMOVE_KEYS = {'X-TIKA:content', 'Message:Raw-Header'}
    TRUNCATE_LIMIT = 2 ** 12  # 4K chars per value

    def iterate_obj(obj, path=""):
        """Recursively yield (dotted-path, stringified value) pairs from nested lists/dicts."""
        if isinstance(obj, list):
            for item in obj:
                yield from iterate_obj(item, path)
        elif isinstance(obj, dict):
            for key in obj:
                yield from iterate_obj(obj[key], path + '.' + key)
        else:
            # skip first . in path
            path = path[1:]
            # remove keys for text and email headers (as they're handled separately)
            if not any(path.startswith(x) for x in REMOVE_KEYS):
                yield path, str(obj)[:TRUNCATE_LIMIT].strip()

    # The rmeta endpoint returns a list; the first item holds the document metadata.
    # Walk the tree once and reuse the result for both output fields (the original
    # ran the recursive generator twice).
    pairs = list(iterate_obj(rmeta_obj[0]))
    return {'tika': [path + ': ' + value for path, value in pairs],
            'tika-key': list({path for path, _ in pairs})}
get_date_created(rmeta)
#
Extract date created from returned Tika metadata.
The date can show up under different keys (depending on mime type and internal Tika analyzer), so we have to try them all and return the first hit.
Source code in snoop/data/analyzers/tika.py
def get_date_created(rmeta):
    """Extract date created from returned Tika metadata.

    The date can show up under different keys (depending on mime type and internal Tika analyzer), so we
    have to try them all and return the first hit. Returns None when no key matches.
    """
    FIELDS_CREATED = ['Creation-Date', 'dcterms:created', 'meta:created',
                      'created']
    metadata = rmeta[0]
    # pick the first truthy candidate, in priority order
    raw = next((metadata.get(key) for key in FIELDS_CREATED if metadata.get(key)), None)
    if raw:
        return zulu(parser.parse(raw))
get_date_modified(rmeta)
#
Extract date modified from returned Tika metadata.
The date can show up under different keys (depending on mime type and internal Tika analyzer), so we have to try them all and return the first hit.
Source code in snoop/data/analyzers/tika.py
def get_date_modified(rmeta):
    """Extract date modified from returned Tika metadata.

    The date can show up under different keys (depending on mime type and internal Tika analyzer), so we
    have to try them all and return the first hit. Returns None when no key matches.
    """
    # NOTE(review): the final 'created' entry looks copy-pasted from get_date_created;
    # it may be an intentional fallback to the creation date — confirm before changing.
    FIELDS_MODIFIED = ['Last-Modified', 'Last-Saved-Date', 'dcterms:modified',
                       'meta:modified', 'created']
    metadata = rmeta[0]
    # pick the first truthy candidate, in priority order
    raw = next((metadata.get(key) for key in FIELDS_MODIFIED if metadata.get(key)), None)
    if raw:
        return zulu(parser.parse(raw))
rmeta(blob)
#
Task to run Tika on a given Blob.
Source code in snoop/data/analyzers/tika.py
@snoop_task('tika.rmeta', queue='tika', version=3)
@returns_json_blob
def rmeta(blob):
    """Task to run Tika on a given Blob.

    Streams the blob to the Tika `rmeta/text` endpoint and stores the parsed JSON response.
    """
    with blob.open(need_fileno=True) as tika_input:
        response = call_tika_server('rmeta/text', tika_input, blob.content_type, blob.size)
        return response.json()