Tasks

snoop.data.tasks #

Definition of Snoop Task System.

This is a simple set of wrappers around Celery functions to afford them stability, reproducibility, and result storage. Although Celery supports "result backends" for storing task results, a power failure or unexpected server restart would wipe out our progress in hard-to-predict ways. The solution is to mirror all information about running Tasks in a dedicated database, and use that as the source of truth.

We also gain something else by mirroring Tasks inside a database table: the ability to de-duplicate their runs, by locking the corresponding rows while they execute (SQL SELECT FOR UPDATE).
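
A minimal sketch of the row-locking pattern, assuming a Django model named Task with a status field (the names here are illustrative, not the actual snoop models):

from django.db import transaction, DatabaseError

def run_exactly_once(task_pk):
    with transaction.atomic():
        try:
            # SQL "SELECT ... FOR UPDATE NOWAIT": fails immediately if
            # another worker already holds the row lock for this task
            task = Task.objects.select_for_update(nowait=True).get(pk=task_pk)
        except DatabaseError:
            return  # someone else is already running this task
        do_work(task)  # hypothetical task body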

Another requirement for this system is the ability to build Directed Acyclic Graphs (DAGs) of Tasks. The edges of this graph carry Task result data from parent task to child task.
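
A hedged sketch of what an edge looks like from the child's side (the @snoop_task decorator and the depends_on wiring are documented below; the names are made up). run_task calls the task function as func(*args, **depends_on), so a child receives each parent's result as a keyword argument:

@snoop_task('example.child')
def child(blob, parent_result=None):
    # parent_result carries the parent Task's result Blob (or a
    # SnoopTaskBroken instance if the parent broke) along the DAG edge
    ...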

As far as alternatives go: Apache Airflow is too slow (it takes a few seconds just to run a simple task), Spotify Luigi does all its scheduling in memory (and can't scale to our needs for persistent de-duplication), and other Kubernetes-oriented, container-native solutions were not investigated. As a rule of thumb, if a system can't run 1000-5000 idle (no-op) Tasks per minute per CPU, it's too slow for our use.

Classes#

ExtraDependency #

Thrown by a Task when it no longer depends on another Task it used to depend on.

This happens when a File was initially misidentified and has now been identified correctly; different parts of the Task graph must run on it.

MissingDependency #

Thrown by Task when it depends on another Task that is not finished.

SnoopTaskBroken #

Thrown by a Task when it has died and should have its status set to "broken".

This is to be used from inside a Task function to mark permanent problems.

SnoopTaskError #

Thrown by a Task when it has died and should have its status set to "error".

This is to be used from inside a Task function to mark unexpected or temporary errors. These tasks will be retried periodically until they finish.
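
For illustration, a task function might raise one or the other depending on whether the failure is permanent (a sketch: the checks are made up, and the constructors are assumed to mirror how run_task reads these exceptions, i.e. SnoopTaskBroken(message, reason) and SnoopTaskError(message, details)):

@snoop_task('example.convert')
def convert(blob):
    if blob_is_encrypted(blob):  # hypothetical check
        # permanent: status becomes "broken", no retries
        raise SnoopTaskBroken('cannot decrypt file', 'encrypted_file')
    if not converter_is_up():  # hypothetical check
        # temporary: status becomes "error", retried after a while
        raise SnoopTaskError('converter unreachable', 'service down')
    ...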

Functions#

dispatch_directory_walk_tasks(directory_pk) #

Trigger processing of a specific directory by queueing a filesystem.walk task for it.

Source code in snoop/data/tasks.py
def dispatch_directory_walk_tasks(directory_pk):
    """Trigger processing of a specific directory.

    Returns: A string that is the full path of the directory.
    """

    from .filesystem import walk
    directory = models.Directory.objects.get(pk=directory_pk)
    assert directory, "Directory does not exist"
    walk.laterz(directory.pk)

dispatch_for(collection, func) #

Queue the next batches of Tasks for a given collection.

This is used to periodically look for new Tasks that must be executed. It queues "pending" and "deferred" tasks left over from previous batches, then adds some Directories to revisit if the collection's "sync" configuration is set. Finally, tasks that finished with a temporary error more than a predefined number of days ago are also re-queued, in the hope that they will now succeed.

The function tends to exit early whenever it finds Tasks to queue, so as to keep Tasks running in their natural order (dependencies before the tasks that require them). Tasks are queued newest first, so that tasks left over from previous batches are finished first (again, keeping the natural order between batches).

Source code in snoop/data/tasks.py
@tracer.wrap_function()
def dispatch_for(collection, func):
    """Queue the next batches of Tasks for a given collection.

    This is used to periodically look for new Tasks that must be executed. This queues: "pending" and
    "deferred" tasks left over from previous batches; then adds some Directories to revisit if the
    collection "sync" configuration is set.  Finally, tasks that finished with a temporary error more than a
    predefined number of days ago are also re-queued with the intent of them succeeding.

    The function tends to exit early if any Tasks were found to be queued, as to make sure the Tasks run in
    their natural order (and we're running dependencies before the tasks that require them). The tasks are
    queued by newest first, to make sure the tasks left over from previous batches are being finished first
    (again, to keep the natural order between batches).
    """
    if not collection.process:
        logger.debug(f'dispatch: skipping "{collection}", configured with "process = False"')
        return

    with collection.set_current():
        # count tasks in Rabbit and on DB and check if we want to queue more
        queue_len = get_rabbitmq_queue_length_no_cache(rmq_queue_name(func))
        if queue_len == 0:
            # check if we have any queued tasks in the DB. if we do, they all need to be re-queued...
            db_invalid_queued_tasks = models.Task.objects.filter(func=func, status=models.Task.STATUS_QUEUED)
            if db_invalid_queued_tasks.exists():
                logger.warning(
                    "collection %s func %s: db has %s queued records, but rabbit has %s! resetting db...",
                    collection.name, func,
                    db_invalid_queued_tasks.count(),
                    queue_len,
                )
                db_invalid_queued_tasks.update(status=models.Task.STATUS_PENDING)
        db_tasks_remaining = _count_remaining_db_tasks_for_queue(func)
        if queue_len > 0:
            skip = False
            if queue_len >= settings.DISPATCH_MIN_QUEUE_SIZE or _is_rabbitmq_memory_full():
                skip = True
            if 0 < queue_len <= settings.DISPATCH_MIN_QUEUE_SIZE:
                # skip if we don't have many tasks left --> we would double queue the ones we have
                if db_tasks_remaining <= settings.DISPATCH_QUEUE_LIMIT:
                    skip = True
            if skip:
                logger.info(f'dispatch: skipping {collection}, has {queue_len} queued tasks on f = {func}')
                return

    logger.debug('Dispatching for %r, func = %s', collection, func)
    from .ocr import dispatch_ocr_tasks

    with collection.set_current():
        if dispatch_tasks(func, status=models.Task.STATUS_PENDING):
            # if we have enough tasks to not double queue,
            # queue the other end of the database too
            count_pending = _count_remaining_db_tasks_for_queue_and_status(
                func, models.Task.STATUS_PENDING,
                collections.current().name,
            )
            if count_pending > 3 * settings.DISPATCH_QUEUE_LIMIT:
                dispatch_tasks(func, status=models.Task.STATUS_PENDING, newest_first=False)
            logger.debug('%r found PENDING tasks, exiting...', collection)
            return True

        if func.startswith('filesystem'):
            count_before = models.Task.objects.count()
            dispatch_walk_tasks()
            dispatch_ocr_tasks()
            count_after = models.Task.objects.count()
            if count_before != count_after:
                logger.debug('%r initial dispatch added new tasks, exiting...', collection)
                return True

        # Re-try deferred tasks if we don't have anything in pending. This is to avoid a deadlock.
        # Try the oldest tasks first, since they are the most probable to have complete deps.
        if dispatch_tasks(func, status=models.Task.STATUS_DEFERRED, newest_first=False):
            logger.debug('%r found DEFERRED tasks, exiting...', collection)
            return True

        # retry outdated tasks
        if dispatch_tasks(func, outdated=True):
            logger.debug('%r found outdated tasks, exiting...', collection)
            return True

        if collection.sync and func in ['filesystem.walk', 'ocr.walk_source']:
            logger.debug("sync: retrying all walk tasks")
            # retry the oldest successful walk tasks that are older than 3 min

            retry_tasks(
                models.Task.objects
                .filter(func=func)
                .filter(date_modified__lt=timezone.now() - timedelta(minutes=3))
                .filter(status=models.Task.STATUS_SUCCESS)
                .order_by('date_modified')[:settings.SYNC_RETRY_LIMIT_DIRS],
                one_slice_only=True,
            )

        # retry errors
        for age_minutes, retry_limit in [
            (settings.TASK_RETRY_AFTER_MINUTES, settings.TASK_RETRY_FAIL_LIMIT),  # ~5min
            (settings.TASK_RETRY_AFTER_MINUTES * 30, settings.TASK_RETRY_FAIL_LIMIT * 2),  # ~1h
            (settings.TASK_RETRY_AFTER_MINUTES * 1000, settings.TASK_RETRY_FAIL_LIMIT * 3),  # ~5day
        ]:
            old_error_qs = (
                models.Task.objects
                .filter(func=func)
                .filter(status__in=[models.Task.STATUS_BROKEN, models.Task.STATUS_ERROR])
                .filter(fail_count__lt=retry_limit)
                .filter(date_modified__lt=timezone.now() - timedelta(minutes=age_minutes))
                .order_by('date_modified')[:settings.RETRY_LIMIT_TASKS]
            )
            if old_error_qs.exists():
                logger.info(f'{collection} found {old_error_qs.count()} ERROR|BROKEN tasks to retry')
                retry_tasks(old_error_qs, one_slice_only=True)
                return True

            # mark dead STARTED tasks as error (hangs / memory leaks / kills)
            old_started_qs = (
                models.Task.objects
                .filter(func=func)
                .filter(fail_count__lt=retry_limit)
                .filter(status__in=[models.Task.STATUS_STARTED])
                .filter(date_modified__lt=timezone.now() - timedelta(minutes=age_minutes))
                .order_by('date_modified')[:settings.RETRY_LIMIT_TASKS]
            )
            if old_started_qs.exists():
                logger.debug(f'{collection} found {old_started_qs.count()} old STARTED tasks to check')
                for started_task in old_started_qs:
                    if not is_task_running(started_task.pk):
                        logger.debug('marking task %s as Killed', started_task.pk)
                        tracer.count("task_killed")
                        started_task.status = models.Task.STATUS_BROKEN
                        started_task.error = "Task Killed"
                        started_task.broken_reason = "task_killed"
                        started_task.fail_count += 1
                        started_task.save()
                return True

    logger.debug(f'dispatch for collection "{collection.name}" done\n')

dispatch_tasks(func, status = None, outdated = None, newest_first = True) #

Dispatches (queues) a limited number of Task instances of each type.

Requires a collection to be selected. Does not dispatch tasks registered with bulk = True.

Queues one batch of settings.DISPATCH_QUEUE_LIMIT Tasks for every function type. The function types are shuffled before queuing, in an attempt to equalize the processing cycles for different collections running at the same time.

Parameters:

    status: the status used to filter Tasks to dispatch. Default: None.

Returns:

    bool: True if any tasks have been queued; False if none matching status were found in the current collection.

Source code in snoop/data/tasks.py
@tracer.wrap_function()
def dispatch_tasks(func, status=None, outdated=None, newest_first=True):
    """Dispatches (queues) a limited number of Task instances of each type.

    Requires a collection to be selected. Does not dispatch tasks registered with `bulk = True`.

    Queues one batch of `settings.DISPATCH_QUEUE_LIMIT` Tasks for every function type. The function types
    are shuffled before queuing, in an attempt to equalize the processing cycles for different collections
    running at the same time.
    Args:
        status: the status used to filter Tasks to dispatch

    Returns:
        bool: True if any tasks have been queued, False if none matching status have been found in current
        collection.
    """

    # with transaction.atomic(using=collections.current().db_alias):
    found_something = False
    task_query = models.Task.objects.filter(
        func=func, date_modified__lt=timezone.now() - timedelta(minutes=1)
    )
    if status:
        task_query = (
            task_query
            # .select_for_update(skip_locked=True)
            .filter(status=status)
        )
        item_str = status
    if outdated:
        task_query = task_query.exclude(version=task_map[func].version)
        item_str = f'OUTDATED (exclude version {task_map[func].version})'

    if newest_first:
        task_query = task_query.order_by('-date_modified')
    else:
        task_query = task_query.order_by('date_modified')

    # if outdated, use retry_tasks to mark everything as pending
    # from the start (to get actual ETA, not 99.99%)
    if outdated:
        found_something = task_query.exists()
        if found_something:
            logger.info(f'collection "{collections.current().name}": Dispatching {item_str} {func} tasks')  # noqa: E501
            retry_tasks(task_query, reset_fail_count=True)

    if found_something:
        return found_something

    task_query = task_query[:settings.DISPATCH_QUEUE_LIMIT]

    task_count = task_query.count()
    if not task_count:
        return found_something
    logger.info(f'collection "{collections.current().name}": Dispatching {task_count} {item_str} {func} tasks')  # noqa: E501

    for task in task_query.iterator():
        queue_task(task)
    found_something = True
    return found_something
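
A usage sketch, mirroring how dispatch_for calls this function (a collection must be selected first; the wrapper function is illustrative):

def _dispatch_pending(collection):  # illustrative wrapper
    with collection.set_current():
        # queue one batch of leftover "pending" tasks for this function;
        # True means work was found and dispatch should stop here
        return dispatch_tasks('filesystem.walk', status=models.Task.STATUS_PENDING)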

dispatch_walk_tasks() #

Trigger processing of a collection, starting with its root directory.

Source code in snoop/data/tasks.py
def dispatch_walk_tasks():
    """Trigger processing of a collection, starting with its root directory.
    """

    from .filesystem import walk
    root = models.Directory.root()
    assert root, "root document not created"
    walk.laterz(root.pk)

do_nothing(name) #

No-op task, here for demonstration purposes.

Source code in snoop/data/tasks.py
@snoop_task('do_nothing', queue=None)
def do_nothing(name):
    """No-op task, here for demonstration purposes.

    """
    pass

get_bulk_tasks_to_run(reverse = False, exclude_deferred = False, deferred_only = False, lock = True) #

Checks the current collection for bulk tasks to run.

Returns: a tuple (TASKS, SIZES, MARKED) where:

  • TASKS is a dict, keyed by function name, containing a batch of tasks for that function
  • SIZES is a dict, keyed by function name, mapping each task's primary key to its estimated size in bytes
  • MARKED is the count of tasks marked here as deferred (instead of being returned)
Source code in snoop/data/tasks.py
@tracer.wrap_function()
def get_bulk_tasks_to_run(reverse=False, exclude_deferred=False, deferred_only=False, lock=True):
    """Checks current collection if we have bulk tasks run.

    Returns: a tuple (TASKS, SIZES, MARKED) where:
        - TASKS is a dict, keyed by function name, containing a batch of tasks for that function
        - SIZES is a dict, keyed by function name, mapping each task's primary key to its estimated size in bytes.
        - MARKED contains the count of tasks marked here as deferred (instead of being returned)
    """

    # Max number of tasks to pull.
    # We estimate extra ES metadata: 2 KB / task
    TASK_SIZE_OVERHEAD = 2000

    # stop looking in database after the first X tasks:
    MAX_BULK_TASK_COUNT = 300

    # Stop adding Tasks to the batch when the current size exceeds this (30 MB)
    MAX_BULK_SIZE = 30 * (2 ** 20)

    marked_deferred = 0

    import_snoop_tasks()

    def all_deps_finished(task):
        for dep in task.prev_set.all():
            if dep.prev.status not in [models.Task.STATUS_SUCCESS, models.Task.STATUS_BROKEN]:
                logger.debug('Task %s skipped because dep %s status is %s',
                             task, dep.prev, dep.prev.status)
                return False
            if dep.prev.version != task_map[dep.prev.func].version:
                logger.debug('Task %s skipped because dep %s version = %s, expected = %s',
                             task, dep.prev, dep.prev.version, task_map[dep.prev.func].version)
                return False
        return True

    all_functions = [
        x['func']
        for x in models.Task.objects.values('func').distinct()
        if x['func'] in task_map and task_map[x['func']].bulk
    ]
    task_list = {}
    task_sizes = {}
    for func in all_functions:
        task_list[func] = []
        task_sizes[func] = {}
        current_size = 0

        task_query = models.Task.objects
        if lock:
            task_query = task_query.select_for_update(skip_locked=True)
        task_query = (
            task_query
            .filter(func=func)
            # don't do anything to successful, up to date tasks
            .exclude(status=models.Task.STATUS_SUCCESS, version=task_map[func].version)
        )
        if exclude_deferred:
            task_query = task_query.exclude(status=models.Task.STATUS_DEFERRED)
        if deferred_only:
            task_query = task_query.filter(status=models.Task.STATUS_DEFERRED)

        if reverse:
            task_query = task_query.order_by('-date_modified')
        else:
            task_query = task_query.order_by('date_modified')

        for task in task_query[:MAX_BULK_TASK_COUNT]:
            # filter out any tasks with non-completed dependencies
            # we could have done this in the DB query above, but it times out on weak machines
            if all_deps_finished(task):
                logger.debug('%s: Selected task %s', func, task)
                task_list[func].append(task)
                task_size = task.size() + TASK_SIZE_OVERHEAD
                current_size += task_size
                task_sizes[func][task.pk] = task_size
                if current_size > MAX_BULK_SIZE:
                    break
            else:
                # deps not finished ==> set this as DEFERRED
                task.status = models.Task.STATUS_DEFERRED
                task.save()
                marked_deferred += 1
        logger.warning('%s: Selected %s items with total size: %s', func, len(task_list[func]), current_size)

    if marked_deferred > 0:
        logger.debug('marked %s tasks as deferred.', marked_deferred)
    return task_list, task_sizes, marked_deferred
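
Mirroring its use in run_single_batch_for_bulk_task below (must run with a collection selected, and inside a transaction when lock=True):

tasks, sizes, marked = get_bulk_tasks_to_run(lock=True)
for func, task_list in tasks.items():
    batch_bytes = sum(sizes[func].values())  # estimated payload, ES overhead included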

get_rabbitmq_queue_length_no_cache(q) #

Fetch queue length from RabbitMQ for a given queue.

Used periodically to decide if we want to queue more functions or not.

Uses the Management HTTP API of RabbitMQ, since the Celery client does not have access to these counts.

Source code in snoop/data/tasks.py
@tracer.wrap_function()
def get_rabbitmq_queue_length_no_cache(q):
    """Fetch queue length from RabbitMQ for a given queue.

    Used periodically to decide if we want to queue more functions or not.

    Uses the Management HTTP API of RabbitMQ, since the Celery client does not have access to these counts.
    """

    def _get_queue_depth(q):
        cl = pyrabbit.api.Client(
            settings.SNOOP_RABBITMQ_HTTP_URL,
            settings.SNOOP_RABBITMQ_HTTP_USERNAME,
            settings.SNOOP_RABBITMQ_HTTP_PASSWORD,
        )
        return cl.get_queue('/', q).get('messages', 0)

    try:
        return _get_queue_depth(q)
    except Exception as e:
        logger.warning('error when fetching queue depth: %s', e)
        return 0

is_completed(task) #

Returns True if Task is in the "success" or "broken" states, and if the task is at the latest version.

Parameters:

    task: will check task.status for the values listed above. Required.
Source code in snoop/data/tasks.py
def is_completed(task):
    """Returns True if Task is in the "success" or "broken" states, and if the task is at the latest
    version.

    Args:
        task: will check `task.status` for values listed above
    """
    COMPLETED = [models.Task.STATUS_SUCCESS, models.Task.STATUS_BROKEN]
    return task.status in COMPLETED and task.version == task_map[task.func].version

is_task_running(task_pk) #

Check if a started task is still running, by trying to get the lock for it.

Source code in snoop/data/tasks.py
def is_task_running(task_pk):
    """Check if a started task is still running, by trying to get the lock for it."""

    with transaction.atomic(using=collections.current().db_alias):
        try:
            task = (
                models.Task.objects
                .select_for_update(nowait=True)
                .get(pk=task_pk)
            )
            logger.warning('got lock for task %s, task is DEAD', task.pk)
            return False
        except DatabaseError as e:
            logger.debug('task is RUNNING, error while fetching lock: %s', e)
            return True

laterz_snoop_task(col_name, task_pk, raise_exceptions = False) #

Celery task used to run snoop Tasks without duplication.

This function uses Django's select_for_update to ensure only one instance of a Task runs at a time. After locking the row with select_for_update, it calls directly into the main Task switch: run_task.

Parameters:

    col_name: name of the collection where the Task is found. Required.
    task_pk: primary key of the Task. Required.
    raise_exceptions: if set, will propagate any Exceptions after Task.status is set to "error". Default: False.
Source code in snoop/data/tasks.py
@celery.app.task
@tracer.wrap_function()
def laterz_snoop_task(col_name, task_pk, raise_exceptions=False):
    """Celery task used to run snoop Tasks without duplication.

    This function is using Django's `select_for_update` to
    ensure only one instance of a Task is running at one time.
    After running `select_for_update` to lock the row,
    this function will directly call into the main Task switch: `run_task`.

    Args:
        col_name: name of collection where Task is found
        task_pk: primary key of Task
        raise_exceptions: if set, will propagate any Exceptions after Task.status is set to "error"
    """
    import_snoop_tasks()
    col = collections.ALL[col_name]
    if not col.process:
        logger.warning('collection %s process=False, skipping task %s', col.name, task_pk)
        return

    logger.info('collection %s: init task %s', col_name, task_pk)

    def lock_children(task):
        """Lock all children tasks to make sure we can update them after the results."""
        next_tasks = [dep.next for dep in task.next_set.all()]
        for next_task in next_tasks:
            try:
                next_locked = (
                    models.Task.objects
                    .select_for_update(nowait=True)
                    .get(pk=next_task.pk)
                )
                logger.debug('locked child: %s', next_locked.pk)
            except Exception as e:
                logger.warning('failed to lock child: %s -> %s (%s)',
                               task.func, next_task.func, str(e))
                raise

    with snoop_task_log_handler() as handler:
        with col.set_current():
            # first tx & select for update: get task, set status STARTED, save, end tx (commit)
            with transaction.atomic(using=col.db_alias), tracer.span('task fetch mark started'):
                try:
                    task = (
                        models.Task.objects
                        .select_for_update(nowait=True)
                        .get(pk=task_pk)
                    )
                    lock_children(task)

                except DatabaseError as e:
                    logger.warning(
                        "collection %s: task %r already running (1st check), locked in db: %s",
                        col_name, task_pk, e,
                    )
                    tracer.count('task_already_running')
                    return
                except models.Task.DoesNotExist:
                    logger.error(
                        "collection %s: task pk=%s DOES NOT EXIST IN DB",
                        col_name, task_pk
                    )
                    tracer.count('task_not_found')
                    return

            _tracer_opt = {
                'attributes': {
                    'function': task.func,
                    'function_group': task.func.split('.')[0] if '.' in task.func else task.func,
                    'collection': collections.current().name,
                },
                'extra_counters': {
                    'size_bytes': {
                        "unit": "b",
                        "value": task.size(),
                    },
                },
            }

            with tracer.span('check if task already completed', **_tracer_opt):
                if is_completed(task):
                    logger.warning("%r already completed", task)
                    tracer.count('task_already_completed', **_tracer_opt)
                    queue_next_tasks(task)
                    return

            with tracer.span('check dependencies', **_tracer_opt):
                depends_on = {}

                all_prev_deps = list(task.prev_set.all())
                if any(dep.prev.status == models.Task.STATUS_ERROR for dep in all_prev_deps):
                    logger.warning("%r has a dependency in the ERROR state.", task)
                    task.update(
                        status=models.Task.STATUS_BROKEN,
                        error='',
                        broken_reason='dependency_has_error',
                        log=handler.stream.getvalue(),
                        version=task_map[task.func].version,
                    )
                    task.save()
                    queue_next_tasks(task, reset=True)
                    return

                for dep in all_prev_deps:
                    prev_task = dep.prev
                    if not is_completed(prev_task):
                        task.update(
                            status=models.Task.STATUS_DEFERRED,
                            error='',
                            broken_reason='',
                            log=handler.stream.getvalue(),
                            version=task_map[task.func].version,
                        )
                        task.save()
                        logger.warning("%r missing dependency %r", task, prev_task)
                        tracer.count("task_missing_dependency", **_tracer_opt)
                        queue_task(prev_task)
                        return

                    if prev_task.status == models.Task.STATUS_SUCCESS:
                        prev_result = prev_task.result
                    elif prev_task.status == models.Task.STATUS_BROKEN:
                        prev_result = SnoopTaskBroken(
                            prev_task.error,
                            prev_task.broken_reason
                        )
                    else:
                        raise RuntimeError(f"Unexpected status {prev_task.status}")

                    depends_on[dep.name] = prev_result

            with tracer.span('save state before run', **_tracer_opt):
                task.status = models.Task.STATUS_STARTED
                task.date_started = timezone.now()
                task.date_modified = timezone.now()
                task.date_finished = None
                task.save()

            # second tx & select for update: get task, run task
            with transaction.atomic(using=col.db_alias):
                with tracer.span('task fetch lock object'):
                    try:
                        task = (
                            models.Task.objects
                            .select_for_update(nowait=True)
                            .get(pk=task_pk)
                        )
                        lock_children(task)
                    except DatabaseError as e:
                        logger.error(
                            "collection %s: task %r already running (2nd check), locked in db: %s",
                            col_name, task_pk, e,
                        )
                        return
                logger.info('collection %s: executing task %s', col_name, task_pk)
                run_task(task, depends_on, handler, raise_exceptions, _tracer_opt)

queue_another_task(collection_name, func, *args, **kw) #

Queue a different task.

Decoupled from queue_next_tasks so we remove ourselves from the database transaction concerning the previous task.

Source code in snoop/data/tasks.py
@celery.app.task
@tracer.wrap_function()
def queue_another_task(collection_name, func, *args, **kw):
    """Queue a different task.

    Decoupled from "queue_next_tasks" to remove ourselves from the database transaction
    concerning previous task.
    """

    if _is_rabbitmq_memory_full():
        return

    with collections.ALL[collection_name].set_current():
        db_alias = collections.current().db_alias
        queue_length = get_rabbitmq_queue_length(rmq_queue_name(func))
        if queue_length < settings.DISPATCH_MAX_QUEUE_SIZE - QUEUE_ANOTHER_TASK_BATCH_COUNT:
            with tracer.span('queue another task of same type'), \
                    transaction.atomic(using=db_alias):
                tasks = (
                    models.Task.objects
                    .select_for_update(skip_locked=True)
                    .filter(status=models.Task.STATUS_PENDING, func=func)
                    .order_by('date_modified')[:int(QUEUE_ANOTHER_TASK_BATCH_COUNT)].all()
                )
                for task in tasks:
                    queue_task(task)

            with tracer.span('queue another task of a different type'), \
                    transaction.atomic(using=db_alias):
                tasks = (
                    models.Task.objects
                    .select_for_update(skip_locked=True)
                    .filter(status=models.Task.STATUS_PENDING).exclude(func=func)
                    .order_by('date_modified')[:int(QUEUE_ANOTHER_TASK_BATCH_COUNT)].all()
                )
                for task in tasks:
                    queue_task(task)

        if random.random() < 0.1:
            with tracer.span('queue some errors'), \
                    transaction.atomic(using=db_alias):
                for age_minutes, retry_limit in [
                    (settings.TASK_RETRY_AFTER_MINUTES, settings.TASK_RETRY_FAIL_LIMIT),  # ~5min
                    (settings.TASK_RETRY_AFTER_MINUTES * 30, settings.TASK_RETRY_FAIL_LIMIT * 2),  # ~1h
                    (settings.TASK_RETRY_AFTER_MINUTES * 1000, settings.TASK_RETRY_FAIL_LIMIT * 3),  # ~5day
                ]:
                    old_error_qs = (
                        models.Task.objects
                        .select_for_update(skip_locked=True)
                        .filter(func=func)
                        .filter(status__in=[models.Task.STATUS_BROKEN, models.Task.STATUS_ERROR])
                        .filter(fail_count__lt=retry_limit)
                        .filter(date_modified__lt=timezone.now() - timedelta(minutes=age_minutes))
                        .order_by('date_modified')[:QUEUE_ANOTHER_TASK_BATCH_COUNT].all()
                    )
                    for task in old_error_qs:
                        queue_task(task)
                        return

        if random.random() < 0.1:
            with tracer.span('mark some killed task'), \
                    transaction.atomic(using=db_alias):
                for age_minutes, retry_limit in [
                    (settings.TASK_RETRY_AFTER_MINUTES, settings.TASK_RETRY_FAIL_LIMIT),  # ~5min
                    (settings.TASK_RETRY_AFTER_MINUTES * 30, settings.TASK_RETRY_FAIL_LIMIT * 2),  # ~1h
                    (settings.TASK_RETRY_AFTER_MINUTES * 1000, settings.TASK_RETRY_FAIL_LIMIT * 3),  # ~5day
                ]:
                    # mark dead STARTED tasks as error (hangs / memory leaks / kills)
                    old_started_qs = (
                        models.Task.objects
                        .select_for_update(skip_locked=True)
                        .filter(func=func)
                        .filter(fail_count__lt=retry_limit)
                        .filter(status__in=[models.Task.STATUS_STARTED])
                        .filter(date_modified__lt=timezone.now() - timedelta(minutes=age_minutes))
                        .order_by('date_modified')[:QUEUE_ANOTHER_TASK_BATCH_COUNT].all()
                    )
                    for task in old_started_qs:
                        if not is_task_running(task.pk):
                            logger.debug('marking task %s as Killed', task.pk)
                            tracer.count("task_killed")
                            task.status = models.Task.STATUS_BROKEN
                            task.error = "Task Killed"
                            task.broken_reason = "task_killed"
                            task.fail_count += 1
                            task.save()
                            return

        if random.random() < 0.1:
            with tracer.span('queue some deferred'), \
                    transaction.atomic(using=db_alias):
                DEFERRED_WAIT_MIN = 15
                tasks = (
                    models.Task.objects
                    .select_for_update(skip_locked=True)
                    .filter(func=func)
                    .filter(status=models.Task.STATUS_DEFERRED,
                            date_modified__lt=timezone.now() - timedelta(minutes=DEFERRED_WAIT_MIN))
                    .order_by('date_modified')[:QUEUE_ANOTHER_TASK_BATCH_COUNT].all()
                )
                for task in tasks:
                    queue_task(task)

queue_next_tasks(task, reset = False) #

Queues all Tasks that directly depend on this one.

Also queues a pending task of the same type.

Parameters:

    task: will queue running all Tasks in task.next_set. Required.
    reset: if set, will set each next Task's status to "pending" before queueing it. Default: False.
Source code in snoop/data/tasks.py
@tracer.wrap_function()
def queue_next_tasks(task, reset=False):
    """Queues all Tasks that directly depend on this one.

    Also queues a pending task of the same type.

    Args:
        task: will queue running all Tasks in `task.next_set`
        reset: if set, will set next Tasks status to "pending" before queueing it
    """
    logger.debug('queue next task for: %s', task)
    for next_dependency in task.next_set.all():
        next_task = next_dependency.next
        if reset:
            next_task.update(
                status=models.Task.STATUS_PENDING,
                error='',
                broken_reason='',
                log='',
                version=task_map[task.func].version,
            )
            next_task.save()

        if task_map[next_task.func].bulk:
            logger.debug("Not queueing bulk task %r after %r", next_task, task)
            continue

        logger.debug("Queueing %r after %r", next_task, task)
        queue_task(next_task)

    if settings.SNOOP_TASK_DISABLE_TAIL_QUEUE:
        return

    # batch things together probabilistically when calling queue_another_task
    if random.random() < 1 / QUEUE_ANOTHER_TASK_BATCH_COUNT:
        if _is_rabbitmq_memory_full():
            return
        if get_rabbitmq_queue_length(QUEUE_ANOTHER_TASK) < QUEUE_ANOTHER_TASK_LIMIT:
            queue_another_task.apply_async(
                (collections.current().name, task.func,),
                queue=QUEUE_ANOTHER_TASK,
                retry=False,
            )

queue_task(*a, **kw) #

Queue given Task with Celery to run on a worker.

If called from inside a transaction, queueing will be done after the transaction finishes successfully.

Parameters:

    task: task to be queued in Celery. Required.
Source code in snoop/data/tasks.py
def queue_task(*a, **kw):
    """Queue given Task with Celery to run on a worker.

    If called from inside a transaction, queueing will be done after
    the transaction finishes successfully.

    Args:
        task: task to be queued in Celery
    """
    # attempt to avoid locks by running the update outside the first transaction
    transaction.on_commit(lambda: _do_queue_task(*a, **kw))
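
Because of transaction.on_commit, this is safe to call while holding row locks; a sketch (task here is some pending models.Task):

with transaction.atomic(using=collections.current().db_alias):
    task.status = models.Task.STATUS_PENDING
    task.save()
    queue_task(task)  # the Celery message is sent only after COMMIT succeeds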

remove_dependency(name, depends_on) #

Dynamically removes a dependency from a running task.

This stops execution, removes the extra dependency in the Task loop, and eventually executes this task again.

Source code in snoop/data/tasks.py
def remove_dependency(name, depends_on):
    """Dynamically removes a dependency from running task.

    This stops execution, removes the extra dependency in the Task loop and eventually executes this task
    again.
    """
    if name not in depends_on:
        return
    raise ExtraDependency(name)

require_dependency(name, depends_on, callback, return_error = False) #

Dynamically adds a dependency to a running task.

Use this when a Task requires the result of another Task, but this is not known when queueing it.

Parameters:

    name: name of the dependency. Required.
    depends_on: current kwargs dict of the function. If the given name is missing from this dict, execution is aborted (by throwing a MissingDependency error) and this Task has its status set to "deferred". When the required task finishes running, this one will be re-queued. Required.
    callback: function that returns a Task instance. Will only be called if the dependency was not found. Required.
Source code in snoop/data/tasks.py
def require_dependency(name, depends_on, callback, return_error=False):
    """Dynamically adds a dependency to running task.

    Use this when a Task requires the result of another Task, but this is not known when queueing it.

    Args:
        name: name of dependency
        depends_on: current kwargs dict of function. If the name given is missing from this dict, then
            execution will be aborted (by throwing a MissingDependency error), and this Task will have its
            status set to "deferred". When the required task finishes running, this one will be re-queued.
        callback: function that returns Task instance. Will only be called if the dependency was not found.
    """
    if name in depends_on:
        result = depends_on[name]
        if isinstance(result, Exception) and not return_error:
            raise result
        return result

    task = callback()
    raise MissingDependency(name, task)
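
A sketch of a task that discovers mid-run that it needs another task's result (the names are illustrative, and it is assumed here that a task's .laterz(...) returns the created Task instance, as required of the callback):

@snoop_task('example.process')
def process(blob, **depends_on):
    ocr_result = require_dependency(
        'ocr_result', depends_on,
        # only called when the dependency is missing; must return the
        # Task instance to wait on
        lambda: run_ocr.laterz(blob.pk),
    )
    # on the first run this raises MissingDependency and the task is
    # deferred; once run_ocr finishes, this task is re-queued and
    # ocr_result holds its result (a SnoopTaskBroken if it broke)
    ...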

retry_task(task, fg = False) #

Resets task status, logs and error messages to their blank value, then re-queues the Task.

Source code in snoop/data/tasks.py
def retry_task(task, fg=False):
    """Resets task status, logs and error messages to their blank value, then re-queues the Task.
    """
    task.update(
        status=models.Task.STATUS_PENDING,
        error='',
        broken_reason='',
        log='',
        version=task_map[task.func].version,
    )
    logger.debug("Retrying %r", task)
    task.save()

    if fg:
        col = collections.from_object(task)
        laterz_snoop_task(col.name, task.pk, raise_exceptions=True)
    else:
        queue_task(task)

retry_tasks(queryset, reset_fail_count = False, one_slice_only = False) #

Efficient re-queueing of an entire QuerySet of Tasks.

This uses bulk_update to reset the status, logs and error messages on the table, then queues only the first few thousand tasks.

Source code in snoop/data/tasks.py
@tracer.wrap_function()
def retry_tasks(queryset, reset_fail_count=False, one_slice_only=False):
    """Efficient re-queueing of an entire QuerySet pointing to Tasks.

    This uses `bulk_update` to reset the status, logs and error messages on the table; then queues only
    the first few thousand tasks."""

    # relatively low number to avoid memory leak / crash
    BATCH_SIZE = 1000

    logger.info('Retrying %s tasks...', queryset.count())

    task_count = queryset.count()
    first_batch = list(queryset.all()[0:BATCH_SIZE])

    if one_slice_only:
        fields = ['status', 'error', 'broken_reason', 'log', 'date_modified']
        for task in first_batch:
            task.status = models.Task.STATUS_PENDING
            task.error = ''
            task.broken_reason = ''
            task.log = ''
            task.date_modified = timezone.now()
            if reset_fail_count:
                task.fail_count = 0
        models.Task.objects.bulk_update(first_batch, fields, batch_size=BATCH_SIZE)
    else:
        update_options = {
            'status': models.Task.STATUS_PENDING,
            'error': '',
            'broken_reason': '',
            'log': '',
            'date_modified': timezone.now(),
        }
        if reset_fail_count:
            update_options['fail_count'] = 0
        queryset.update(**update_options)

    logger.info('Queueing first %s tasks...', task_count)
    for task in first_batch:
        queue_task(task)
    logger.info('Done queueing first %s tasks.', task_count)

    logger.info('100% done submitting tasks.')
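
Mirroring the "sync" branch of dispatch_for above:

retry_tasks(
    models.Task.objects
    .filter(func='filesystem.walk', status=models.Task.STATUS_SUCCESS)
    .order_by('date_modified')[:settings.SYNC_RETRY_LIMIT_DIRS],
    one_slice_only=True,  # bulk_update and queue just this first slice
)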

returns_json_blob(func) #

Function decorator that returns a Blob with the JSON-encoded return value of the wrapped function.

Used in various Task functions to return results in JSON format, while also respecting the fact that Task results are always Blobs.

Warning

This function dumps the whole JSON at once, from memory, so this may have problems with very large JSON result sizes (>1GB) or dynamically generated results (from a generator).

Source code in snoop/data/tasks.py
def returns_json_blob(func):
    """Function decorator that returns a Blob with the JSON-encoded return value of the wrapped function.

    Used in various Task functions to return results in JSON format, while also respecting the fact that
    Task results are always Blobs.


    Warning:
        This function dumps the whole JSON at once, from memory, so this may have problems with very large
        JSON result sizes (>1GB) or dynamically generated results (from a generator).
    """

    @wraps(func)
    def wrapper(*args, **kwargs):
        rv = func(*args, **kwargs)
        return models.Blob.create_json(rv)

    return wrapper
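
Typical use is stacking it under the snoop_task decorator, so a plain dict return value is stored as a Blob (a sketch; the function and the field names on blob are made up):

@snoop_task('example.summarize')
@returns_json_blob
def summarize(blob):
    # the returned dict is JSON-encoded and stored via models.Blob.create_json()
    return {'size': blob.size, 'mime': blob.mime_type}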

rmq_queue_name(func, collection = None) #

Get the RabbitMQ queue name for a function and collection. The collection is inferred by default.

Source code in snoop/data/tasks.py
def rmq_queue_name(func, collection=None):
    """Get rabbitmq name from function, collection.
    Collection is inferred by default.
    """
    return task_map[func].queue + '.' + func

run_bulk_tasks() #

Periodic task that runs some batches of bulk tasks for all collections. For each collection, we update the ES index refresh interval.

Source code in snoop/data/tasks.py
@celery.app.task
@tracer.wrap_function()
@flock
def run_bulk_tasks():
    """Periodic task that runs some batches of bulk tasks for all collections.
    For each collection, we update the ES index refresh interval."""

    if not single_task_running('run_bulk_tasks'):
        logger.warning('run_bulk_tasks function already running, exiting')
        return

    all_collections = list(collections.ALL.values())
    random.shuffle(all_collections)
    deadline = settings.SYSTEM_TASK_DEADLINE_SECONDS + time()
    for collection in all_collections:
        # if no tasks to do, continue
        with collection.set_current():
            if not collection.process:
                logger.debug(f'bulk tasks: skipping "{collection}", configured with "process = False"')
                continue

            if not have_bulk_tasks_to_run(reverse=False) and not have_bulk_tasks_to_run(reverse=True):
                logger.debug('Skipping collection %s, no bulk tasks to run', collection.name)
                continue

            # disable refreshing
            logger.debug('Disable index refresh for collection %s', collection.name)
            indexing.update_refresh_interval("-1")

            try:
                logger.debug('Running bulk tasks for collection %s', collection.name)
                _run_bulk_tasks_for_collection()
            except Exception:
                logger.error("Running bulk tasks failed for %s!", collection.name)
            finally:
                # restore default
                logger.debug('Enable index refresh for collection %s', collection.name)
                indexing.update_refresh_interval()
        if time() > deadline:
            break

run_dispatcher() #

Periodic Celery task used to queue next batches of Tasks for each collection.

We limit the total size of each queue on the message queue, since too many messages on the queue at the same time create performance issues (the message queue starts spilling to disk instead of keeping everything in memory, becoming very slow).

Source code in snoop/data/tasks.py
@celery.app.task
@tracer.wrap_function()
@flock
def run_dispatcher():
    """Periodic Celery task used to queue next batches of Tasks for each collection.

    We limit the total size of each queue on the message queue, since too many messages on the queue at the
    same time creates performance issues (because the message queue will need to use Disk instead of storing
    everything in memory, thus becoming very slow).
    """

    import_snoop_tasks()
    if not single_task_running('run_dispatcher'):
        logger.warning('run_dispatcher function already running, exiting')
        return

    collection_list = sorted(collections.ALL.values(), key=lambda x: x.name)
    func_list = sorted(set(f.func for f in task_map.values() if f.queue))
    random.shuffle(collection_list)
    random.shuffle(func_list)
    for collection in collection_list:
        logger.info(f'{"=" * 10} collection "{collection.name}" {"=" * 10}')
        try:
            for func in func_list:
                if func:
                    dispatch_for(collection, func)
        except Exception as e:
            logger.exception(e)

run_single_batch_for_bulk_task(reverse = False, exclude_deferred = False, deferred_only = False) #

Directly runs a single batch for each bulk task type.

Requires a collection to be selected. Does not dispatch tasks registered with bulk = False.

Returns:

    int: the number of Tasks completed successfully or marked as Deferred.

Source code in snoop/data/tasks.py
@tracer.wrap_function()
def run_single_batch_for_bulk_task(reverse=False, exclude_deferred=False, deferred_only=False):
    """Directly runs a single batch for each bulk task type.

    Requires a collection to be selected. Does not dispatch tasks registered with `bulk = False`.

    Returns:
        int: the number of Tasks completed successfully or marked as Deferred
    """

    total_completed = 0
    all_task_list, all_task_sizes, marked = get_bulk_tasks_to_run(reverse, exclude_deferred, deferred_only)
    for func in all_task_list:
        task_list = all_task_list[func]
        task_sizes = all_task_sizes[func]
        current_size = sum(task_sizes.values())

        logger.debug('Running single batch of bulk tasks of type: %s', func)
        t0 = timezone.now()
        if not task_list:
            continue

        # set data on rows before running function
        for task in task_list:
            task.status = models.Task.STATUS_PENDING
            task.date_finished = None
            task.date_started = timezone.now()
            task.date_modified = timezone.now()
            task.log = ''
            task.broken_reason = ''
            task.version = task_map[func].version
            task.fail_count = 0
            task.error = ''
        models.Task.objects.bulk_update(task_list, [
            "status",
            "date_finished",
            "date_started",
            "date_modified",
            "log",
            "broken_reason",
            "version",
            "fail_count",
            "error",
        ])
        logger.debug(f"Pre-run save on Task objects took {(timezone.now() - t0).total_seconds():0.2f}s")

        # Run the bulk task. If it failed, mark all the items inside as failed. Otherwise, mark them as
        # succeeded.
        try:
            result = task_map[func](task_list)
        except Exception:
            logger.exception(f'Error running bulk task: "{func}"!')
            error = traceback.format_exc()[:2000]
            status = models.Task.STATUS_ERROR
            result = {}
        else:
            status = models.Task.STATUS_SUCCESS
            error = ''
            logger.debug(f"Successfully ran bulk of {len(task_list)} tasks, "
                         f"type {func}, size {pretty_size.pretty_size(current_size)}")

        t_elapsed = (timezone.now() - t0).total_seconds()

        # save results
        for task in task_list:
            task.status = status if result.get(task.blob_arg.pk) else models.Task.STATUS_BROKEN
            task.date_finished = timezone.now()
            # adjust date started so duration is scaled for task size
            current_task_size = task_sizes[task.pk]
            relative_duration = t_elapsed * current_task_size / current_size
            task.date_started = task.date_finished - timedelta(seconds=relative_duration)
            task.date_modified = timezone.now()
            task.error = error

        models.Task.objects.bulk_update(task_list, [
            "status",
            "date_finished",
            "date_started",
            "date_modified",
            "error",
        ])

        if status == models.Task.STATUS_SUCCESS:
            total_completed += len(task_list)

    return total_completed + marked

run_task(task, depends_on, log_handler, raise_exceptions = False, _tracer_opt = {}) #

Runs the main Task switch: get dependencies, run code, react to SnoopTaskErrors, save state and logs, queue next tasks.

Parameters:

    task: Task instance to run. Required.
    depends_on: dict mapping dependency names to the results of previous Tasks. Required.
    log_handler: instance of log handler to dump results. Required.
    raise_exceptions: if set, will propagate any Exceptions after Task.status is set to "error". Default: False.
Source code in snoop/data/tasks.py
def run_task(task, depends_on, log_handler, raise_exceptions=False, _tracer_opt=dict()):
    """Runs the main Task switch: get dependencies, run code,
    react to `SnoopTaskError`s, save state and logs, queue next tasks.

    Args:
        task: Task instance to run
        log_handler: instance of log handler to dump results
        raise_exceptions: if set, will propagate any Exceptions after Task.status is set to "error"
    """
    with tracer.span('run_task', **_tracer_opt):
        tracer.count("task_started", **_tracer_opt)

        args = task.args
        if task.blob_arg:
            assert args[0] == task.blob_arg.pk
            args = [task.blob_arg] + args[1:]

        with tracer.span('run task function', **_tracer_opt):
            logger.debug("Running %r", task)
            t0 = time()
            try:
                func = task_map.get(task.func)
                if not func:
                    msg = "task func " + task.func + ' does not exist.'
                    raise SnoopTaskBroken(msg, 'unknown_task_func')

                with tracer.span('call function', **_tracer_opt):
                    if func.bulk:
                        result = func([task])
                    else:
                        result = func(*args, **depends_on)

                if result is not None:
                    if func.bulk:
                        assert isinstance(result, dict)
                        result_ok = result[task.blob_arg.pk]
                        if not result_ok:
                            raise RuntimeError('bulk task result not OK')
                    else:
                        assert isinstance(result, models.Blob)
                        task.result = result

            except MissingDependency as dep:
                with tracer.span('handle missing dependency', **_tracer_opt):
                    tracer.count("task_missing_dependency", **_tracer_opt)
                    msg = 'requests extra dependency: %r, dep = %r [%.03f s]' % (task, dep, time() - t0)
                    logger.debug(msg)

                    task.update(
                        status=models.Task.STATUS_DEFERRED,
                        error='',
                        broken_reason='',
                        log=log_handler.stream.getvalue(),
                        version=task_map[task.func].version,
                    )
                    task.prev_set.get_or_create(
                        prev=dep.task,
                        name=dep.name,
                    )
                    queue_task(task)

            except ExtraDependency as dep:
                with tracer.span('handle extra dependency', **_tracer_opt):
                    tracer.count("task_extra_dependency", **_tracer_opt)
                    msg = 'requests to remove dependency: %r, dep = %r [%.03f s]' % (task, dep, time() - t0)
                    logger.debug(msg)

                    task.prev_set.filter(
                        name=dep.name,
                    ).delete()
                    task.update(
                        status=models.Task.STATUS_PENDING,
                        error='',
                        broken_reason='',
                        log=log_handler.stream.getvalue(),
                        version=task_map[task.func].version,
                    )
                    queue_task(task)

            except SnoopTaskBroken as e:
                with tracer.span('handle task broken', **_tracer_opt):
                    tracer.count("task_broken", **_tracer_opt)
                    task.update(
                        status=models.Task.STATUS_BROKEN,
                        error="{}: {}".format(e.reason, e.args[0]),
                        broken_reason=e.reason,
                        log=log_handler.stream.getvalue(),
                        version=task_map[task.func].version,
                    )
                    msg = 'Broken: %r %s [%.03f s]' % (task, task.broken_reason, time() - t0)
                    logger.exception(msg)

            except ConnectionError as e:
                with tracer.span('handle connection error', **_tracer_opt):
                    tracer.count("task_connection_error", **_tracer_opt)
                    logger.exception(e)
                    task.update(
                        status=models.Task.STATUS_PENDING,
                        error=repr(e),
                        broken_reason='',
                        log=log_handler.stream.getvalue(),
                        version=task_map[task.func].version,
                    )

            except Exception as e:
                with tracer.span('handle unknown error', **_tracer_opt):
                    tracer.count("task_unknown_error", **_tracer_opt)
                    if isinstance(e, SnoopTaskError):
                        error = "{} ({})".format(e.args[0], e.details)
                    else:
                        error = repr(e)
                    logger.exception(e)
                    task.update(
                        status=models.Task.STATUS_ERROR,
                        error=error,
                        broken_reason='',
                        log=log_handler.stream.getvalue(),
                        version=task_map[task.func].version,
                    )

                    msg = 'Failed: %r  %s [%.03f s]' % (task, task.error, time() - t0)
                    logger.exception(msg)

                    if raise_exceptions:
                        raise
            else:
                with tracer.span('save success', **_tracer_opt):
                    tracer.count("task_success", **_tracer_opt)
                    logger.debug("Succeeded: %r [%.03f s]", task, time() - t0)
                    task.update(
                        status=models.Task.STATUS_SUCCESS,
                        error='',
                        broken_reason='',
                        log=log_handler.stream.getvalue(),
                        version=task_map[task.func].version,
                    )

            finally:
                with tracer.span('save state after run', **_tracer_opt):
                    task.date_finished = timezone.now()
                    task.save()

    if is_completed(task):
        queue_next_tasks(task, reset=True)

save_collection_stats() #

Run the expensive computations to get collection stats, then save result in database.

Source code in snoop/data/tasks.py
@tracer.wrap_function()
def save_collection_stats():
    """Run the expensive computations to get collection stats, then save result in database.
    """

    from snoop.data.admin import get_stats
    t0 = time()
    get_stats()
    logger.debug('stats for collection {} saved in {} seconds'.format(collections.current().name, time() - t0))  # noqa: E501

save_stats() #

Periodic Celery task used to save stats for all collections.

Source code in snoop/data/tasks.py
@celery.app.task
@tracer.wrap_function()
@flock
def save_stats():
    """Periodic Celery task used to save stats for all collections.
    """

    deadline = time() + settings.SYSTEM_TASK_DEADLINE_SECONDS

    if not single_task_running('save_stats'):
        logger.warning('save_stats function already running, exiting')
        return

    shuffled_col_list = list(collections.ALL.values())
    random.shuffle(shuffled_col_list)

    for collection in shuffled_col_list:
        if time() > deadline:
            break
        with collection.set_current():
            try:
                save_collection_stats()
            except Exception as e:
                logger.exception(e)
                continue

single_task_running(key) #

Queries RabbitMQ to find out if the queue is effectively idle (at most one task in it).

Used by all periodic tasks to make sure only one instance is running at any given time. Tasks earlier in the queue will exit to make way for the ones that are later in the queue, to make sure the queue will never grow unbounded in size if the Task takes more time to run than its execution interval.

Source code in snoop/data/tasks.py
def single_task_running(key):
    """Queries both Celery and RabbitMQ to find out if the queue is completely idle.

    Used by all periodic tasks to make sure only one instance is running at any given time. Tasks earlier in
    the queue will exit to make way for the ones that are later in the queue, to make sure the queue will
    never grow unbounded in size if the Task takes more time to run than its execution interval.
    """
    return get_rabbitmq_queue_length_no_cache(key) <= 1

snoop_task(name, version = 0, bulk = False, queue = 'default') #

Decorator marking a snoop Task function.

Parameters:

- name (required): qualified name of the function, not required to be equal to the Python module or function name (but recommended).
- version (default 0): int; statically incremented by the programmer when Task code/behavior changes and Tasks need to be retried.
- bulk (default False): if set to True, completely deactivates queue_task on this function. The task will instead be scheduled periodically in batches, and the decorated function will receive a single argument: a list of Task objects containing the individual tasks that need to be run.
Source code in snoop/data/tasks.py
def snoop_task(name, version=0, bulk=False, queue='default'):
    """Decorator marking a snoop Task function.

    Args:
        name: qualified name of the function, not required to be equal
            to Python module or function name (but recommended)
        version: int, default zero. Statically incremented by programmer when Task code/behavior changes and
            Tasks need to be retried.
        bulk: If set to True, completely deactivates queue_task on this function.
            This task will instead be scheduled periodically in batches. The function decorated with this
            flag will receive a single argument: a list of Task objects containing the individual
            tasks that need to be run.
    """

    def decorator(func):
        # add telemetry to all snoop tasks
        func = tracer.wrap_function()(func)

        def laterz(*args, depends_on={}, retry=False, queue_now=True, delete_extra_deps=False):
            """Actual function doing dependency checking and queueing.

            Args:
                args: positional function arguments
                depends_on: dict with strings mapping to Task instances that this one depends on (and uses
                    their results as keyword arguments) when calling the wrapped function.
                retry: if set, resets and re-runs this Task even if it already finished. Otherwise, a
                    finished Task is not re-triggered.
                queue_now: If set, will queue this task immediately (the default). Otherwise, tasks will not
                    be left on the queue, and they'll be picked up by the periodic task `run_dispatcher()`
                    in this module.
                delete_extra_deps: If set, will remove any dependencies that are not listed in `depends_on`.
                    Used to fix the dependency graph after its structure or the evaluated data has changed.
            """

            if args and isinstance(args[0], models.Blob):
                blob_arg = args[0]
                args = (blob_arg.pk,) + args[1:]

            else:
                blob_arg = None

            task, created = models.Task.objects.get_or_create(
                func=name,
                args=args,
                blob_arg=blob_arg,
            )

            if depends_on:
                for dep_name, dep in depends_on.items():
                    _, created = task.prev_set.get_or_create(
                        prev=dep,
                        name=dep_name,
                    )
                    if created:
                        retry = True

            if delete_extra_deps:
                task.prev_set.exclude(name__in=depends_on.keys()).delete()

            if task.date_finished \
                    and task.status in [models.Task.STATUS_SUCCESS,
                                        models.Task.STATUS_BROKEN,
                                        models.Task.STATUS_ERROR]:
                if retry:
                    retry_task(task)
                return task

            if not bulk:
                if queue_now or ALWAYS_QUEUE_NOW:
                    queue_task(task)
                return task
            return task

        def delete(*args):
            """Delete the Task instance with given positional arguments.

            The Task dependencies are not part of the Task's unique key, so they can't be used to filter
            for the Task to delete; only the function name and positional arguments identify it.

            Args:
                args: the positional arguments used to fetch the Task.
            """
            if args and isinstance(args[0], models.Blob):
                blob_arg = args[0]
                args = (blob_arg.pk,) + args[1:]

            else:
                blob_arg = None

            task = models.Task.objects.get(
                func=name,
                args=args,
                blob_arg=blob_arg,
            )
            task.delete()

        func.laterz = laterz
        func.delete = delete
        func.version = version
        func.bulk = bulk
        func.queue = queue
        func.func = name
        task_map[name] = func
        return func

    return decorator
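
As a usage sketch, the decorator and `laterz` combine like this; the task name `example.hello`, its body, and the `some_walk_task` dependency are hypothetical, with dependency results passed as keyword arguments as the `laterz` docstring above describes:

# Hypothetical task; only the decorator/laterz pattern is from this module.
@snoop_task('example.hello', version=1)
def hello(directory_pk, walk_result=None):
    # `walk_result` is filled in from the result of the Task registered
    # under that name in `depends_on`.
    logger.info('hello from directory %s: %s', directory_pk, walk_result)

# Queue for later execution; re-running a finished Task requires retry=True.
hello.laterz(directory_pk, depends_on={'walk_result': some_walk_task})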

snoop_task_log_handler(level = 10) #

Context manager for a text log handler.

This captures in memory everything logged while its context runs. It's used to store Task logs in the database.

Parameters:

- level (default 10): log level, by default logging.DEBUG.
Source code in snoop/data/tasks.py
@contextmanager
def snoop_task_log_handler(level=logging.DEBUG):
    """Context manager for a text log handler.

    This captures in memory everything logged while its context runs.
    It's used to store Task logs in the database.

    Args:
        level: log level, by default logging.DEBUG
    """
    formatter = logging.Formatter('%(asctime)s %(name)s [%(levelname)s]: %(message)s')
    stream = StringIO()
    handler = logging.StreamHandler(stream)
    handler.setLevel(level)
    handler.setFormatter(formatter)

    root_logger = logging.getLogger()
    # old_root_level = root_logger.level
    # root_logger.setLevel(level)
    root_logger.addHandler(handler)

    try:
        yield handler
    finally:
        handler.flush()
        root_logger.removeHandler(handler)
        # root_logger.setLevel(old_root_level)
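
As a usage sketch (the logger name is arbitrary), mirroring how the task runner above reads the captured text through `handler.stream.getvalue()`:

with snoop_task_log_handler(level=logging.INFO) as handler:
    logging.getLogger('snoop.example').info('this line is captured')
text = handler.stream.getvalue()  # the formatted log output as a string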

update_all_tags() #

Periodic Celery task used to re-index documents with changed Tags.

This task ensures tag editing conflicts (multiple users editing tags for the same document at the same time) are fixed in a short time after indexing.

Source code in snoop/data/tasks.py
@celery.app.task
@tracer.wrap_function()
@flock
def update_all_tags():
    """Periodic Celery task used to re-index documents with changed Tags.

    This task ensures tag editing conflicts (multiple users editing tags for the same document at the same
    time) are fixed in a short time after indexing.
    """

    # circular import
    from . import digests

    if not single_task_running('update_all_tags'):
        logger.warning('update_all_tags function already running, exiting')
        return

    collection_list = sorted(collections.ALL.values(), key=lambda x: x.name)
    random.shuffle(collection_list)

    deadline = time() + settings.SYSTEM_TASK_DEADLINE_SECONDS
    for collection in collection_list:
        with collection.set_current():
            logger.debug('collection "%r": updating tags', collection)
            digests.update_all_tags()
        if time() > deadline:
            break