Digests
snoop.data.digests
Processing pipeline steps for a single document.

After all the files on disk, inside archives, emails and other containers are ingested and de-duplicated by the `snoop.data.filesystem` set of tasks, they end up here in the `launch()` Task.

Inside this task we decide which data and metadata extraction tasks we want to run for the document. We queue them all, then queue a `gather()` task that combines their output. Next we queue the `index()` task, which runs the optional NLP steps. Finally we queue the `bulk_index()` task, which uploads the result into Elasticsearch.

This module also generates the different representations for File, Directory and Digest (de-duplicated document) rows in the database. These are used both for API responses and when indexing data into Elasticsearch.
Attributes

ES_MAX_INTEGER

The maximum value of an `integer` field in Elasticsearch 6.8 (2^31 - 1); larger document `size` values are clamped to it before indexing.

MAX_DOC_FILE_COUNT

The maximum number of file paths to be added to a de-duplicated document.
Functions

bulk_index(batch)

Task used to send many documents to Elasticsearch.
End of the processing pipeline for any document.
Source code in snoop/data/digests.py
@snoop_task('digests.bulk_index', bulk=True, version=13, queue='digests')
def bulk_index(batch):
    """Task used to send many documents to Elasticsearch.

    End of the processing pipeline for any document.

    For every `digests.bulk_index` Task in `batch`, build the document body and send all bodies
    to Elasticsearch in a single bulk request. The body is a stub marked `processing_failed`
    when the `digests_gather` dependency did not succeed. User tags are injected into the body
    when the document has any.

    Args:
        batch: list of `digests.bulk_index` Task objects.

    Returns:
        dict mapping document IDs to `True` if the document was indexed successfully, else
        `False`. IDs are the `_id` values from the ES bulk response, or `blob.pk` for
        documents skipped because they have no file.
    """
    result = {}
    if not batch:
        return result

    task_query = (
        models.Task.objects
        .filter(pk__in=[t.pk for t in batch])
        # Annotate important parameters. Since our only batch task is digests.bulk_index(),
        # we only need to annotate the following:
        # - digest object --> ID
        # - dependency digests_gather --> status (between success and broken)
        .annotate(digest_id=Subquery(
            models.Digest.objects
            .filter(blob=OuterRef('blob_arg'))
            .values('pk')[:1]
        ))
        .annotate(digest_gather_status=Subquery(
            models.TaskDependency.objects
            .filter(next=OuterRef('pk'), name='digests_gather')
            .values('prev__status')[:1]
        ))
    )
    batch = list(task_query.all())
    if not batch:
        return result

    # list of (task, body) tuples to send to ES as a single batch request
    documents_to_index = []
    # digest IDs whose uploaded body included user tags; only these get their
    # tag timestamps updated after the upload
    tagged_digest_ids = set()

    for task in batch:
        blob = task.blob_arg
        first_file = _get_first_file(blob)
        if not first_file:
            log.info("Skipping document with no file: %s", blob)
            result[blob.pk] = False
            continue

        if task.digest_gather_status != models.Task.STATUS_SUCCESS:
            # Generate stub object when gather task is broken (no results).
            # This is needed to find results for which processing has failed.
            digest = None
            content = _get_document_content(None, first_file)
            content.setdefault('broken', []).append('processing_failed')
        else:
            # Generate body from full result set
            digest = models.Digest.objects.get(blob=blob)
            content = _get_document_content(digest)

        if models.DocumentUserTag.objects.filter(digest=task.digest_id).exists():
            # inject tags at indexing stage, so the private ones won't get spilled in
            # the document/file endpoints
            content.update(_get_tags(task.digest_id))
            tagged_digest_ids.add(task.digest_id)

        version = _get_document_version(digest)
        body = dict(content, _hoover={'version': version})

        # es 6.8 "integer" has max size 2^31-1
        # and we managed to set "size" as an "integer" field
        # instead of a long field.
        # A missing or null size is treated as 0 (comparing None with an int would raise).
        size = body.get('size') or 0
        if size > ES_MAX_INTEGER:
            body['size'] = ES_MAX_INTEGER

        log.debug('Bulk Task %s uploading body with keys = %s', task, ", ".join(sorted(body.keys())))
        documents_to_index.append((task, body))

    if not documents_to_index:
        return result

    rv = indexing.bulk_index([(task.blob_arg.pk, body) for task, body in documents_to_index])
    for item in rv['items']:
        index_info = item['index']
        result[index_info['_id']] = 200 <= index_info['status'] < 300

    for task, body in documents_to_index:
        # Only documents whose body actually carried tags get their tag timestamps updated.
        if task.digest_id in tagged_digest_ids:
            _set_tags_timestamps(task.digest_id, body)
    return result
can_read_text(blob)

Check if the document with this blob can be read directly to extract text.
This returns `True` even for `application/octet-stream`, in order to attempt text extraction on files for which no mime type was found. This sometimes happens for long files.
Source code in snoop/data/digests.py
def can_read_text(blob):
    """Decide whether the raw bytes of `blob` may be decoded directly as text.

    Any mime type starting with `text` qualifies. A few extra mime types also qualify, unless
    libmagic reported a `binary` encoding for them: JSON, CSV, TSV and the catch-all
    `application/octet-stream`. Accepting `application/octet-stream` lets us attempt text
    extraction on files whose mime type could not be detected, which sometimes happens for
    long files.
    """
    text_like_types = frozenset((
        "application/json",
        "application/octet-stream",
        "application/csv",
        "application/tab-separated-values",
    ))
    mime = blob.mime_type
    if mime.startswith('text'):
        return True
    if mime not in text_like_types:
        return False
    return blob.mime_encoding != 'binary'
detect_encoding(blob)

Try to detect the encoding for a given blob.
Returns the encoding if it finds one with a confidence of at least 0.7; otherwise returns None.
Source code in snoop/data/digests.py
def detect_encoding(blob):
    """Guess the character encoding of `blob` from its first 4 KiB using `chardet`.

    Returns the detected encoding name when chardet's confidence is at least 0.7. If chardet
    reports no encoding name in that case, `'latin1'` is returned instead. Returns `None`
    (after logging a warning) when the confidence is lower.
    """
    sample_size = 4 * 2 ** 10
    with blob.open() as stream:
        sample = read_exactly(stream, sample_size)
    guess = chardet.detect(sample)
    confidence = guess.get('confidence', 0)

    # The threshold is arbitrary: it used to be 0.8, but some documents scored 0.79,
    # so it was lowered to 0.7.
    if confidence < 0.7:
        log.warning(f'low confidence when guessing character encoding: {confidence}')
        return None
    return guess.get('encoding') or 'latin1'
directory_id(directory)

Returns an ID of the form `_directory_$ID` to represent a snoop.data.models.Directory.
This ID is used to cross-link objects in the API.
Source code in snoop/data/digests.py
def directory_id(directory):
    """Build the API identifier `_directory_<pk>` for a [snoop.data.models.Directory][].

    The identifier is used to cross-link objects in the API.
    """
    return '_directory_{}'.format(directory.pk)
file_id(file_query)

Returns an ID of the form `_file_$ID` to represent a snoop.data.models.File.
This ID is used to cross-link objects in the API.
Source code in snoop/data/digests.py
def file_id(file_query):
    """Build the API identifier `_file_<pk>` for a [snoop.data.models.File][].

    The identifier is used to cross-link objects in the API.
    """
    return '_file_{}'.format(file_query.pk)
full_path(file)

Returns the full path of a File or Directory, relative to the collection root.
`//` is used to mark files from inside containers (archives). This happens naturally when iterating objects, because every container file holds a single directory whose `name` is `''`. See snoop.data.models.Directory.container_file for more details.
Source code in snoop/data/digests.py
def full_path(file):
    """Build the path of a File or Directory relative to the collection root.

    Names are collected from `file` up through every `parent` and joined with `/`. Files inside
    containers (archives) end up with a `//` in their path: each container file holds a single
    directory named `''`. See [snoop.data.models.Directory.container_file][] for details.
    """
    def walk_up(node):
        # yield the node itself, then each ancestor until one has no parent
        yield node
        while node.parent:
            node = node.parent
            yield node

    names = [entry.name for entry in walk_up(file)]
    return '/'.join(names[::-1])
gather(blob, **depends_on)
#
Combines and serializes the results of the various dependencies into a single snoop.data.models.Digest instance.
Source code in snoop/data/digests.py
@snoop_task('digests.gather', version=9, queue='digests')
def gather(blob, **depends_on):
    """Combines and serializes the results of the various dependencies into a single
    [snoop.data.models.Digest][] instance.

    Each keyword in `depends_on` is the result of a dependency scheduled by `launch()`. Examples
    are `tika_rmeta`, `email_parse`, `get_pdf_preview`, `tesseract_<lang>`, `exif_data`,
    `get_thumbnail`, `detect_objects` and `classify_image`. A `SnoopTaskBroken` value marks a
    failed dependency; its reason is recorded in the `broken` list of the result.

    Returns:
        the JSON Blob holding the combined result, which is also stored as the Digest's `result`.
    """
    import shlex  # quote the path interpolated into the pdftotext shell command

    rv = {'broken': []}

    try:
        first_file = _get_first_file(blob)
        first_file_extension = pathlib.Path(first_file.name).suffix.lower()
    except Exception as e:
        log.error('failed to get filename extension: %s', e)
        first_file_extension = None
    rv['skipped'] = not allow_processing_for_mime_type(blob.mime_type, first_file_extension)

    try:
        # Info Gathering section
        if archives.is_table(blob):
            rv['is-table'] = True
            try:
                with blob.mount_path() as path:
                    table_info = archives.get_table_info(
                        path, blob.mime_type, blob.mime_encoding)
            except Exception as e:
                log.warning('error retrieving table info: %s', str(e))
                table_info = None
            if table_info:
                rv["table-sheets"] = table_info['sheets']
                rv["table-sheet-count"] = len(table_info['sheets'])
                if table_info['sheets']:
                    first_sheet = table_info['sheets'][0]
                    rv["table-columns"] = table_info['sheet-columns'][first_sheet]
                    rv["table-row-count"] = table_info['sheet-row-count'][first_sheet]
                    rv["table-col-count"] = table_info['sheet-col-count'][first_sheet]

        # extract text and meta with apache tika
        tika_rmeta_blob = depends_on.get('tika_rmeta')
        if tika_rmeta_blob:
            if isinstance(tika_rmeta_blob, SnoopTaskBroken):
                rv['broken'].append(tika_rmeta_blob.reason)
                log.warning("tika_rmeta task is broken; skipping")
            else:
                tika_rmeta = tika_rmeta_blob.read_json()
                if tika_rmeta:
                    rv['text'] = tika_rmeta[0].get('X-TIKA:content', "")[:indexing.MAX_TEXT_FIELD_SIZE]
                    rv['date'] = tika.get_date_modified(tika_rmeta)
                    rv['date-created'] = tika.get_date_created(tika_rmeta)
                    rv.update(tika.convert_for_indexing(tika_rmeta))
                else:
                    log.warning("tika task returned empty result!")

        # parse email for text and headers
        email_parse_blob = depends_on.get('email_parse')
        if email_parse_blob:
            if isinstance(email_parse_blob, SnoopTaskBroken):
                rv['broken'].append(email_parse_blob.reason)
                log.warning("email_parse task is broken; skipping")
            else:
                email_parse = email_parse_blob.read_json()
                email_meta = email.email_meta(email_parse)
                rv.update(email_meta)

        # For large text/CSV files, Tika (and text extraction) fails. For these, we want to read the text
        # directly from the file (limiting by indexing.MAX_TEXT_FIELD_SIZE) and ignore any files without a
        # valid encoding.
        if not rv.get('text') and can_read_text(blob):
            rv['text'] = read_text(blob) or ''

        # check if pdf-preview is available
        rv['has-pdf-preview'] = False
        pdf_preview = depends_on.get('get_pdf_preview')
        if isinstance(pdf_preview, SnoopTaskBroken):
            rv['broken'].append(pdf_preview.reason)
            log.warning('get_pdf_preview task is broken; skipping')
        elif isinstance(pdf_preview, models.Blob):
            rv['has-pdf-preview'] = True

        # combine OCR results, limiting string sizes to indexing.MAX_TEXT_FIELD_SIZE
        ocr_results = dict(ocr.ocr_texts_for_blob(blob))
        if ocr.can_process(blob) or rv['has-pdf-preview']:
            for lang in current_collection().ocr_languages:
                ocr_blob = depends_on.get(f'tesseract_{lang}')
                if not ocr_blob or isinstance(ocr_blob, SnoopTaskBroken):
                    log.warning(f'tesseract ocr result missing for lang {lang}')
                    rv['broken'].append('ocr_missing')
                    ocr_results[f'tesseract_{lang}'] = ""
                    continue
                log.info('found OCR blob with mime type: %s', ocr_blob.mime_type)
                with ocr_blob.mount_path() as ocr_path:
                    if ocr_blob.mime_type == 'application/pdf':
                        quoted_path = shlex.quote(str(ocr_path))
                        ocr_results[f'tesseract_{lang}'] = subprocess.check_output(
                            f'pdftotext -q -enc UTF-8 {quoted_path} - | head -c {indexing.MAX_TEXT_FIELD_SIZE}',
                            shell=True,
                        ).decode('utf8', errors='replace').strip()
                    else:
                        with open(ocr_path, 'rb') as f:
                            ocr_results[f'tesseract_{lang}'] = read_exactly(
                                f,
                                indexing.MAX_TEXT_FIELD_SIZE,
                            ).decode('utf-8', errors='replace').strip()
                log.info('loaded OCR text for lang %s with length %s', lang,
                         len(ocr_results[f'tesseract_{lang}']))

        if ocr_results:
            rv['ocr'] = any(len(x.strip()) > 0 for x in ocr_results.values())
            if rv['ocr']:
                if blob.mime_type == 'application/pdf' or rv['has-pdf-preview']:
                    rv['ocrpdf'] = True
                else:
                    rv['ocrimage'] = True
                rv['ocrtext'] = ocr_results
            else:
                log.warning('all OCR results were blank.')

        # try and extract exif data
        exif_data_blob = depends_on.get('exif_data')
        if exif_data_blob:
            if isinstance(exif_data_blob, SnoopTaskBroken):
                rv['broken'].append(exif_data_blob.reason)
                log.warning("exif_data task is broken; skipping")
            else:
                exif_data = exif_data_blob.read_json()
                # Only override with values EXIF actually provides: assigning an empty value here
                # would erase e.g. the creation date extracted by Tika above, because empty keys
                # are removed before the result is saved.
                if exif_data.get('location'):
                    rv['location'] = exif_data['location']
                if exif_data.get('date-created'):
                    rv['date-created'] = exif_data['date-created']

        rv['has-thumbnails'] = False
        thumbnails = depends_on.get('get_thumbnail')
        if thumbnails:
            if isinstance(thumbnails, SnoopTaskBroken):
                rv['broken'].append(thumbnails.reason)
                log.warning('get_thumbnail task is broken; skipping')
            else:
                rv['has-thumbnails'] = True

        rv['detected-objects'] = []
        detections = depends_on.get('detect_objects')
        if detections:
            if isinstance(detections, SnoopTaskBroken):
                rv['broken'].append(detections.reason)
                log.warning('object_detection task is broken; skipping')
            else:
                detected_objects = detections.read_json()
                rv['detected-objects'] = detected_objects

        rv['image-classes'] = []
        predictions = depends_on.get('classify_image')
        if predictions:
            if isinstance(predictions, SnoopTaskBroken):
                rv['broken'].append(predictions.reason)
                log.warning('image_classification task is broken; skipping')
            else:
                image_classes = predictions.read_json()
                rv['image-classes'] = image_classes

    except Exception as e:
        log.error('gather() failed while getting data: %s', str(e))
        rv['broken'].append('gather_failed')

    _delete_empty_keys(rv)
    result_blob = models.Blob.create_json(rv)
    _, _ = models.Digest.objects.update_or_create(
        blob=blob,
        defaults=dict(
            result=result_blob,
        ),
    )
    return result_blob
get_directory_children(directory, page_index = 1)
#
Returns a list with the page of children for a given directory.
This list combines both File children and Directory children into a single view. The first pages are of Directories, and the following pages contain only Files. There's a page in the middle that contains both some Directories and some Files.
See snoop.data.digests.parent_children_page for computing the page number for an item inside this list.
Source code in snoop/data/digests.py
def get_directory_children(directory, page_index=1):
    """Returns a list with the page of children for a given directory.

    This list combines both File children and Directory children into a single view. The first
    pages hold Directories and the following pages hold only Files. The page in the middle (the
    last page of Directories) also carries the first page of Files.

    See [snoop.data.digests.parent_children_page][] for computing the page number for an item
    inside this list.

    Args:
        directory: the Directory whose children are listed.
        page_index: 1-based page number; must be between 1 and the returned page count.

    Returns:
        tuple `(children, has_next, total, pages)`: the serialized children on this page, whether
        another page follows, the total number of child directories and files, and the page count.
    """
    limit = settings.SNOOP_DOCUMENT_CHILD_QUERY_LIMIT
    dir_paginator = Paginator(directory.child_directory_set.order_by('name_bytes'), limit)
    file_paginator = Paginator(directory.child_file_set.order_by('name_bytes'), limit)

    def dir_page(number):
        """Serialize one page of child directories."""
        return [child_dir_to_dict(x) for x in dir_paginator.page(number).object_list]

    def file_page(number):
        """Serialize one page of child files."""
        return [child_file_to_dict(x) for x in file_paginator.page(number).object_list]

    dir_page_count = dir_paginator.num_pages
    # the last page of directories also carries the first page of files, so one page is shared
    pages = dir_page_count + file_paginator.num_pages - 1
    assert page_index > 0
    assert page_index <= pages

    if page_index < dir_page_count:
        children = dir_page(page_index)
    elif page_index == dir_page_count:
        children = dir_page(page_index) + file_page(1)
    else:
        # skip the file page already merged into the last directory page
        children = file_page(page_index - dir_page_count + 1)

    has_next = page_index < pages
    # Paginator.count is cached (num_pages already computed it), so no extra COUNT queries are issued
    total = dir_paginator.count + file_paginator.count
    return children, has_next, total, pages
get_directory_data(directory, children_page = 1)
#
Returns dict with representation of a snoop.data.models.Directory.
Source code in snoop/data/digests.py
def get_directory_data(directory, children_page=1):
    """Build the API representation of a [snoop.data.models.Directory][].

    The `children*` keys describe page `children_page` of the directory's combined
    directory/file listing, as produced by `get_directory_children`.
    """
    page_items, more_pages, child_total, page_total = get_directory_children(directory, children_page)
    content = {
        'content-type': 'application/x-directory',
        'filetype': 'folder',
        'filename': directory.name,
        'path': full_path(directory),
    }
    representation = {
        'id': directory_id(directory),
        'parent_id': parent_id(directory),
        'content': content,
        'children': page_items,
        'children_page': children_page,
        'children_has_next_page': more_pages,
        'children_count': child_total,
        'children_page_count': page_total,
        'parent_children_page': parent_children_page(directory),
    }
    return representation
get_document_data(blob, children_page = 1)
#
Returns dict with representation of de-duplicated document (snoop.data.models.Digest).
Source code in snoop/data/digests.py
def get_document_data(blob, children_page=1):
    """Build the API representation of a de-duplicated document ([snoop.data.models.Digest][]).

    Children are listed only when the blob's first file has a child directory (i.e. it is a
    container); otherwise `children` is `None` and the counters are zero.
    """
    first_file = _get_first_file(blob)

    page_items, more_pages, child_total, page_total = None, False, 0, 0
    container_dir = first_file.child_directory_set.first()
    if container_dir:
        page_items, more_pages, child_total, page_total = get_directory_children(
            container_dir, children_page)

    try:
        digest = models.Digest.objects.get(blob=blob)
    except models.Digest.DoesNotExist:
        digest = None

    return {
        'id': blob.pk,
        'parent_id': parent_id(first_file),
        'has_locations': True,
        'version': _get_document_version(digest),
        'content': _get_document_content(digest, None),
        'children': page_items,
        'children_page': children_page,
        'children_has_next_page': more_pages,
        'children_count': child_total,
        'children_page_count': page_total,
        'parent_children_page': parent_children_page(first_file),
    }
get_document_locations(digest, page_index)
#
Returns list of dicts representing a page of locations for a given document.
Source code in snoop/data/digests.py
def get_document_locations(digest, page_index):
    """List one page of the file locations where a de-duplicated document appears.

    Files are ordered by primary key and paginated with `SNOOP_DOCUMENT_LOCATIONS_QUERY_LIMIT`.

    Returns:
        tuple `(locations, has_next)`: a list of location dicts and whether another page follows.
    """
    def describe(file):
        # files inside an archive report the archive file's path as their parent path
        container = file.parent_directory.container_file or file.parent_directory
        container_path = full_path(container)
        return {
            'id': file_id(file),
            'filename': file.name,
            'parent_id': parent_id(file),
            'parent_path': container_path,
        }

    files_in_order = digest.blob.file_set.order_by('pk')
    page = Paginator(files_in_order, settings.SNOOP_DOCUMENT_LOCATIONS_QUERY_LIMIT).page(page_index)
    locations = [describe(entry) for entry in page.object_list]
    return locations, page.has_next()
get_file_data(file, children_page = 1)
#
Returns dict with representation of a snoop.data.models.File.
Source code in snoop/data/digests.py
def get_file_data(file, children_page=1):
    """Returns dict with representation of a [snoop.data.models.File][].

    When the file's blob has no Digest row yet, the version and content are derived from the
    File itself. Children are listed only when the file has a child directory (i.e. it is a
    container).
    """
    children = None
    has_next = False
    total = 0
    pages = 0
    child_directory = file.child_directory_set.first()
    if child_directory:
        children, has_next, total, pages = get_directory_children(child_directory, children_page)

    blob = file.blob
    # Only the related-object lookup is guarded, so errors raised inside the helpers below are
    # not mistaken for a missing digest.
    try:
        digest = blob.digest
    except models.Blob.digest.RelatedObjectDoesNotExist:
        # no Digest stored for this blob (gather() creates it); fall back to the File itself
        version = _get_document_version(file)
        content = _get_document_content(None, file)
    else:
        version = _get_document_version(digest)
        content = _get_document_content(digest, file)

    rv = {
        'id': file_id(file),
        'digest': blob.pk,
        'parent_id': parent_id(file),
        'has_locations': True,
        'version': version,
        'content': content,
        'children': children,
        'children_page': children_page,
        'children_has_next_page': has_next,
        'children_count': total,
        'children_page_count': pages,
        'parent_children_page': parent_children_page(file),
    }
    return rv
get_filetype(mime_type)

Returns a Hoover-specific "file type" derived from the `libmagic` mime type.
See snoop.data._file_types.FILE_TYPES for the extended list. Extra patterns like `audio/* -> audio` are applied in this file.
Source code in snoop/data/digests.py
def get_filetype(mime_type):
    """Map a `libmagic` mime type to a Hoover-specific "file type".

    Exact matches come from [snoop.data._file_types.FILE_TYPES][]. Otherwise, mime types whose
    major type is `audio`, `video` or `image` map to that major type. Anything else yields `None`.
    """
    if mime_type in FILE_TYPES:
        return FILE_TYPES[mime_type]
    major_type, _, _ = mime_type.partition('/')
    return major_type if major_type in ('audio', 'video', 'image') else None
index(blob, **depends_on)

Task used to run the NLP steps for a document.
Depending on the collection settings, this task calls language detection, translation and/or entity extraction for a document. If there is no text in the document, or all of these features are disabled, it returns None without doing anything. Otherwise it requests the tasks it depends on, which call the NLP services, and saves their combined results on the Digest.
Source code in snoop/data/digests.py
@snoop_task('digests.index', version=15, queue='digests')
def index(blob, **depends_on):
    """Task used to run the NLP steps (language detection, translation, entity extraction) for a document.

    Returns None without doing anything when all of these features are disabled for the
    collection, when the `digests_gather` dependency is broken, or when the digest has neither
    `text` nor `ocrtext`.

    Otherwise it requests the dependency tasks that call the NLP services:
    - `detect_language`;
    - `translate`, when translation is enabled, the detected language can be translated and is
      not already a target language;
    - `get_entity_results`, when entity extraction is enabled for the detected language.

    It merges their results into one dict, stores it as a JSON Blob in `digest.extra_result`,
    saves the digest and returns that Blob.
    """
    if not current_collection().nlp_entity_extraction_enabled \
            and not current_collection().nlp_language_detection_enabled \
            and not current_collection().translation_enabled:
        log.warning('digests.index: All settings disabled. Exiting')
        return None
    if isinstance(depends_on.get('digests_gather'), SnoopTaskBroken):
        log.error('gather busted: %s', depends_on.get('digests_gather'))
        return None
    digest = models.Digest.objects.get(blob=blob)
    digest_data = digest.result.read_json()
    # nothing to analyze without extracted text or OCR text
    if not digest_data.get('text') and not digest_data.get('ocrtext'):
        log.warning('blob %s.. type %s: No text data. Exiting',
                    str(blob.pk)[:6], blob.content_type)
        return None
    result = {}
    # detect language if any other settings are on
    # NOTE(review): given the early return above, this condition is always true here.
    lang_result = None
    if current_collection().nlp_language_detection_enabled \
            or current_collection().translation_enabled \
            or current_collection().nlp_entity_extraction_enabled:
        log.warning('blob %s.. type %s: running language detect...',
                    str(blob.pk)[:6], blob.content_type)
        # NOTE(review): require_dependency presumably schedules the task built by the lambda when
        # it is not yet a dependency. With return_error=True, a failed dependency appears to be
        # returned as a SnoopTaskBroken (checked below) instead of raised — confirm.
        lang_result = require_dependency(
            'detect_language',
            depends_on,
            lambda: entities.detect_language.laterz(blob),
            return_error=True,
        )
        if not lang_result or isinstance(lang_result, SnoopTaskBroken):
            log.warning('detect_language failed!')
            lang_result = None
        else:
            result.update(lang_result.read_json())
            log.info('running language detect -- OK, lang = %s', result.get('lang'))
    # translate (skipped when the detected language is already a translation target)
    translation_result = None
    if (current_collection().translation_enabled
            and entities.can_translate(result.get('lang'))
            and result.get('lang') not in settings.TRANSLATION_TARGET_LANGUAGES):
        log.warning('blob %s.. type %s: running translation...',
                    str(blob.pk)[:6], blob.content_type)
        translation_result = require_dependency(
            'translate',
            depends_on,
            lambda: entities.translate.laterz(blob, result.get('lang')),
            return_error=True,
        )
        if not translation_result or isinstance(translation_result, SnoopTaskBroken):
            log.warning('translation failed!')
            translation_result = None
        else:
            log.info('running translation -- OK')
            result.update(translation_result.read_json())
    # extract entities
    entity_service_results = None
    if current_collection().nlp_entity_extraction_enabled \
            and entities.can_extract_entities(result.get('lang')):
        # NOTE(review): the translation Blob is stored under this key in depends_on before the
        # entity dependency is requested; the intent is not visible from here — confirm.
        if translation_result:
            depends_on['translation_result_pk'] = translation_result
        log.info('running entity extraction...')
        entity_service_results = require_dependency(
            'get_entity_results',
            depends_on,
            lambda: entities.get_entity_results.laterz(
                blob,
                result.get('lang'),
                translation_result.pk if translation_result else None,
            ),
            return_error=True,
        )
        if not entity_service_results or \
                isinstance(entity_service_results, SnoopTaskBroken):
            log.warning('get_entity_results dependency is BROKEN')
        else:
            processed_results = entities.process_results(
                digest,
                entity_service_results.read_json(),
            )
            if processed_results:
                result.update(processed_results)
    # persist the merged NLP output alongside the gather() result
    digest.extra_result = models.Blob.create_json(result)
    digest.save()
    return digest.extra_result
launch(blob)

Task to build and dispatch the different processing tasks for this de-duplicated document.
It runs snoop.data.analyzers.email.parse on emails, snoop.data.ocr.run_tesseract on OCR-able documents, and snoop.data.analyzers.tika.rmeta on compatible documents. It also runs EXIF extraction, PDF previews, thumbnails and image classification when applicable and enabled.
It then schedules one snoop.data.digests.gather Task depending on all of the above, to recombine all the results. Next comes a snoop.data.digests.index Task depending on the `gather` task. Finally it schedules a snoop.data.digests.bulk_index Task depending on both.
Source code in snoop/data/digests.py
@snoop_task('digests.launch', version=12, queue='digests')
def launch(blob):
    """Task to build and dispatch the different processing tasks for this de-duplicated document.

    When processing is allowed for the blob's mime type and first-file extension, it schedules the
    applicable analyzers:
    - [snoop.data.analyzers.email.parse][] on emails;
    - [snoop.data.analyzers.tika.rmeta][] on compatible documents;
    - EXIF extraction;
    - PDF previews and thumbnails, when enabled for the collection;
    - [snoop.data.ocr.run_tesseract][] on OCR-able documents, or on the PDF preview;
    - image classification / object detection, when enabled.

    It then schedules one [snoop.data.digests.gather][] Task depending on all of the above, to
    recombine all the results. Next comes a [snoop.data.digests.index][] Task depending on the
    `gather` task. Finally it schedules a [snoop.data.digests.bulk_index][] Task depending on both.
    """
    depends_on = {}

    try:
        first_file = _get_first_file(blob)
        first_file_extension = pathlib.Path(first_file.name).suffix.lower()
    except Exception as e:
        log.error('failed to get filename extension: %s', e)
        first_file_extension = None

    if allow_processing_for_mime_type(blob.mime_type, first_file_extension):
        if blob.mime_type == 'message/rfc822':
            depends_on['email_parse'] = email.parse.laterz(blob)

        if tika.can_process(blob):
            depends_on['tika_rmeta'] = tika.rmeta.laterz(blob)

        if exif.can_extract(blob):
            depends_on['exif_data'] = exif.extract.laterz(blob)

        if current_collection().pdf_preview_enabled and pdf_preview.can_create(blob):
            depends_on['get_pdf_preview'] = pdf_preview.get_pdf.laterz(blob)

        # either process OCR on the original, or on a PDF conversion
        if ocr.can_process(blob):
            for lang in current_collection().ocr_languages:
                log.info('dispatching direct OCR in language %s', lang)
                depends_on[f'tesseract_{lang}'] = ocr.run_tesseract.laterz(blob, lang)
        elif depends_on.get('get_pdf_preview'):
            for lang in current_collection().ocr_languages:
                log.info('dispatching OCR for PDF preview language %s', lang)
                depends_on[f'tesseract_{lang}'] = ocr.run_tesseract.laterz(
                    blob, lang,
                    depends_on={'target_pdf': depends_on['get_pdf_preview']},
                )

        if current_collection().thumbnail_generator_enabled and thumbnails.can_create(blob):
            if depends_on.get('get_pdf_preview'):
                # if we just launched a pdf preview, add it to the deps
                depends_on['get_thumbnail'] = thumbnails.get_thumbnail.laterz(
                    blob,
                    depends_on={'pdf_preview': depends_on.get('get_pdf_preview')},
                )
            else:
                # can_create() was already checked by the enclosing condition
                depends_on['get_thumbnail'] = thumbnails.get_thumbnail.laterz(blob)

        if current_collection().image_classification_object_detection_enabled \
                and image_classification.can_detect(blob):
            depends_on['detect_objects'] = image_classification.detect_objects.laterz(blob)

        if current_collection().image_classification_classify_images_enabled \
                and image_classification.can_detect(blob):
            depends_on['classify_image'] = image_classification.classify_image.laterz(blob)

    gather_task = gather.laterz(blob, depends_on=depends_on, retry=True, delete_extra_deps=True)
    index_task = index.laterz(blob, depends_on={'digests_gather': gather_task}, retry=True, queue_now=False)
    bulk_index.laterz(blob, depends_on={'digests_index': index_task, 'digests_gather': gather_task},
                      retry=True, queue_now=False)
parent_children_page(item)

Return the page number on the parent that points to the page containing this item.
All items have a `children` list in their doc API. That list is paginated by page number.
When fetching an item from the middle of the tree, we need to populate the list of siblings. Since the view is paginated, all parent objects must select the correct page in order to find the item in its parent's child list.
See snoop.data.digests.get_directory_children on how this list is created.
Source code in snoop/data/digests.py
def parent_children_page(item):
    """Return the page number on the parent that points to the page containing this item.

    All items have a `children` list in their doc API. That list is paginated by page number.
    When fetching an item from the middle of the tree, we need to populate the list of siblings.
    Since the view is paginated, all parent objects must select the correct page in order to find
    the item in its parent's child list.

    See [snoop.data.digests.get_directory_children][] on how this list is created; this function
    must use the same page size and ordering (`name_bytes`) as that listing.
    """
    # Use item.parent instead of _get_parent(): _get_parent may skip parents, but the page must
    # be located in the direct parent's children list.
    parent = item.parent
    if isinstance(item, models.Directory) and item.container_file:  # dummy archive directory
        return 1
    if not parent:  # root document, no parent
        return 1
    assert isinstance(parent, models.Directory)

    # Same page size as get_directory_children(); using a different setting here would
    # desynchronize the page number from the actual listing.
    limit = settings.SNOOP_DOCUMENT_CHILD_QUERY_LIMIT
    page_index = 1
    if isinstance(item, models.File):
        children = parent.child_file_set
        # Files are listed after all directories...
        dir_pages = Paginator(parent.child_directory_set, limit).num_pages
        page_index += dir_pages
        # ...and the last page of dirs also contains the first page of files
        if dir_pages:
            page_index -= 1
    if isinstance(item, models.Directory):
        children = parent.child_directory_set
    children_before_item = children.filter(name_bytes__lt=item.name_bytes).count()
    page_index += children_before_item // limit
    return page_index
parent_id(item)
#
Returns the ID of the parent entity, for linking in the API.
Source code in snoop/data/digests.py
def parent_id(item):
    """Return the API identifier of `item`'s parent (a File or a Directory), or None if it has none."""
    parent = _get_parent(item)
    id_builders = (
        (models.File, file_id),
        (models.Directory, directory_id),
    )
    for model_class, build_id in id_builders:
        if isinstance(parent, model_class):
            return build_id(parent)
    return None
path_parts(path)

Returns a list of all the prefixes for this document's snoop.data.digests.full_path.
This is set on the Digest as the field `path-parts` to create path buckets in Elasticsearch.
Source code in snoop/data/digests.py
def path_parts(path):
    """List every prefix of a document's [snoop.data.digests.full_path][].

    Everything before the first `/` is dropped; each prefix starts with `/`. For example,
    `'/a/b'` gives `['/a', '/a/b']`. The result is stored on the Digest as `path-parts`, to
    build path buckets in Elasticsearch.
    """
    segments = path.split('/')[1:]
    return ['/' + '/'.join(segments[:depth]) for depth in range(1, len(segments) + 1)]
read_text(blob)

Attempt to read text from a raw text file.
This function returns a single string, truncated to the `indexing.MAX_TEXT_FIELD_SIZE` constant.
Some files have the type "application/octet-stream" (mime type unknown) or a "binary" encoding. For these, we attempt to guess the encoding using "chardet", and abort if the confidence is below 70%.
If the first attempt to decode the file fails because of an unknown encoding, we also try to guess the right encoding.
Source code in snoop/data/digests.py
def read_text(blob):
    """Attempt to read text from raw text file.

    This function returns a single string, truncated to the `indexing.MAX_TEXT_FIELD_SIZE` constant.
    Some files have the type "application/octet-stream" (mime type unknown) or a "binary" encoding.
    For these, we attempt to guess the encoding using "chardet", and abort if the confidence is
    below 70%.
    If the first attempt to decode the file fails because of an unknown encoding, we also try to
    guess the right encoding.

    Returns:
        the decoded text (undecodable bytes replaced), or None when no encoding could be determined.
    """
    if blob.mime_type == 'application/octet-stream' or blob.mime_encoding == 'binary':
        encoding = detect_encoding(blob)
    else:
        encoding = blob.mime_encoding
    if not encoding:
        return None

    # Read the data once; the fallback below reuses it instead of re-opening the blob.
    with blob.open() as f:
        raw = read_exactly(f, indexing.MAX_TEXT_FIELD_SIZE)

    try:
        return raw.decode(encoding, errors='replace')
    except LookupError:
        # the encoding detected for the blob is unknown to python (e.g. unknown-8bit):
        # guess it from the content instead
        encoding = detect_encoding(blob)
        if not encoding:
            return None
        return raw.decode(encoding, errors='replace')
retry_index(blob)
#
Retry the snoop.data.digests.bulk_index Task for the given Blob.
Needed by the web process when some user changes the Document tags; this function will be called for the affected document.
Source code in snoop/data/digests.py
def retry_index(blob):
    """Re-run the [snoop.data.digests.bulk_index][] Task for the given Blob.

    Called by the web process when a user changes a document's tags. A task that is not already
    pending is first reset through `retry_task`. In every case, `bulk_index` is then run directly
    on the task. Errors are logged, not raised.
    """
    try:
        index_task = models.Task.objects.get(func='digests.bulk_index', blob_arg=blob)
        if index_task.status != models.Task.STATUS_PENDING:
            retry_task(index_task)
        bulk_index([index_task])
    except Exception as e:
        log.exception(e)
        log.error('failed to retry index for blob %s', blob.pk)
update_all_tags()

Re-runs the bulk_index task for all documents that have tags not yet indexed.
Only works on tasks that are not already PENDING.
Requires a collection to be selected.
Source code in snoop/data/digests.py
def update_all_tags():
    """Retry the `digests.bulk_index` task of every document that has unindexed tags.

    A tag counts as unindexed when its `date_indexed` is null. Tasks that are already PENDING are
    left alone. Requires a collection to be selected.
    """
    unindexed_tags = models.DocumentUserTag.objects.filter(date_indexed__isnull=True)
    tagged_blobs = unindexed_tags.distinct('digest').values('digest__blob')
    stale_tasks = (
        models.Task.objects
        .filter(func='digests.bulk_index', blob_arg__pk__in=Subquery(tagged_blobs))
        .exclude(status=models.Task.STATUS_PENDING)
    )
    retry_tasks(stale_tasks)