snoop.data.admin #

Django Admin definitions.

This defines MultiDBModelAdmin, the root Admin class required to view models from the different databases.

Specialized admin sites for the different tables also live here; we add links, a table with task statistics, and generally try to restrict editing of info that should remain read-only. There are still a million ways to break or exploit the system from this admin, so we keep it locked behind a firewall and access it by tunneling onto the machine.

All the different admin sites are kept in the global dict sites. The default admin site is also part of this dict, under the key "_default". The sites are mapped to URLs in snoop.data.urls using this global.
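As a rough sketch of the wiring described above (the site objects below are plain strings standing in for real AdminSite instances; only the `sites` name and the `"_default"` key come from this page):

```python
# A dict of admin sites keyed by name, with the default site under "_default".
sites = {
    "_default": "<default admin site>",
    "collection-a": "<CollectionAdminSite for collection-a>",
}

# snoop.data.urls iterates this global; one URL prefix per admin site.
urlpatterns = [(f"{name}/", site) for name, site in sites.items()]

assert "_default" in sites
assert ("collection-a/", "<CollectionAdminSite for collection-a>") in urlpatterns
```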

Classes#

BlobAdmin #

List and detail views for the blobs.

Classes#
BlobFileInline #
Classes#
model #

Database model for a file found in the dataset.

Attributes#
blob #

The converted data for this File.

This is usually identical to original, but some file formats require conversion before any further processing (like Apple email .emlx, which is basically .eml with some binary data prefixed to it).

ctime #

Taken from stat() or other sources.

mtime #

Taken from stat() or other sources.

name property readonly #

Decodes the name of this File as UTF-8.

Escapes UTF-8 encoding errors with 'surrogateescape' - this has the advantage that it's reversible, for bad encodings.
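The reversibility claimed above can be checked directly in plain Python, independent of any model code (the filename bytes below are made up):

```python
# Bytes that are not valid UTF-8, as can happen with filenames coming
# from other filesystems and encodings.
raw = b'report-\xff\xfe.txt'

# 'surrogateescape' maps each undecodable byte to a lone surrogate code
# point instead of raising or replacing it ...
name = raw.decode('utf8', errors='surrogateescape')

# ... so encoding back with the same handler restores the exact bytes.
assert name.encode('utf8', errors='surrogateescape') == raw
```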

name_bytes #

Name of file on disk, as bytes.

We store this as bytes and not as strings because we have to support a multitude of original filesystems and encodings that create mutually invalid results.

original #

The original data found for this File.

parent property readonly #

Returns the ID of the parent directory.

parent_directory #

The directory containing this File.

size #

Size, taken from stat(), in bytes.

Methods#
__repr__(self) special #

String representation for a File is its filename, with non-UTF8 code points escaped with backslashes, truncated.

Source code in snoop/data/admin.py
def __str__(self):
    """String representation for a File is its filename,
    with non-UTF8 code points escaped with backslashes, truncated.
    """
    name_bytes = self.name_bytes
    if isinstance(name_bytes, memoryview):
        name_bytes = name_bytes.tobytes()
    the_str = truncatechars(name_bytes.decode('utf8', errors='backslashreplace'), 60)
    return f'File({the_str})'
__str__(self) special #

String representation for a File is its filename, with non-UTF8 code points escaped with backslashes, truncated.

Source code in snoop/data/admin.py
def __str__(self):
    """String representation for a File is its filename,
    with non-UTF8 code points escaped with backslashes, truncated.
    """
    name_bytes = self.name_bytes
    if isinstance(name_bytes, memoryview):
        name_bytes = name_bytes.tobytes()
    the_str = truncatechars(name_bytes.decode('utf8', errors='backslashreplace'), 60)
    return f'File({the_str})'
DigestArgumentInline #
Classes#
model #

Digest contains all the data we have parsed for a de-duplicated document.

The data is neatly stored as JSON in the "result" blob, ready for quick re-indexing if the need arises.

Attributes#
blob #

The de-duplicated Document for which processing has happened.

This corresponds to snoop.data.models.File.blob, not snoop.data.models.File.original.

extra_result #

The Blob that contains the result of the digests.index task, encoded as JSON. The field is optional, and required by tasks that depend on the …

This may become huge, so we store it as a Blob instead of a JSON field.

result #

The Blob that contains the result of parsing the document, encoded as JSON.

This output is generated by the digests.gather task.

This may become huge, so we store it as a Blob instead of a JSON field.

Methods#
__repr__(self) special #

To represent a Digest we use its blob hash and the result hash.

Source code in snoop/data/admin.py
def __str__(self):
    """To represent a Digest we use its blob hash and the result hash.
    """
    return f'{self.blob} -> {self.result.pk[:5]}...'
__str__(self) special #

To represent a Digest we use its blob hash and the result hash.

Source code in snoop/data/admin.py
def __str__(self):
    """To represent a Digest we use its blob hash and the result hash.
    """
    return f'{self.blob} -> {self.result.pk[:5]}...'
get_etag(self) #

Compute HTTP ETag header for this Digest. To be used for implementing caching mechanisms.

Source code in snoop/data/admin.py
def get_etag(self):
    """Compute HTTP ETag header for this Digest.
    To be used for implementing caching mechanisms."""
    etag = str(self.pk)
    etag += ':'
    if self.result:
        etag += str(self.result.pk)
    etag += ':'
    if self.extra_result:
        etag += str(self.extra_result.pk)
    etag += ':'
    etag += str(self.date_modified)
    etag += ':'
    etag += str(self.date_created)
    etag = etag.encode('utf-8', errors='backslashreplace')
    etag = hashlib.sha1(etag).hexdigest()
    return etag
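A standalone sketch of the same scheme (field values here are made up) shows why any change to the result blobs or timestamps yields a new tag, which is what lets a cache invalidate stale copies:

```python
import hashlib

def make_etag(pk, result_pk, extra_result_pk, date_modified, date_created):
    """Join the identifying fields with ':' and hash, mirroring get_etag()."""
    parts = [str(pk),
             str(result_pk) if result_pk else '',
             str(extra_result_pk) if extra_result_pk else '',
             str(date_modified),
             str(date_created)]
    raw = ':'.join(parts).encode('utf-8', errors='backslashreplace')
    return hashlib.sha1(raw).hexdigest()

etag = make_etag(42, 'abc123', None, '2024-01-02', '2024-01-01')
assert len(etag) == 40                     # hex-encoded SHA-1 digest
# Changing any field changes the tag, invalidating cached responses:
assert etag != make_etag(42, 'abc124', None, '2024-01-02', '2024-01-01')
```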
DigestExtraResultInline #
Classes#
model #

Digest contains all the data we have parsed for a de-duplicated document.

The data is neatly stored as JSON in the "result" blob, ready for quick re-indexing if the need arises.

Attributes#
blob #

The de-duplicated Document for which processing has happened.

This corresponds to snoop.data.models.File.blob, not snoop.data.models.File.original.

extra_result #

The Blob that contains the result of the digests.index task, encoded as JSON. The field is optional, and required by tasks that depend on the …

This may become huge, so we store it as a Blob instead of a JSON field.

result #

The Blob that contains the result of parsing the document, encoded as JSON.

This output is generated by the digests.gather task.

This may become huge, so we store it as a Blob instead of a JSON field.

Methods#
__repr__(self) special #

To represent a Digest we use its blob hash and the result hash.

Source code in snoop/data/admin.py
def __str__(self):
    """To represent a Digest we use its blob hash and the result hash.
    """
    return f'{self.blob} -> {self.result.pk[:5]}...'
__str__(self) special #

To represent a Digest we use its blob hash and the result hash.

Source code in snoop/data/admin.py
def __str__(self):
    """To represent a Digest we use its blob hash and the result hash.
    """
    return f'{self.blob} -> {self.result.pk[:5]}...'
get_etag(self) #

Compute HTTP ETag header for this Digest. To be used for implementing caching mechanisms.

Source code in snoop/data/admin.py
def get_etag(self):
    """Compute HTTP ETag header for this Digest.
    To be used for implementing caching mechanisms."""
    etag = str(self.pk)
    etag += ':'
    if self.result:
        etag += str(self.result.pk)
    etag += ':'
    if self.extra_result:
        etag += str(self.extra_result.pk)
    etag += ':'
    etag += str(self.date_modified)
    etag += ':'
    etag += str(self.date_created)
    etag = etag.encode('utf-8', errors='backslashreplace')
    etag = hashlib.sha1(etag).hexdigest()
    return etag
DigestResultInline #
Classes#
model #

Digest contains all the data we have parsed for a de-duplicated document.

The data is neatly stored as JSON in the "result" blob, ready for quick re-indexing if the need arises.

Attributes#
blob #

The de-duplicated Document for which processing has happened.

This corresponds to snoop.data.models.File.blob, not snoop.data.models.File.original.

extra_result #

The Blob that contains the result of the digests.index task, encoded as JSON. The field is optional, and required by tasks that depend on the …

This may become huge, so we store it as a Blob instead of a JSON field.

result #

The Blob that contains the result of parsing the document, encoded as JSON.

This output is generated by the digests.gather task.

This may become huge, so we store it as a Blob instead of a JSON field.

Methods#
__repr__(self) special #

To represent a Digest we use its blob hash and the result hash.

Source code in snoop/data/admin.py
def __str__(self):
    """To represent a Digest we use its blob hash and the result hash.
    """
    return f'{self.blob} -> {self.result.pk[:5]}...'
__str__(self) special #

To represent a Digest we use its blob hash and the result hash.

Source code in snoop/data/admin.py
def __str__(self):
    """To represent a Digest we use its blob hash and the result hash.
    """
    return f'{self.blob} -> {self.result.pk[:5]}...'
get_etag(self) #

Compute HTTP ETag header for this Digest. To be used for implementing caching mechanisms.

Source code in snoop/data/admin.py
def get_etag(self):
    """Compute HTTP ETag header for this Digest.
    To be used for implementing caching mechanisms."""
    etag = str(self.pk)
    etag += ':'
    if self.result:
        etag += str(self.result.pk)
    etag += ':'
    if self.extra_result:
        etag += str(self.extra_result.pk)
    etag += ':'
    etag += str(self.date_modified)
    etag += ':'
    etag += str(self.date_created)
    etag = etag.encode('utf-8', errors='backslashreplace')
    etag = hashlib.sha1(etag).hexdigest()
    return etag
OriginalFileInline #
Classes#
model #

Database model for a file found in the dataset.

Attributes#
blob #

The converted data for this File.

This is usually identical to original, but some file formats require conversion before any further processing (like Apple email .emlx, which is basically .eml with some binary data prefixed to it).

ctime #

Taken from stat() or other sources.

mtime #

Taken from stat() or other sources.

name property readonly #

Decodes the name of this File as UTF-8.

Escapes UTF-8 encoding errors with 'surrogateescape' - this has the advantage that it's reversible, for bad encodings.

name_bytes #

Name of file on disk, as bytes.

We store this as bytes and not as strings because we have to support a multitude of original filesystems and encodings that create mutually invalid results.

original #

The original data found for this File.

parent property readonly #

Returns the ID of the parent directory.

parent_directory #

The directory containing this File.

size #

Size, taken from stat(), in bytes.

Methods#
__repr__(self) special #

String representation for a File is its filename, with non-UTF8 code points escaped with backslashes, truncated.

Source code in snoop/data/admin.py
def __str__(self):
    """String representation for a File is its filename,
    with non-UTF8 code points escaped with backslashes, truncated.
    """
    name_bytes = self.name_bytes
    if isinstance(name_bytes, memoryview):
        name_bytes = name_bytes.tobytes()
    the_str = truncatechars(name_bytes.decode('utf8', errors='backslashreplace'), 60)
    return f'File({the_str})'
__str__(self) special #

String representation for a File is its filename, with non-UTF8 code points escaped with backslashes, truncated.

Source code in snoop/data/admin.py
def __str__(self):
    """String representation for a File is its filename,
    with non-UTF8 code points escaped with backslashes, truncated.
    """
    name_bytes = self.name_bytes
    if isinstance(name_bytes, memoryview):
        name_bytes = name_bytes.tobytes()
    the_str = truncatechars(name_bytes.decode('utf8', errors='backslashreplace'), 60)
    return f'File({the_str})'
TaskArgumentInline #
Classes#
model #

Database model for tracking status of the processing pipeline.

Each row in this table tracks an application of a Python function to some arguments. Additional arguments can also be supplied as other Tasks that must run before this one.

Attributes#
ALL_STATUS_CODES #

List of all valid status codes.

Todo

We should really change these out for Enums at some point.

args #

JSON containing arguments.

blob_arg #

If the first argument is a Blob, it will be duplicated here.

Used to optimize fetching tasks, as most tasks will only process one Blob as input.

broken_reason #

Identifier with reason for this permanent failure.

date_finished #

Moment when task finished running.

Used in logic for retrying old errors and re-running sync tasks.

date_started #

Moment when task started running.

This isn't saved on the object when the task actually starts, in order to limit database writes.

error #

Text with stack trace, if status is "error" or "broken".

fail_count #

The number of times this function has failed in a row.

Used to stop retrying tasks that will never make it.

func #

String key for Python function.

Supplied as argument in the decorator snoop.data.tasks.snoop_task.

See snoop.data.tasks for general definition and snoop.data.filesystem, snoop.data.analyzers.init and snoop.data.digests for actual Task implementations.

log #

Text with first few KB of logs generated when this task was run.

result #

Binary object with result of running the function.

Is set if finished successfully, and if the function actually returns a Blob value.

status #

String token with task status; see above.

STATUS_BROKEN #

Permanent error.

Used for some known types of breakage, such as encrypted archives, encrypted PDFs, or dependencies that are themselves in an ERROR state.

STATUS_DEFERRED #

Waiting on some other task to finish.

STATUS_ERROR #

Unexpected error.

Might be temporary, might be permanent, we don't know.

STATUS_PENDING #

Task either wasn't run yet, or was started but not finished.

Telling pending apart from running would require a write to happen inside our transaction, so we can't distinguish them from outside the runner anyway.

STATUS_QUEUED #

Used for tasks that have been put on the queue.

STATUS_STARTED #

Has been started by the worker at some point.

Used to detect when the Python process was unexpectedly killed, e.g. from OOM.

STATUS_SUCCESS #

Task finished successfully.

version #

The version of the function that ran this task.

Used to re-process data when the code (version number) is changed.

Methods#
__repr__(self) special #

String representation for a Task contains its name, args and status.

Source code in snoop/data/admin.py
def __str__(self):
    """String representation for a Task contains its name, args and status.
    """
    deps = ''
    prev_set = self.prev_set.all()
    prev_ids = ', '.join(str(t.prev.pk) for t in prev_set)
    deps = '; depends on ' + prev_ids if prev_ids else ''
    the_args = str([truncatechars(str(x), 12) for x in self.args])
    return f'Task #{self.pk} {self.func}({the_args}{deps}) [{self.status}]'
__str__(self) special #

String representation for a Task contains its name, args and status.

Source code in snoop/data/admin.py
def __str__(self):
    """String representation for a Task contains its name, args and status.
    """
    deps = ''
    prev_set = self.prev_set.all()
    prev_ids = ', '.join(str(t.prev.pk) for t in prev_set)
    deps = '; depends on ' + prev_ids if prev_ids else ''
    the_args = str([truncatechars(str(x), 12) for x in self.args])
    return f'Task #{self.pk} {self.func}({the_args}{deps}) [{self.status}]'
size(self) #

Returns task size in bytes. Includes blob argument size, JSON argument size, and all dependency result blob sizes, all added up.

Source code in snoop/data/admin.py
def size(self):
    """Returns task size in bytes.
    Includes blob argument size, JSON argument size, and all dependency result blob sizes, all added up.
    """
    s = len(json.dumps(self.args))
    if self.blob_arg:
        s += self.blob_arg.size

    for dep in self.prev_set.all():
        if dep.prev.result:
            s += dep.prev.result.size

    return s
update(self, status = None, error = None, broken_reason = None, log = None, version = None) #

Helper method to update multiple fields at once, without saving.

This method also truncates our Text fields to decent limits, so it's preferred to use this instead of the fields directly.

Parameters:

Name Type Description Default
status

field to set, if not None

None
error

field to set, if not None

None
broken_reason

field to set, if not None

None
log

field to set, if not None

None
version

field to set, if not None

None
Source code in snoop/data/admin.py
def update(self, status=None, error=None, broken_reason=None, log=None, version=None):
    """Helper method to update multiple fields at once, without saving.

    This method also truncates our Text fields to decent limits, so it's
    preferred to use this instead of the fields directly.

    Args:
        status: field to set, if not None
        error: field to set, if not None
        broken_reason: field to set, if not None
        log: field to set, if not None
        version: field to set, if not None
    """
    def _escape(s):
        """Escapes non-printable characters as \\XXX.

        Args:
            s: string to escape
        """
        def _translate(x):
        """Turns non-printable characters into \\XXX, preserves the rest.

            Args:
                x:
            """
            if x in string.printable:
                return x
            return f'\\{ord(x)}'
        return "".join(map(_translate, s))

    old_version = self.version
    if version is not None:
        self.version = version

    if status is not None:
        self.status = status

    if error is not None:
        self.error = _escape(error)[:2**13]  # 8k of error screen
    if broken_reason is not None:
        self.broken_reason = _escape(broken_reason)[:2**12]  # 4k reason
    if log is not None:
        self.log = _escape(log)[:2**14]  # 16k of log space

    # Increment fail_count only if we ran the same version and still got a bad status code.
    # Reset the fail count only when status is success, or if the version changed.
    if self.status == self.STATUS_SUCCESS or old_version != self.version:
        self.fail_count = 0
    elif self.status in [self.STATUS_BROKEN, self.STATUS_ERROR]:
        self.fail_count = self.fail_count + 1
TaskResultInline #
Classes#
model #

Database model for tracking status of the processing pipeline.

Each row in this table tracks an application of a Python function to some arguments. Additional arguments can also be supplied as other Tasks that must run before this one.

Attributes#
ALL_STATUS_CODES #

List of all valid status codes.

Todo

We should really change these out for Enums at some point.

args #

JSON containing arguments.

blob_arg #

If the first argument is a Blob, it will be duplicated here.

Used to optimize fetching tasks, as most tasks will only process one Blob as input.

broken_reason #

Identifier with reason for this permanent failure.

date_finished #

Moment when task finished running.

Used in logic for retrying old errors and re-running sync tasks.

date_started #

Moment when task started running.

This isn't saved on the object when the task actually starts, in order to limit database writes.

error #

Text with stack trace, if status is "error" or "broken".

fail_count #

The number of times this function has failed in a row.

Used to stop retrying tasks that will never make it.

func #

String key for Python function.

Supplied as argument in the decorator snoop.data.tasks.snoop_task.

See snoop.data.tasks for general definition and snoop.data.filesystem, snoop.data.analyzers.init and snoop.data.digests for actual Task implementations.

log #

Text with first few KB of logs generated when this task was run.

result #

Binary object with result of running the function.

Is set if finished successfully, and if the function actually returns a Blob value.

status #

String token with task status; see above.

STATUS_BROKEN #

Permanent error.

Used for some known types of breakage, such as encrypted archives, encrypted PDFs, or dependencies that are themselves in an ERROR state.

STATUS_DEFERRED #

Waiting on some other task to finish.

STATUS_ERROR #

Unexpected error.

Might be temporary, might be permanent, we don't know.

STATUS_PENDING #

Task either wasn't run yet, or was started but not finished.

Telling pending apart from running would require a write to happen inside our transaction, so we can't distinguish them from outside the runner anyway.

STATUS_QUEUED #

Used for tasks that have been put on the queue.

STATUS_STARTED #

Has been started by the worker at some point.

Used to detect when the Python process was unexpectedly killed, e.g. from OOM.

STATUS_SUCCESS #

Task finished successfully.

version #

The version of the function that ran this task.

Used to re-process data when the code (version number) is changed.

Methods#
__repr__(self) special #

String representation for a Task contains its name, args and status.

Source code in snoop/data/admin.py
def __str__(self):
    """String representation for a Task contains its name, args and status.
    """
    deps = ''
    prev_set = self.prev_set.all()
    prev_ids = ', '.join(str(t.prev.pk) for t in prev_set)
    deps = '; depends on ' + prev_ids if prev_ids else ''
    the_args = str([truncatechars(str(x), 12) for x in self.args])
    return f'Task #{self.pk} {self.func}({the_args}{deps}) [{self.status}]'
__str__(self) special #

String representation for a Task contains its name, args and status.

Source code in snoop/data/admin.py
def __str__(self):
    """String representation for a Task contains its name, args and status.
    """
    deps = ''
    prev_set = self.prev_set.all()
    prev_ids = ', '.join(str(t.prev.pk) for t in prev_set)
    deps = '; depends on ' + prev_ids if prev_ids else ''
    the_args = str([truncatechars(str(x), 12) for x in self.args])
    return f'Task #{self.pk} {self.func}({the_args}{deps}) [{self.status}]'
size(self) #

Returns task size in bytes. Includes blob argument size, JSON argument size, and all dependency result blob sizes, all added up.

Source code in snoop/data/admin.py
def size(self):
    """Returns task size in bytes.
    Includes blob argument size, JSON argument size, and all dependency result blob sizes, all added up.
    """
    s = len(json.dumps(self.args))
    if self.blob_arg:
        s += self.blob_arg.size

    for dep in self.prev_set.all():
        if dep.prev.result:
            s += dep.prev.result.size

    return s
update(self, status = None, error = None, broken_reason = None, log = None, version = None) #

Helper method to update multiple fields at once, without saving.

This method also truncates our Text fields to decent limits, so it's preferred to use this instead of the fields directly.

Parameters:

Name Type Description Default
status

field to set, if not None

None
error

field to set, if not None

None
broken_reason

field to set, if not None

None
log

field to set, if not None

None
version

field to set, if not None

None
Source code in snoop/data/admin.py
def update(self, status=None, error=None, broken_reason=None, log=None, version=None):
    """Helper method to update multiple fields at once, without saving.

    This method also truncates our Text fields to decent limits, so it's
    preferred to use this instead of the fields directly.

    Args:
        status: field to set, if not None
        error: field to set, if not None
        broken_reason: field to set, if not None
        log: field to set, if not None
        version: field to set, if not None
    """
    def _escape(s):
        """Escapes non-printable characters as \\XXX.

        Args:
            s: string to escape
        """
        def _translate(x):
        """Turns non-printable characters into \\XXX, preserves the rest.

            Args:
                x:
            """
            if x in string.printable:
                return x
            return f'\\{ord(x)}'
        return "".join(map(_translate, s))

    old_version = self.version
    if version is not None:
        self.version = version

    if status is not None:
        self.status = status

    if error is not None:
        self.error = _escape(error)[:2**13]  # 8k of error screen
    if broken_reason is not None:
        self.broken_reason = _escape(broken_reason)[:2**12]  # 4k reason
    if log is not None:
        self.log = _escape(log)[:2**14]  # 16k of log space

    # Increment fail_count only if we ran the same version and still got a bad status code.
    # Reset the fail count only when status is success, or if the version changed.
    if self.status == self.STATUS_SUCCESS or old_version != self.version:
        self.fail_count = 0
    elif self.status in [self.STATUS_BROKEN, self.STATUS_ERROR]:
        self.fail_count = self.fail_count + 1
Methods#
change_view(self, request, object_id, form_url = '', extra_context = None) #

Optionally fetch and display the actual blob data in the detail view.

Our detail view is called "change_view" by Django, but we made everything read-only in this admin.

Source code in snoop/data/admin.py
def change_view(self, request, object_id, form_url='', extra_context=None):
    """Optionally fetch and display the actual blob data in the detail view.

    Our detail view is called "change_view" by Django, but we made everything read-only in this admin.
    """

    with self.collection.set_current():
        extra_context = extra_context or {}

        if object_id:
            blob = models.Blob.objects.get(pk=object_id)
            if blob.mime_type in ['text/plain', 'application/json']:
                extra_context['preview'] = True

                if request.GET.get('preview'):
                    content = self.get_preview_content(blob)
                    extra_context['preview_content'] = content

        return super().change_view(
            request, object_id, form_url, extra_context=extra_context,
        )
created(self, obj) #

Returns user-friendly string with date created (like "3 months ago").

Source code in snoop/data/admin.py
def created(self, obj):
    """Returns user-friendly string with date created (like "3 months ago")."""
    return naturaltime(obj.date_created)
get_preview_content(self, blob) #

Returns string with text for Blobs that are JSON or text.

Used to peek at the Blob data from the Admin without opening a shell.

Only works for text/plain and application/json mime types.

Source code in snoop/data/admin.py
def get_preview_content(self, blob):
    """Returns string with text for Blobs that are JSON or text.

    Used to peek at the Blob data from the Admin without opening a shell.

    Only works for `text/plain` and `application/json` mime types.
    """
    if blob.mime_type == 'text/plain':
        encoding = 'latin1' if blob.mime_encoding == 'binary' else blob.mime_encoding
        with blob.open() as f:
            return f.read().decode(encoding)

    elif blob.mime_type == 'application/json':
        with blob.open() as f:
            return json.dumps(json.load(f), indent=2, sort_keys=True)

    else:
        return ''

CollectionAdminSite #

Admin site that connects to a collection's database.

Requires that all models linked here be subclasses of MultiDBModelAdmin.

Methods#
admin_view(self, *args, **kwargs) #

Decorator to create an admin view attached to this AdminSite. This wraps the view and provides permission checking by calling self.has_permission.

You'll want to use this from within AdminSite.get_urls():

class MyAdminSite(AdminSite):

    def get_urls(self):
        from django.urls import path

        urls = super().get_urls()
        urls += [
            path('my_view/', self.admin_view(some_view))
        ]
        return urls

By default, admin_views are marked non-cacheable using the never_cache decorator. If the view can be safely cached, set cacheable=True.

Source code in snoop/data/admin.py
def admin_view(self, *args, **kwargs):
    with self.collection.set_current():
        return super().admin_view(*args, **kwargs)
stats(self, request) #

Shows tables with statistics for this collection.

The data is fetched from snoop.data.models.Statistics with key = "stats".

A periodic worker will update this data every minute or so to limit usage and allow monitoring. See snoop.data.tasks.save_stats() on how this is done.

Source code in snoop/data/admin.py
def stats(self, request):
    """Shows tables with statistics for this collection.

    The data is fetched from `snoop.data.models.Statistics` with key = "stats".

    A periodic worker will update this data every minute or so to limit usage and allow monitoring.
    See `snoop.data.tasks.save_stats()` on how this is done.
    """

    with self.collection.set_current():
        context = dict(self.each_context(request))
        # stats, _ = models.Statistics.objects.get_or_create(key='stats')
        context.update(get_stats())
        print(context)
        return render(request, 'snoop/admin_stats.html', context)

DigestAdmin #

Listing and detail views for the Digests.

DirectoryAdmin #

List and detail views for the folders.

Classes#
ChildDirectoryInline #
Classes#
model #

Database model for a file directory.

Along with File, this comprises the file tree structure analyzed by Hoover. A Directory can be found in two places: in another Directory, or as the only child of some archive or archive-like file.

Attributes#
container_file #

The parent, if it's a file (archive, email-archive or something else), else NULL.

Mutually exclusive with snoop.data.models.Directory.parent_directory.

name property readonly #

Decodes the name of this Directory as UTF-8.

Escapes UTF-8 encoding errors with 'surrogateescape' - this has the advantage that it's reversible, for bad encodings.

name_bytes #

Name of directory on disk, as bytes.

We store this as bytes and not as strings because we have to support a multitude of original filesystems and encodings that create mutually invalid results.

parent property readonly #

Returns its parent, be it a File or Directory.

parent_directory #

The parent, if it is a directory, or NULL.

Mutually exclusive with snoop.data.models.Directory.container_file.

path_str property readonly #

Returns a string representation of its full path.

Methods#
__repr__(self) special #

String representation for this Directory is its full path.

Source code in snoop/data/admin.py
def __str__(self):
    """String representation for this Directory is its full path.
    """
    # ensure no display errors by replacing surrogates with backslashes
    name = self.path_str.encode('utf8', errors='surrogateescape')
    name = name.decode('utf8', errors='backslashreplace')
    return truncatechars(name, 70)
__str__(self) special #

String representation for this Directory is its full path.

Source code in snoop/data/admin.py
def __str__(self):
    """String representation for this Directory is its full path.
    """
    # ensure no display errors by replacing surrogates with backslashes
    name = self.path_str.encode('utf8', errors='surrogateescape')
    name = name.decode('utf8', errors='backslashreplace')
    return truncatechars(name, 70)
ancestry(item) #

Yields ancestors until root is found.

Source code in snoop/data/admin.py
def ancestry(item):
    """Yields ancestors until root is found.
    """
    while item:
        yield item
        item = item.parent
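The generator pattern above can be exercised with a toy parent-linked class (hypothetical, for illustration); reversing the leaf-to-root walk yields the root-to-leaf path that a path_str-style display needs:

```python
class Node:
    """Toy stand-in for an object with a `parent` link."""
    def __init__(self, name, parent=None):
        self.name, self.parent = name, parent

def ancestry(item):
    """Yields ancestors until root is found, same shape as above."""
    while item:
        yield item
        item = item.parent

root = Node('')                      # the root has an empty name
docs = Node('docs', parent=root)
leaf = Node('readme.txt', parent=docs)

# Reverse the walk to build a root-to-leaf path string:
path = '/'.join(n.name for n in reversed(list(ancestry(leaf))))
assert path == '/docs/readme.txt'
```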
root() classmethod #

Get the root of the whole filesystem.

Exceptions:

Type Description
DoesNotExist

if table empty.

Source code in snoop/data/admin.py
@classmethod
def root(cls):
    """Get the root of the whole filesystem.

    Raises:
        DoesNotExist: if table empty.
    """
    return cls.objects.filter(
        parent_directory__isnull=True,
        container_file__isnull=True
    ).first()
ChildFileInline #
Classes#
model #

Database model for a file found in the dataset.

Attributes#
blob #

The converted data for this File.

This is usually identical to original, but some file formats require conversion before any further processing (like Apple email .emlx, which is basically .eml with some binary data prefixed to it).

ctime #

Taken from stat() or other sources.

mtime #

Taken from stat() or other sources.

name property readonly #

Decodes the name of this File as UTF-8.

Escapes UTF-8 encoding errors with 'surrogateescape'; this has the advantage of being reversible for bad encodings.
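A standalone illustration of the two error handlers involved (plain Python, no model needed):

```python
# A filename that is not valid UTF-8 (0xE9 is Latin-1 'é'):
raw = b'r\xe9sum\xe9.txt'

# 'surrogateescape' maps each bad byte to a lone surrogate, reversibly:
name = raw.decode('utf8', errors='surrogateescape')
assert name.encode('utf8', errors='surrogateescape') == raw

# For display, 'backslashreplace' renders the same bytes as escapes:
shown = raw.decode('utf8', errors='backslashreplace')
# shown == 'r\\xe9sum\\xe9.txt'
```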

name_bytes #

Name of file on disk, as bytes.

We store this as bytes and not as strings because we have to support a multitude of original filesystems and encodings that create mutually invalid results.

original #

The original data found for this File.

parent property readonly #

Returns the ID of the parent directory.

parent_directory #

The directory containing this File.

size #

Size, taken from stat(), in bytes.

Methods#
__repr__(self) special #

String representation for a File is its filename, with non-UTF8 code points escaped with backslashes, truncated.

Source code in snoop/data/admin.py
def __str__(self):
    """String representation for a File is its filename,
    with non-UTF8 code points escaped with backslashes, truncated.
    """
    name_bytes = self.name_bytes
    if isinstance(name_bytes, memoryview):
        name_bytes = name_bytes.tobytes()
    the_str = truncatechars(name_bytes.decode('utf8', errors='backslashreplace'), 60)
    return f'File({the_str})'
__str__(self) special #

String representation for a File is its filename, with non-UTF8 code points escaped with backslashes, truncated.

Source code in snoop/data/admin.py
def __str__(self):
    """String representation for a File is its filename,
    with non-UTF8 code points escaped with backslashes, truncated.
    """
    name_bytes = self.name_bytes
    if isinstance(name_bytes, memoryview):
        name_bytes = name_bytes.tobytes()
    the_str = truncatechars(name_bytes.decode('utf8', errors='backslashreplace'), 60)
    return f'File({the_str})'
Methods#
get_queryset(self, request) #

Return a QuerySet of all model instances that can be edited by the admin site. This is used by changelist_view.

Source code in snoop/data/admin.py
def get_queryset(self, request):
    return super().get_queryset(request).annotate(
        name_str=PG_Encode(
            F("name_bytes"),
            Value('escape'),
            output_field=CharField(),
        )
    )
get_search_results(self, request, queryset, search_term) #

Override search results to look in the annotated field name_str.

Source code in snoop/data/admin.py
def get_search_results(self, request, queryset, search_term):
    """Override search results to look in the annotated field name_str."""
    # The results of the built-in search, based on search_fields
    queryset_a, _ = super().get_search_results(request, queryset, search_term)

    # Queryset B starts off equal to the original queryset with
    # annotations
    queryset_b = queryset.alias(
        name_str=PG_Encode(
            F("name_bytes"),
            Value('escape'),
            output_field=CharField(),
        )
    )
    # Filter out queryset_b on every search term
    for bit in smart_split(search_term):
        if bit.startswith(('"', "'")) and bit[0] == bit[-1]:
            bit = unescape_string_literal(bit)
        queryset_b = queryset_b.filter(Q(name_str__icontains=bit))

    # Return both querysets
    # Since we're doing 2 separate searches and combining them, it's
    # not impossible for there to be duplicates, so we set
    # may_have_duplicates return value to True, which will have Django
    # filter out the duplicates
    return (queryset_a | queryset_b), True
name_str(self, obj) #

Get converted name from annotated query.

Source code in snoop/data/admin.py
def name_str(self, obj):
    """Get converted name from annotated query."""
    with self.collection.set_current():
        return obj.name_str

DocumentUserTagAdmin #

Listing and detail views for the Tags.

EntityAdmin #

List and detail views for entities.

EntityHitAdmin #

List and detail views for entity hits.

EntityTypeAdmin #

List and detail views for entity types.

FileAdmin #

List and detail views for the files.

Classes#
ChildDirectoryInline #
Classes#
model #

Database model for a file directory.

Along with File, this comprises the file tree structure analyzed by Hoover. A Directory can be found in two places: in another Directory, or as the only child of some archive or archive-like file.

Attributes#
container_file #

The parent, if it's a file (archive, email-archive or something else), else NULL.

Mutually exclusive with snoop.data.models.Directory.parent_directory.

name property readonly #

Decodes the name of this Directory as UTF-8.

Escapes UTF-8 encoding errors with 'surrogateescape'; this has the advantage of being reversible for bad encodings.

name_bytes #

Name of directory on disk, as bytes.

We store this as bytes and not as strings because we have to support a multitude of original filesystems and encodings that create mutually invalid results.

parent property readonly #

Returns its parent, be it a File or Directory.

parent_directory #

The parent, if it is a directory, or NULL.

Mutually exclusive with snoop.data.models.Directory.container_file.

path_str property readonly #

Returns a string representation of its full path.

Methods#
__repr__(self) special #

String representation for this Directory is its full path.

Source code in snoop/data/admin.py
def __str__(self):
    """String representation for this Directory is its full path.
    """
    # ensure no display errors by replacing surrogates with backslashes
    name = self.path_str.encode('utf8', errors='surrogateescape')
    name = name.decode('utf8', errors='backslashreplace')
    return truncatechars(name, 70)
__str__(self) special #

String representation for this Directory is its full path.

Source code in snoop/data/admin.py
def __str__(self):
    """String representation for this Directory is its full path.
    """
    # ensure no display errors by replacing surrogates with backslashes
    name = self.path_str.encode('utf8', errors='surrogateescape')
    name = name.decode('utf8', errors='backslashreplace')
    return truncatechars(name, 70)
ancestry(item) #

Yields ancestors until root is found.

Source code in snoop/data/admin.py
def ancestry(item):
    """Yields ancestors until root is found.
    """
    while item:
        yield item
        item = item.parent
root() classmethod #

Get the root of the whole filesystem.

Exceptions:

DoesNotExist: if the table is empty.

Source code in snoop/data/admin.py
@classmethod
def root(cls):
    """Get the root of the whole filesystem.

    Raises:
        DoesNotExist: if table empty.
    """
    return cls.objects.filter(
        parent_directory__isnull=True,
        container_file__isnull=True
    ).first()
Methods#
get_queryset(self, request) #

Return a QuerySet of all model instances that can be edited by the admin site. This is used by changelist_view.

Source code in snoop/data/admin.py
def get_queryset(self, request):
    return super().get_queryset(request).annotate(
        name_str=PG_Encode(
            F("name_bytes"),
            Value('escape'),
            output_field=CharField(),
        )
    )
get_search_results(self, request, queryset, search_term) #

Override search results to look in the annotated field name_str.

Source code in snoop/data/admin.py
def get_search_results(self, request, queryset, search_term):
    """Override search results to look in the annotated field name_str."""
    # The results of the built-in search, based on search_fields
    queryset_a, _ = super().get_search_results(request, queryset, search_term)

    # Queryset B starts off equal to the original queryset with
    # annotations
    queryset_b = queryset.alias(
        name_str=PG_Encode(
            F("name_bytes"),
            Value('escape'),
            output_field=CharField(),
        )
    )
    # Filter out queryset_b on every search term
    for bit in smart_split(search_term):
        if bit.startswith(('"', "'")) and bit[0] == bit[-1]:
            bit = unescape_string_literal(bit)
        queryset_b = queryset_b.filter(Q(name_str__icontains=bit))

    # Return both querysets
    # Since we're doing 2 separate searches and combining them, it's
    # not impossible for there to be duplicates, so we set
    # may_have_duplicates return value to True, which will have Django
    # filter out the duplicates
    return (queryset_a | queryset_b), True
name_str(self, obj) #

Get converted name from annotated query.

Source code in snoop/data/admin.py
def name_str(self, obj):
    """Get converted name from annotated query."""
    with self.collection.set_current():
        return obj.name_str

LanguageModelAdmin #

List and detail views for language models.

MultiDBModelAdmin #

Base class for an Admin that connects to a database different from "default".

The database is fetched from the thread-local memory using snoop.data.collections.current(). See that module for details on implementation and limitations.
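The thread-local lookup described above can be sketched like so; the names set_current and current mirror the API in snoop.data.collections, but the body is an illustrative guess, not the project's implementation:

```python
import threading
from contextlib import contextmanager

_local = threading.local()

def current():
    """Return the collection bound to this thread, or None."""
    return getattr(_local, 'collection', None)

@contextmanager
def set_current(collection):
    """Bind a collection to the current thread for the duration of the block."""
    previous = current()
    _local.collection = collection
    try:
        yield
    finally:
        _local.collection = previous

with set_current('testdata'):
    active = current()
# active == 'testdata'; outside the block, current() is None again
```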

Methods#
changelist_view(self, *args, **kwargs) #

The 'change list' admin view for this model.

Source code in snoop/data/admin.py
def changelist_view(self, *args, **kwargs):
    with self.collection.set_current():
        return super().changelist_view(*args, **kwargs)
delete_model(self, request, obj) #

Given a model instance delete it from the database.

Source code in snoop/data/admin.py
def delete_model(self, request, obj):
    # Tell Django to delete objects from the 'other' database
    obj.delete(using=self.using)
formfield_for_foreignkey(self, db_field, request, **kwargs) #

Get a form Field for a ForeignKey.

Source code in snoop/data/admin.py
def formfield_for_foreignkey(self, db_field, request, **kwargs):
    # Tell Django to populate ForeignKey widgets using a query
    # on the 'other' database.
    return super().formfield_for_foreignkey(db_field, request, using=self.using, **kwargs)
formfield_for_manytomany(self, db_field, request, **kwargs) #

Get a form Field for a ManyToManyField.

Source code in snoop/data/admin.py
def formfield_for_manytomany(self, db_field, request, **kwargs):
    # Tell Django to populate ManyToMany widgets using a query
    # on the 'other' database.
    return super().formfield_for_manytomany(db_field, request, using=self.using, **kwargs)
get_queryset(self, request) #

Return a QuerySet of all model instances that can be edited by the admin site. This is used by changelist_view.

Source code in snoop/data/admin.py
def get_queryset(self, request):
    # Tell Django to look for objects on the 'other' database.
    return super().get_queryset(request).using(self.using)
has_change_permission(self, request, obj = None) #

Return True if the given request has permission to change the given Django model instance. The default implementation doesn't examine the obj parameter.

Can be overridden by the user in subclasses. In such case it should return True if the given request has permission to change the obj model instance. If obj is None, this should return True if the given request has permission to change any object of the given type.

Source code in snoop/data/admin.py
def has_change_permission(self, request, obj=None):
    if not self.allow_change:
        return False
    # otherwise, check the django permissions
    return super().has_change_permission(request, obj)
has_delete_permission(self, request, obj = None) #

Return True if the given request has permission to delete the given Django model instance. The default implementation doesn't examine the obj parameter.

Can be overridden by the user in subclasses. In such case it should return True if the given request has permission to delete the obj model instance. If obj is None, this should return True if the given request has permission to delete any object of the given type.

Source code in snoop/data/admin.py
def has_delete_permission(self, request, obj=None):
    if not self.allow_delete:
        return False
    # otherwise, check the django permissions
    return super().has_delete_permission(request, obj)
history_view(self, *args, **kwargs) #

The 'history' admin view for this model.

Source code in snoop/data/admin.py
def history_view(self, *args, **kwargs):
    with self.collection.set_current():
        return super().history_view(*args, **kwargs)
save_model(self, request, obj, form, change) #

Given a model instance save it to the database.

Source code in snoop/data/admin.py
def save_model(self, request, obj, form, change):
    # Tell Django to save objects to the 'other' database.
    obj.save(using=self.using)

OcrSourceAdmin #

Editable admin views for the OCR Sources.

These are manually managed through this interface. Management commands to rename / edit these also exist.

PG_Encode #

Proxy for the PostgreSQL function encode().
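PG_Encode only forwards the call; the actual conversion happens in PostgreSQL. For reference, the documented behaviour of encode(bytea, 'escape') can be reproduced in plain Python (an illustrative re-implementation, not part of the codebase):

```python
def pg_escape(data: bytes) -> str:
    # Follows the documented rules for encode(bytea, 'escape'):
    # zero bytes and high-bit-set bytes become \nnn octal escapes,
    # backslashes are doubled, everything else passes through literally.
    out = []
    for b in data:
        if b == 0x5C:                  # backslash
            out.append('\\\\')
        elif b == 0 or b >= 0x80:      # NUL or high-bit-set byte
            out.append('\\%03o' % b)
        else:
            out.append(chr(b))
    return ''.join(out)

# pg_escape(b'caf\xe9.txt') == 'caf\\351.txt'
```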

SnoopAdminSite #

Base AdminSite definition, adds list with links to all collection Admins.

Methods#
each_context(self, request) #

Return a dictionary of variables to put in the template context for every page in the admin site.

For sites running on a subpath, use the SCRIPT_NAME value if site_url hasn't been customized.

Source code in snoop/data/admin.py
def each_context(self, request):
    context = super().each_context(request)
    context['collection_links'] = get_admin_links()
    return context

TaskAdmin #

List and detail views for the Tasks with Retry action.

Classes#
NextInline #
Classes#
model #

Database model for tracking which Tasks depend on which.

Attributes#
name #

a string used to identify the kwarg name of this dependency

next #

the task that depends on prev

prev #

the task needed by another task

Methods#
__repr__(self) special #

String representation for a TaskDependency contains both task IDs and an arrow.

Source code in snoop/data/admin.py
def __str__(self):
    """String representation for a TaskDependency contains both task IDs
    and an arrow.
    """
    return f'{self.prev} -> {self.next}'
__str__(self) special #

String representation for a TaskDependency contains both task IDs and an arrow.

Source code in snoop/data/admin.py
def __str__(self):
    """String representation for a TaskDependency contains both task IDs
    and an arrow.
    """
    return f'{self.prev} -> {self.next}'
PrevInline #
Classes#
model #

Database model for tracking which Tasks depend on which.

Attributes#
name #

a string used to identify the kwarg name of this dependency

next #

the task that depends on prev

prev #

the task needed by another task

Methods#
__repr__(self) special #

String representation for a TaskDependency contains both task IDs and an arrow.

Source code in snoop/data/admin.py
def __str__(self):
    """String representation for a TaskDependency contains both task IDs
    and an arrow.
    """
    return f'{self.prev} -> {self.next}'
__str__(self) special #

String representation for a TaskDependency contains both task IDs and an arrow.

Source code in snoop/data/admin.py
def __str__(self):
    """String representation for a TaskDependency contains both task IDs
    and an arrow.
    """
    return f'{self.prev} -> {self.next}'
Methods#
change_view(self, request, object_id, form_url = '', extra_context = None) #

Adds links to the detail page pointing to the Tasks this one depends on.

Source code in snoop/data/admin.py
def change_view(self, request, object_id, form_url='', extra_context=None):
    """Adds links to the detail page pointing to the Tasks this one depends on."""

    with self.collection.set_current():
        extra_context = extra_context or {}

        if object_id:
            obj = models.Task.objects.get(pk=object_id)
            extra_context['task_dependency_links'] = self.dependency_links(obj)

        return super().change_view(
            request, object_id, form_url, extra_context=extra_context,
        )
retry_selected_tasks(self, request, queryset) #

Action to retry selected tasks.

Source code in snoop/data/admin.py
def retry_selected_tasks(self, request, queryset):
    """Action to retry selected tasks."""

    tasks.retry_tasks(queryset)
    self.message_user(request, f"requeued {queryset.count()} tasks")

TaskDependencyAdmin #

Listing for dependencies between tasks.

These are skipped when using the TaskAdmin links, but looking at this table may still be interesting.

Functions#

blob_link(blob_pk) #

Return markup with link pointing to the Admin Edit page for this Blob.

Source code in snoop/data/admin.py
def blob_link(blob_pk):
    """Return markup with link pointing to the Admin Edit page for this Blob."""

    url = reverse(f'{collections.current().name}:data_blob_change', args=[blob_pk])
    return mark_safe(f'<a href="{url}">{blob_pk[:10]}...{blob_pk[-4:]}</a>')

create_link(model_name, pk, url_description) #

Creates a link to any other Data entry in the database.

It uses the auto generated urls from django admin and takes the description as input.

Parameters:

model_name (required): The name of the model that the entry belongs to.
pk (required): The pk of the object.
url_description (required): The string that the link should show.
Source code in snoop/data/admin.py
def create_link(model_name, pk, url_description):
    """Creates a link to any other Data entry in the database.

    It uses the auto generated urls from django admin and takes the description
    as input.

    Args:
        model_name: The name of the model that the entry belongs to
        pk: the pk of the object
        url_description: The string that the link should show.
    """

    def escape(htmlstring):
        """Escape HTML tags in admin links."""
        # Stolen from https://stackoverflow.com/a/11550901
        escapes = {
            '\"': '&quot;',
            '\'': '&#39;',
            '<': '&lt;',
            '>': '&gt;',
        }
        # This is done first to prevent escaping other escapes.
        htmlstring = htmlstring.replace('&', '&amp;')
        for seq, esc in escapes.items():
            htmlstring = htmlstring.replace(seq, esc)
        return htmlstring

    url = reverse(f'{collections.current().name}:data_{model_name.lower()}_change', args=[pk])
    url_description = escape(str(url_description))
    return mark_safe(f'<a href="{url}">{url_description}</a>')
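Replacing '&' first prevents double-escaping the entities produced by the later substitutions; the standard library's html.escape follows the same ampersand-first rule:

```python
import html

# html.escape performs equivalent escaping (it emits &#x27; for the
# single quote rather than &#39;, but the ordering guarantee is the same):
escaped = html.escape('a & b < "c"', quote=True)
# escaped == 'a &amp; b &lt; &quot;c&quot;'
```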

get_admin_links() #

Yields tuples with admin site name and URL from the global sites.

Source code in snoop/data/admin.py
def get_admin_links():
    """Yields tuples with admin site name and URL from the global `sites`."""

    global sites
    for name in sorted(sites.keys()):
        yield name, f'/{settings.URL_PREFIX}admin/{name}/'

get_stats(force_reset = False) #

This function runs (and caches) expensive collection statistics.

Source code in snoop/data/admin.py
def get_stats(force_reset=False):
    """This function runs (and caches) expensive collection statistics."""

    col_name_hash = int(hash(collections.current().name))
    if collections.current().process:
        # default stats refresh rate once per 2 min
        REFRESH_AFTER_SEC = 120
        # add pseudorandom 0-40s
        REFRESH_AFTER_SEC += col_name_hash % 40
    else:
        # non-processed collection stats are only pulled once / week
        REFRESH_AFTER_SEC = 604800
        # add a pseudorandom 0-60min based on collection name
        REFRESH_AFTER_SEC += col_name_hash % 3600

    s, _ = models.Statistics.objects.get_or_create(key='stats')
    old_value = s.value
    duration = old_value.get('stats_collection_time', 1) if old_value else 1

    # ensure we don't fill up the worker with a single collection
    REFRESH_AFTER_SEC += duration * 2
    if force_reset or not old_value or time.time() - old_value.get('_last_updated', 0) > REFRESH_AFTER_SEC:
        s.value = _get_stats(old_value)
    else:
        log.info('skipping stats for collection %s, need to pass %s sec since last one',
                 collections.current().name,
                 REFRESH_AFTER_SEC)
    s.save()
    return s.value

get_task_matrix(task_queryset, prev_matrix = {}) #

Runs expensive database aggregation queries to fetch the Task matrix.

Included here are: counts aggregated by task function and status; average duration and ETA aggregated by function.

We estimate an ETA for every function type through a naive formula that counts the Tasks remaining and divides them by the average duration of Tasks finished in the previous 5 minutes. This is not precise for tasks that take more than 5 minutes to finish, so this value fluctuates.

Data is returned in a JSON-serializable python dict.

Source code in snoop/data/admin.py
def get_task_matrix(task_queryset, prev_matrix={}):
    """Runs expensive database aggregation queries to fetch the Task matrix.

    Included here are: counts aggregated by task function and status; average duration and ETA aggregated by
    function.

    We estimate an ETA for every function type through a naive formula that counts the Tasks remaining and
    divides them by the average duration of Tasks finished in the previous 5 minutes. This is not precise
    for tasks that take more than 5 minutes to finish, so this value fluctuates.

    Data is returned in a JSON-serializable python dict.
    """

    task_matrix = defaultdict(dict)

    for key, func in tasks.task_map.items():
        if func.queue:
            task_matrix[key]['queue'] = func.queue

    task_buckets_query = (
        task_queryset
        .values('func', 'status')
        .annotate(count=Count('*'))
    )
    for bucket in task_buckets_query:
        task_matrix[bucket['func']][bucket['status']] = bucket['count']

    # time frame in the past for which we pull tasks
    mins = 5
    # LIMIT the amount of rows we poll when doing the 4M query
    MAX_ROW_COUNT = 5000
    # Task table row takes about 5K in PG, and blob/data storage fetching does at least 8K of I/O
    SIZE_OVERHEAD = 13 * 2 ** 10
    # Overhead measured for NO-OP tasks; used here to make sure we never divide by 0
    TIME_OVERHEAD = 0.005
    RECENT_SPEED_KEY = str(mins) + 'm_avg_bytes_sec'
    AVG_WORKERS_KEY = str(mins) + 'm_avg_workers'

    task_5m_query = (
        task_queryset
        .filter(date_finished__gt=timezone.now() - timedelta(minutes=mins),
                status=models.Task.STATUS_SUCCESS)[:MAX_ROW_COUNT]
        .values('func')
        .annotate(count=Count('*'))
        .annotate(size=Sum('blob_arg__size'))
        .annotate(start=Min('date_started'))
        .annotate(end=Max('date_finished'))
        .annotate(time=Sum(F('date_finished') - F('date_started')))
    )
    for bucket in task_5m_query:
        row = task_matrix[bucket['func']]
        count = bucket['count']
        real_time = (bucket['end'] - bucket['start']).total_seconds()
        total_time = bucket['time'].total_seconds()
        fill_a = total_time / (real_time + TIME_OVERHEAD)
        fill_b = total_time / (mins * 60)
        fill = round((fill_a + fill_b) / 2, 3)
        # get total system bytes/sec in this period
        size = (bucket['size'] or 0) + SIZE_OVERHEAD * count
        bytes_sec = size / (total_time + TIME_OVERHEAD)
        row[str(mins) + 'm_count'] = count
        row[AVG_WORKERS_KEY] = fill
        row[str(mins) + 'm_avg_duration'] = total_time / count
        row[str(mins) + 'm_avg_size'] = size / count
        row[RECENT_SPEED_KEY] = bytes_sec

    for func in prev_matrix:
        for key in [RECENT_SPEED_KEY, AVG_WORKERS_KEY]:
            old = prev_matrix.get(func, {}).get(key, 0)
            # sometimes garbage appears in the JSON (say, if you edit it manually while working on it)
            if not isinstance(old, (int, float)):
                old = 0
            new = task_matrix.get(func, {}).get(key, 0)
            new = (old + new) / 2
            task_matrix[func][key] = round(new, 2)

    task_success_speed = (
        task_queryset
        .filter(date_finished__isnull=False, status=models.Task.STATUS_SUCCESS)
        .values('func')
        .annotate(size=Avg('blob_arg__size'))
        .annotate(avg_duration=Avg(F('date_finished') - F('date_started')))
        .annotate(total_duration=Sum(F('date_finished') - F('date_started')))
    )
    for bucket in task_success_speed:
        row = task_matrix[bucket['func']]
        row['success_avg_size'] = (bucket['size'] or 0) + SIZE_OVERHEAD
        row['success_avg_duration'] = bucket['avg_duration'].total_seconds() + TIME_OVERHEAD
        row['success_avg_bytes_sec'] = (row['success_avg_size']) / (row['success_avg_duration'])
        row['success_total_duration'] = int(bucket['total_duration'].total_seconds() + TIME_OVERHEAD)
    for func in prev_matrix:
        old = prev_matrix.get(func, {}).get('success_avg_bytes_sec', 0)
        # sometimes garbage appears in the JSON (say, if you edit it manually while working on it)
        if not isinstance(old, (int, float)):
            old = 0
        new = task_matrix.get(func, {}).get('success_avg_bytes_sec', 0)
        if not new and old > 0:
            task_matrix[func]['success_avg_bytes_sec'] = old

    exclude_remaining = [models.Task.STATUS_SUCCESS, models.Task.STATUS_BROKEN, models.Task.STATUS_ERROR]
    task_remaining_total_bytes = (
        task_queryset
        .exclude(status__in=exclude_remaining)
        .values('func')
        .annotate(size=Sum('blob_arg__size'))
        .annotate(count=Count('*'))
    )
    for bucket in task_remaining_total_bytes:
        row = task_matrix[bucket['func']]
        prev_matrix_row = prev_matrix.get(bucket['func'], {})

        row['remaining_size'] = (bucket['size'] or 0) + bucket['count'] * SIZE_OVERHEAD
        speed_success = row.get('success_avg_bytes_sec', 0)
        # the other one is measured over the previous few minutes;
        # average it with this one if it exists
        recent_speed = row.get(RECENT_SPEED_KEY, 0)
        if recent_speed:
            speed = (speed_success + recent_speed) / 2
        else:
            speed = speed_success

        MIN_AVG_WORKERS = 0.05
        if row.get(AVG_WORKERS_KEY, 0) <= MIN_AVG_WORKERS:
            row[AVG_WORKERS_KEY] = 0

        if speed and row.get('success_avg_duration') and row.get(AVG_WORKERS_KEY, 0) >= MIN_AVG_WORKERS:
            remaining_time = row['remaining_size'] / speed
            eta = remaining_time + row.get('pending', 0) * TIME_OVERHEAD
            # average with simple ETA (count * duration)
            eta_simple = bucket['count'] * row['success_avg_duration']
            eta = (eta + eta_simple) / 2

            # Pad the average worker count with a small constant
            # so the division below never hits zero.
            avg_worker_count = row.get(AVG_WORKERS_KEY, 0) + 0.2

            # Divide by avg workers count for this task, to obtain multi-worker ETA.
            eta = eta / avg_worker_count

            # double the estimation, since 3X is too much
            eta = round(eta, 1) * 2

            # Add small time overhead for pending task types,
            # accounting for the 50s refresh interval in queueing new tasks
            # (the dispatcher periodic task) -- the average wait is half that.
            eta += 25

            # if available, average with previous value
            if prev_matrix_row.get('eta', 0) > 1:
                eta = (eta + prev_matrix_row.get('eta', 0)) / 2

            row['eta'] = eta
        else:
            # falloff ETA for tasks when no progress is happening
            row['eta'] = max(0, int(prev_matrix_row.get('eta', 0) / 3) - 500)

        if not collections.current().process:
            # reset ETA for disabled collections
            row['eta'] = 0

    return task_matrix
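As a rough worked example of the ETA arithmetic above (all input numbers invented; the 0.2 padding, the doubling, and the 25-second dispatcher wait mirror the code):

```python
# Invented measurements for one task function:
remaining_size = 500 * 2**20   # 500 MiB of pending input blobs
speed = 2 * 2**20              # averaged bytes/sec estimate
pending = 1000                 # pending task count
avg_duration = 0.5             # success_avg_duration, seconds
avg_workers = 4.0              # 5m_avg_workers
TIME_OVERHEAD = 0.005

# Size-based estimate, then averaged with the simple count * duration one:
eta = remaining_size / speed + pending * TIME_OVERHEAD
eta = (eta + pending * avg_duration) / 2

# Divide by the padded worker count, double, add dispatcher wait:
eta = round(eta / (avg_workers + 0.2), 1) * 2 + 25
# eta ~= 204.8 seconds for these inputs
```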

make_collection_admin_site(collection) #

Registers all MultiDBModelAdmin classes with a new CollectionAdminSite.

Parameters:

collection (required): the collection to bind this CollectionAdminSite to.
Source code in snoop/data/admin.py
def make_collection_admin_site(collection):
    """Registeres all MultiDBModelAdmin classes with a new CollectionAdminSite.
    Args:
        collection: the collection to bind this CollectionAdminSite to.
    """

    with collection.set_current():
        site = CollectionAdminSite(name=collection.name)
        site.site_header = f'collection "{collection.name}"'
        site.index_title = "task stats, logs, results"

        site.register(models.Directory, DirectoryAdmin)
        site.register(models.File, FileAdmin)
        site.register(models.Blob, BlobAdmin)
        site.register(models.Task, TaskAdmin)
        site.register(models.TaskDependency, TaskDependencyAdmin)
        site.register(models.Digest, DigestAdmin)
        site.register(models.DocumentUserTag, DocumentUserTagAdmin)
        site.register(models.OcrSource, OcrSourceAdmin)
        site.register(models.OcrDocument, MultiDBModelAdmin)
        site.register(models.Statistics, MultiDBModelAdmin)
        site.register(models.Entity, EntityAdmin)
        site.register(models.EntityHit, EntityHitAdmin)
        site.register(models.EntityType, EntityTypeAdmin)
        site.register(models.LanguageModel, LanguageModelAdmin)
        return site

raw_sql(query) #

Execute SQL string in current collection database.

Source code in snoop/data/admin.py
def raw_sql(query):
    """Execute SQL string in current collection database."""
    col = collections.current()
    with connections[col.db_alias].cursor() as cursor:
        cursor.execute(query)
        return cursor.fetchall()
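The same DB-API cursor pattern, demonstrated self-contained against SQLite (the real function runs on the collection's PostgreSQL connection; the table name here is only illustrative):

```python
import sqlite3

def raw_sql_demo(conn, query):
    # Same shape as raw_sql() above: open a cursor, execute, fetch all rows.
    with conn:
        cursor = conn.execute(query)
        return cursor.fetchall()

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE data_task (id INTEGER PRIMARY KEY, status TEXT)')
conn.executemany('INSERT INTO data_task (status) VALUES (?)',
                 [('success',), ('pending',), ('success',)])
rows = raw_sql_demo(conn, "SELECT count(*) FROM data_task WHERE status = 'success'")
# rows == [(2,)]
```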