Filesystem
snoop.data.filesystem
#
Definitions for filesystem-related Tasks.
These are the first steps in processing any dataset: walking the filesystem and recording files and directories in their respective database tables.
The root directory for a dataset is explored by the snoop.data.filesystem.walk Task. This recursively schedules walk[] on the directories inside it, and then schedules snoop.data.filesystem.handle_file on all the files inside it.
Files discovered may actually be Archives, Emails or other containers that contain directories too, but walking these is treated differently under snoop.data.analyzers.archives, snoop.data.analyzers.email and others.
Functions#
create_archive_files(file_pk, archive_listing)
#
Creates the File and Directoty structure after unpacking files.
Receives a dict (called the "archive_listing") from the
unarchive
Task with the names of the Files and Directories that must be created, as well as the File timestmaps
and binary data hashes.
This function serves half the role of walk()
, but inside archives; it
queues handle_file()
for all files unpacked. It assumes the
Blob
objects for the files inside have already been created.
Source code in snoop/data/filesystem.py
@snoop_task('filesystem.create_archive_files')
def create_archive_files(file_pk, archive_listing):
"""Creates the File and Directoty structure after unpacking files.
Receives a dict (called the "archive_listing") from the
[`unarchive`][snoop.data.analyzers.archives.unarchive]
Task with the names of the Files and Directories that must be created, as well as the File timestmaps
and binary data hashes.
This function serves half the role of [`walk()`][snoop.data.filesystem.walk], but inside archives; it
queues [`handle_file()`][snoop.data.filesystem.handle_file] for all files unpacked. It assumes the
`Blob` objects for the files inside have already been created.
"""
if isinstance(archive_listing, SnoopTaskBroken):
log.warning("Unarchive task is broken; returning without doing anything")
return
with archive_listing.open() as f:
archive_listing_data = json.load(f)
if not archive_listing_data:
log.warning("Unarchive data is empty; returning...")
return
def create_directory_children(directory, children):
for i, item in enumerate(children):
queue_limit = i >= settings.CHILD_QUEUE_LIMIT
if item['type'] == 'file':
child_original = models.Blob.objects.get(pk=item['blob_pk'])
file = create_file(directory, item['name'], child_original)
handle_file.laterz(file.pk, queue_now=not queue_limit)
if item['type'] == 'directory':
create_directory(directory, item['name'], item['children'])
def create_directory(parent_directory, name, children):
(directory, _) = parent_directory.child_directory_set.get_or_create(
name_bytes=name.encode('utf8', errors='surrogateescape'),
)
create_directory_children(directory, children)
def create_file(parent_directory, name, original):
size = original.size
file, _ = parent_directory.child_file_set.get_or_create(
name_bytes=name.encode('utf8', errors='surrogateescape'),
defaults=dict(
ctime=archive.ctime,
mtime=archive.mtime,
size=size,
original=original,
blob=original,
),
)
return file
archive = models.File.objects.get(pk=file_pk)
(fake_root, _) = archive.child_directory_set.get_or_create(name_bytes=b'')
create_directory_children(fake_root, archive_listing_data)
create_attachment_files(file_pk, email_parse)
#
Creates the File and Directoty structure after unpacking email attachments.
Receives a dict from the email.parse()
task with the names and
bodies of the attachments.
This function serves the role of walk()
, but inside emails; it queues handle_file()
for all
files unpacked.
Source code in snoop/data/filesystem.py
@snoop_task('filesystem.create_attachment_files')
def create_attachment_files(file_pk, email_parse):
"""Creates the File and Directoty structure after unpacking email attachments.
Receives a dict from the [`email.parse()`][snoop.data.analyzers.email.parse] task with the names and
bodies of the attachments.
This function serves the role of `walk()`, but inside emails; it queues `handle_file()` for all
files unpacked.
"""
attachments = list(get_email_attachments(email_parse))
if attachments:
email_file = models.File.objects.get(pk=file_pk)
(attachments_dir, _) = email_file.child_directory_set.get_or_create(
name_bytes=b'',
)
for attachment in attachments:
original = models.Blob.objects.get(pk=attachment['blob_pk'])
size = original.size
name_bytes = (
attachment['name']
.encode('utf8', errors='surrogateescape')
)
(file, _) = attachments_dir.child_file_set.get_or_create(
name_bytes=name_bytes,
defaults=dict(
ctime=email_file.ctime,
mtime=email_file.mtime,
size=size,
original=original,
blob=original,
),
)
handle_file.laterz(file.pk)
directory_absolute_path(root_data_path, directory)
#
Returns absolute Path for a dataset snoop.data.models.Directory.
Directory supplied must be present on the filesystem.
Warning
This is expected to return an invalid Path without failing if supplied with a Directory that is not found directly on the filesystem, but inside some archive or email.
Todo
Throw exception if directory isn't directly on filesystem.
Source code in snoop/data/filesystem.py
def directory_absolute_path(root_data_path, directory):
"""Returns absolute Path for a dataset [snoop.data.models.Directory][].
Directory supplied must be present on the filesystem.
Warning:
This is expected to return an invalid Path without failing if supplied with a Directory that is not
found directly on the filesystem, but inside some archive or email.
TODO:
Throw exception if directory isn't directly on filesystem.
"""
path_elements = []
node = directory
path = pathlib.Path(root_data_path)
while node.parent_directory:
path_elements.append(node.name)
node = node.parent_directory
for name in reversed(path_elements):
path /= name
return path
get_email_attachments(parsed_email)
#
Extracts attachment identifiers from parsed email data.
Source code in snoop/data/filesystem.py
def get_email_attachments(parsed_email):
"""Extracts attachment identifiers from parsed email data.
"""
if isinstance(parsed_email, SnoopTaskBroken):
log.debug("Email task is broken; returning without doing anything")
return
def iter_parts(email_data):
yield email_data
for part in email_data.get('parts') or []:
yield from iter_parts(part)
with parsed_email.open() as f:
email_data = json.load(f)
for part in iter_parts(email_data):
part_attachment = part.get('attachment')
if part_attachment:
yield part_attachment
handle_file(file_pk, **depends_on)
#
Parse, update and possibly convert file found on in dataset.
Re-runs libmagic
in case mime type changed (through updating the library). Switching by the resulting
mime type, a decision is made if the file needs to be converted to another format, or unpacked into more
Files and Directories (in cases like archives,
emails with attachments, PDFs with images, etc).
If a conversion/unpacking is required, then a Task responsible for doing the
conversion/unpacking operation is dynamically added as a dependency for this Task (using
require_dependency()
). Previous dependencies that are not valid
anymore must also be removed here; this is to fix documents with wrong mime types, not to support
document deletion.
Finally, after all unarchiving, unpacking and converting is done, we queue the
digests.launch
Task for the de-duplicated document that the given File is
pointing to. A dependency between this Task and that one is not made, since we have no use for such a
dependency and it would only slow down the database.
Source code in snoop/data/filesystem.py
@snoop_task('filesystem.handle_file', version=4, queue='filesystem')
def handle_file(file_pk, **depends_on):
"""Parse, update and possibly convert file found on in dataset.
Re-runs `libmagic` in case mime type changed (through updating the library). Switching by the resulting
mime type, a decision is made if the file needs to be converted to another format, or unpacked into more
[Files][snoop.data.models.File] and [Directories][snoop.data.models.Directory] (in cases like archives,
emails with attachments, PDFs with images, etc).
If a conversion/unpacking is required, then a [Task][snoop.data.models.Task] responsible for doing the
conversion/unpacking operation is dynamically added as a dependency for this Task (using
[`require_dependency()`][snoop.data.tasks.require_dependency]). Previous dependencies that are not valid
anymore must also be removed here; this is to fix documents with wrong mime types, not to support
document deletion.
Finally, after all unarchiving, unpacking and converting is done, we queue the
[`digests.launch`][snoop.data.digests.launch] Task for the de-duplicated document that the given File is
pointing to. A dependency between this Task and that one is not made, since we have no use for such a
dependency and it would only slow down the database.
"""
# TODO: remove `file.blob`, keep only `file.original`, and move the emlx_reconstruct/msg_to_eml
# into digests.launch
file = models.File.objects.get(pk=file_pk)
old_mime = file.original.mime_type
old_blob_mime = file.blob.mime_type
old_blob = file.blob
file.blob = file.original
# update mime type / magic information
file.blob.update_magic()
if file.blob != file.original:
file.original.update_magic()
extension = pathlib.Path(file.name).suffix.lower()
if allow_processing_for_mime_type(file.original.mime_type, extension):
if archives.is_archive(file.blob):
unarchive_task = archives.unarchive.laterz(file.blob)
create_archive_files.laterz(
file.pk,
depends_on={'archive_listing': unarchive_task},
)
if file.original.mime_type in email.OUTLOOK_POSSIBLE_MIME_TYPES:
try:
file.blob = require_dependency(
'msg_to_eml', depends_on,
lambda: email.msg_to_eml.laterz(file.original),
)
except SnoopTaskBroken:
pass
else:
remove_dependency('msg_to_eml', depends_on)
if file.original.mime_type in EMLX_EMAIL_MIME_TYPES:
file.blob = require_dependency(
'emlx_reconstruct', depends_on,
lambda: emlx.reconstruct.laterz(file.pk),
)
else:
remove_dependency('emlx_reconstruct', depends_on)
if file.blob.mime_type in RFC822_EMAIL_MIME_TYPES:
email_parse_task = email.parse.laterz(file.blob)
create_attachment_files.laterz(
file.pk,
depends_on={'email_parse': email_parse_task},
)
file.save()
# if conversion blob changed from last time, then
# we want to check if the old one is an orphan.
deleted = False
if file.blob.pk != old_blob.pk and old_blob.pk != file.original.pk:
if not old_blob.file_set.exists():
# since it is an orphan, let's remove it from the index
log.warning('deleting orphaned blob from index: ' + old_blob.pk)
delete_doc(old_blob.pk)
# and database - this should cascade into all tasks, Digests, etc
old_blob.delete()
deleted = True
retry = file.original.mime_type != old_mime \
or file.blob.mime_type != old_blob_mime \
or deleted
digests.launch.laterz(file.blob, retry=retry)
walk(directory_pk)
#
Scans one level of a directory and recursively ingests all files and directories found.
Items are iterated in arbitrary order. When a directory was found, an entry is added to the snoop.data.models.Directory. table (if it doesn't exist there already) and another snoop.data.filesystem.walk Task is queued on it. On the other hand, if a file was fonud, a corresponding row is added (or updated, but never removed) from the snoop.data.models.File table, the binary data for the file is stored in a snoop.data.models.Blob object, and finally the snoop.data.filesystem.handle_file Task is queued for it.
One of the decorators of this function, snoop.data.tasks.snoop_task, wraps this function in a
Django Transaction. Because snoop.data.tasks.queue_task also wraps the queueing operation inside
Django's transaction.on_commit()
, all queueing operations will be handled after the transaction (and
function) is finished successfully. This has two effects: we never queue tasks triggered from failing
functions (to avoid cascading errors), and we queue all the next tasks after finishing running the
current Task.
The number of tasks queued by this one is limited to a config setting called
CHILD_QUEUE_LIMIT
. Since the tasks to be queued are kept in
memory until the function is finished, we are limiting the memory used through this value. We also want
to avoid saturating the message queue with writes, a real
I/O bottleneck.
For the walk() -> handle_file()
queueing, this is the intended behavior, because handle_file
requires all sibling Files to have also been saved in the File table and on the Blobs. This is needed
because some file types depend on their sibling files to be decoded. For example, the Apple Email format
(".emlx") stores larger parts of its multipart RFC-822 messages in separate files under the same
Directory. If we would run handle_file()
on any file without its siblings existing in the database, we
woulnd't find those attachment files.
For the walk() -> walk()
queueing, this is not the intended behavior - we could save some time in the
beginning of processing a dataset by dispatching all the walk()
faster, saturating the workers
quicker. We take a conservative approach here in case we wanted to add deeper matching of related files
(for example, what if the ".emlx" files stored their attachment under a sub-folder?). Since this
functionality is not used, the walk() -> walk()
recursivity may be optimized in the future by removing
the transaction.on_commit
call from queue_task()
when queueing walk()
from this function.
Source code in snoop/data/filesystem.py
@snoop_task('filesystem.walk', version=2, queue='filesystem')
def walk(directory_pk):
"""Scans one level of a directory and recursively ingests all files and directories found.
Items are iterated in arbitrary order. When a directory was found, an entry is added to the
[snoop.data.models.Directory][]. table (if it doesn't exist there already) and another
[snoop.data.filesystem.walk][] Task is queued on it. On the other hand, if a file was fonud, a
corresponding row is added (or updated, but never removed) from the [snoop.data.models.File][] table,
the binary data for the file is stored in a [snoop.data.models.Blob][] object, and finally the
[snoop.data.filesystem.handle_file][] Task is queued for it.
One of the decorators of this function, [snoop.data.tasks.snoop_task][], wraps this function in a
Django Transaction. Because [snoop.data.tasks.queue_task][] also wraps the queueing operation inside
Django's `transaction.on_commit()`, all queueing operations will be handled after the transaction (and
function) is finished successfully. This has two effects: we never queue tasks triggered from failing
functions (to avoid cascading errors), and we queue all the next tasks after finishing running the
current Task.
The number of tasks queued by this one is limited to a config setting called
[`CHILD_QUEUE_LIMIT`][snoop.defaultsettings.CHILD_QUEUE_LIMIT]. Since the tasks to be queued are kept in
memory until the function is finished, we are limiting the memory used through this value. We also want
to avoid saturating the message queue with writes, a real
I/O bottleneck.
For the `walk() -> handle_file()` queueing, this is the intended behavior, because `handle_file`
requires all sibling Files to have also been saved in the File table and on the Blobs. This is needed
because some file types depend on their sibling files to be decoded. For example, the Apple Email format
(".emlx") stores larger parts of its multipart RFC-822 messages in separate files under the same
Directory. If we would run `handle_file()` on any file without its siblings existing in the database, we
woulnd't find those attachment files.
For the `walk() -> walk()` queueing, this is not the intended behavior - we could save some time in the
beginning of processing a dataset by dispatching all the `walk()` faster, saturating the workers
quicker. We take a conservative approach here in case we wanted to add deeper matching of related files
(for example, what if the ".emlx" files stored their attachment under a sub-folder?). Since this
functionality is not used, the `walk() -> walk()` recursivity may be optimized in the future by removing
the `transaction.on_commit` call from `queue_task()` when queueing `walk()` from this function.
"""
directory = models.Directory.objects.get(pk=directory_pk)
url_stat = settings.SNOOP_BROKEN_FILENAME_SERVICE + '/get-stat'
url_list = settings.SNOOP_BROKEN_FILENAME_SERVICE + '/get-list'
url_obj = settings.SNOOP_BROKEN_FILENAME_SERVICE + '/get-object'
with collections.current().mount_collections_root() as root_collection_path:
root_data_path = os.path.join(root_collection_path, collections.Collection.DATA_DIR)
dir_path = directory_absolute_path(root_data_path, directory)
relative_path = os.path.relpath(dir_path, start=root_collection_path)
service_path_bytes = os.path.join(
collections.current().name,
relative_path,
).encode('utf-8', errors='surrogateescape')
arg = {'path_base64': base64.b64encode(service_path_bytes).decode()}
for i, thing in enumerate(requests.post(url_list, json=arg).json()['list']):
queue_limit = i >= settings.CHILD_QUEUE_LIMIT
thing['name_bytes'] = base64.b64decode(thing['name_bytes'])
thing['name'] = thing['name_bytes'].decode('utf8', errors='surrogateescape')
if thing['is_dir']:
(child_directory, created) = directory.child_directory_set.get_or_create(
name_bytes=thing['name_bytes'],
)
# since the periodic task retries all talk tasks in rotation,
# we're not going to dispatch a walk task we didn't create
walk.laterz(child_directory.pk, queue_now=created and not queue_limit)
continue
f_path = directory_absolute_path(root_data_path, directory) / thing['name']
f_relative_path = os.path.relpath(f_path, start=root_collection_path)
if _is_valid_utf8(str(f_path)):
stat = f_path.stat()
stat_size = stat.st_size
stat_ctime = stat.st_ctime
stat_mtime = stat.st_mtime
original = models.Blob.create_from_file(
f_path,
collection_source_key=f_relative_path.encode('utf-8', errors='surrogateescape'),
)
else:
# use the broken filename service
f_service_path_bytes = os.path.join(
collections.current().name,
f_relative_path,
).encode('utf-8', errors='surrogateescape')
f_arg = {'path_base64': base64.b64encode(f_service_path_bytes).decode()}
stat = requests.post(url_stat, json=f_arg).json()
stat_size = stat['size']
stat_ctime = stat['ctime']
stat_mtime = stat['mtime']
# save file to remote disk and create blob from it
with collections.current().mount_blobs_root(readonly=False) as w_blobs_root:
tmp_base = pathlib.Path(w_blobs_root) / 'tmp' / 'blobs-broken-filenames'
tmp_base.mkdir(parents=True, exist_ok=True)
# not using "with" because we give arg delete=False
# pylint: disable=consider-using-with
temp = tempfile.NamedTemporaryFile(dir=tmp_base, prefix='blob-', delete=False)
temp_name = temp.name
try:
with requests.post(url_obj, json=f_arg, stream=True) as r:
r.raise_for_status()
for chunk in r.iter_content(chunk_size=512 * 1024):
temp.write(chunk)
temp.flush()
temp.close()
original = models.Blob.create_from_file(temp_name)
finally:
os.unlink(temp_name)
file, created = directory.child_file_set.get_or_create(
name_bytes=thing['name_bytes'],
defaults=dict(
ctime=time_from_unix(stat_ctime),
mtime=time_from_unix(stat_mtime),
size=stat_size,
original=original,
blob=original,
),
)
# if file is already loaded, and size+mtime are the same,
# don't retry handle task
if created \
or file.mtime != time_from_unix(stat_mtime) \
or file.size != stat_size:
file.mtime = time_from_unix(stat_mtime)
file.size = stat_size
file.original = original
file.save()
handle_file.laterz(file.pk, retry=True, queue_now=not queue_limit)
else:
handle_file.laterz(file.pk, queue_now=False)