Skip to content

Runworkers

snoop.data.management.commands.runworkers #

Entrypoint for worker process.

Starts up a variable number of worker processes with Celery, depending on settings and available CPU count.

Classes#

Command #

Run celery worker

Methods#
add_arguments(self, parser) #

Adds command-line flags selecting the queue to run, the worker process count, the per-child memory limit, and whether to use the solo pool.

Source code in snoop/data/management/commands/runworkers.py
def add_arguments(self, parser):
    """Adds command-line flags selecting the queue, worker count,
    memory limit and celery pool type.

    Args:
        parser: the argparse parser supplied by Django's management
            command machinery.
    """
    parser.add_argument('--queue', default='default',
                        help="Run specific queue.")
    parser.add_argument('--count', type=int, default=1,
                        help="Worker processes to run (default 1).")
    parser.add_argument('--mem', type=int, default=500,
                        help=("If task exceeds this memory usage (in MB), "
                              "after finishing, it will restart."))
    # Fixed: the two literals below were concatenated without a space,
    # rendering the help text as "pool.Useful".
    parser.add_argument('--solo', action="store_true",
                        help=("Run a single worker with celery solo pool. "
                              "Useful to kill container resources when task is killed."))
handle(self, *args, **options) #

Runs workers for either collection processing or system tasks.

Source code in snoop/data/management/commands/runworkers.py
def handle(self, *args, **options):
    """Runs workers for either collection processing or system tasks.

    Selects the set of rabbitmq queues to consume based on the --queue
    flag, then replaces the current process with a `celery worker`
    command via os.execv() (this call does not return).
    """

    logging_for_management_command()

    tasks.import_snoop_tasks()
    all_queues = []
    if options['queue'] == 'system':
        all_queues = settings.SYSTEM_QUEUES
    elif options['queue'] == 'queues':
        all_queues.append(tasks.QUEUE_ANOTHER_TASK)
    elif options['queue']:
        all_queues.extend(rmq_queues_for(options['queue']))
        # every worker can run digests and filesystem and ocr (if enabled)
        all_queues.extend(rmq_queues_for('digests'))
        all_queues.extend(rmq_queues_for('filesystem'))
        all_queues.extend(rmq_queues_for('default'))

        if settings.OCR_ENABLED:
            all_queues.extend(rmq_queues_for('ocr'))

        if options['queue'] == 'default':
            # 'default', 'filesystem' and 'digests' were already added
            # unconditionally above, so they are not repeated here (the
            # original duplicates were only masked by the set() dedup below).
            # NOTE(review): 'ocr' is added here even when OCR_ENABLED is
            # False -- confirm that is intended for the default worker.
            all_queues.extend(rmq_queues_for('ocr'))

            all_queues.extend(rmq_queues_for('img-cls'))
            all_queues.extend(rmq_queues_for('entities'))
            all_queues.extend(rmq_queues_for('translate'))
            all_queues.extend(rmq_queues_for('thumbnails'))
            all_queues.extend(rmq_queues_for('pdf-preview'))

        all_queues.append(tasks.QUEUE_ANOTHER_TASK)
    else:
        raise RuntimeError('no queue given')

    # de-duplicate, then shuffle so concurrent workers don't all poll
    # the queues in the same order
    all_queues = list(set(all_queues))
    random.shuffle(all_queues)

    # random suffix avoids celery node-name collisions between multiple
    # workers started on the same host ('%h' expands to the hostname)
    worker_name = options['queue'] + str(random.randint(1, 10000)) + '@%h'
    argv = celery_argv(queues=all_queues, solo=options.get('solo'),
                       count=options['count'], mem_limit_mb=options['mem'],
                       name=worker_name)
    log.info('+' + ' '.join(argv))
    os.execv(argv[0], argv)

Functions#

celery_argv(queues, solo, count, mem_limit_mb, name) #

Builds the command line to run a celery worker process.

Source code in snoop/data/management/commands/runworkers.py
def celery_argv(queues, solo, count, mem_limit_mb, name):
    """Builds the command line to run a `celery worker` process.

    Args:
        queues: iterable of rabbitmq queue names for the worker to consume.
        solo: if truthy, run with the single-threaded 'solo' pool;
            otherwise use 'prefork' with `count` child processes.
        count: prefork child process count (ignored when solo is set).
        mem_limit_mb: per-child memory cap in MB; celery's
            --max-memory-per-child expects KiB, hence the * 1024 below.
        name: worker node name passed to `-n`.

    Returns:
        A list of argv strings suitable for passing to os.execv().

    Raises:
        RuntimeError: if no `celery` executable is found on PATH.
    """
    # shutil.which is the stdlib replacement for shelling out to the
    # external `which` command (and avoids the ad-hoc latin1 decode).
    import shutil
    celery_binary = shutil.which('celery')
    if celery_binary is None:
        raise RuntimeError('celery binary not found on PATH')

    loglevel = 'info' if settings.DEBUG else 'warning'
    argv = [
        celery_binary,
        '-A', 'snoop.data',
        'worker',
        '-E',
        '-n', name,
        '--pidfile=',  # empty value disables the pidfile
        f'--loglevel={loglevel}',
        '-Ofair',
        '--without-gossip', '--without-mingle',
        '--max-tasks-per-child', str(settings.WORKER_TASK_LIMIT),
        '--max-memory-per-child', str(mem_limit_mb * 1024),  # MB -> KiB
        '--prefetch-multiplier', str(settings.WORKER_PREFETCH),
        '--soft-time-limit', '216000',  # 60h
        '--time-limit', '230400',  # 64h
        '-Q', ','.join(queues),
    ]

    if solo:
        argv += ['-P', 'solo']
    else:
        argv += ['-P', 'prefork', '-c', str(count)]

    return argv

rmq_queues_for(queue) #

Return the complete RabbitMQ queue names for a given queue category (the queue argument of @snoop_task).

Source code in snoop/data/management/commands/runworkers.py
def rmq_queues_for(queue):
    """Return the rabbitmq complete queue names, given
    the queue category (the queue argument of @snoop_task).
    """
    # Collect into a set directly so duplicate queue names drop out.
    matching = {
        tasks.rmq_queue_name(func)
        for func, task in tasks.task_map.items()
        if task.queue == queue
    }
    return list(matching)