Skip to content

Collections

snoop.data.collections #

Manage data into separate collections (and corresponding indexes, databases, and object storages) under one Django instance.

Each Collection is bound to one PostgreSQL database, one Elasticsearch index, and one object storage location (known as "blob directory"). This module defines operations to create and remove each of these, as well as to list every resource on this server.

When writing any data-oriented code, a Collection must be selected (in order to know the correct database, index and object storage to use). This is done through the context manager Collection.set_current(). Inside a collection context, collection.current() will return the collection set in the context manager, and any Model can be used with Django ORM and will use that collection's database.

Internally, this is stored in Python's threading.local memory on entering the context manager, and fetched from that same local memory whenever required inside the context. This means we can do multi-threaded work on different collections at different points in time, from the same process. This also means we sometimes have to patch Django's different admin, database and framework classes to either read or write to our current collection storage.

The list of collections is static and supplied through a single dict in settings.SNOOP_COLLECTIONS. This means a Django server restart is required whenever the collection count or configuration is changed.

This module creates Collection instances from the setting and stores them in a global called ALL. This global is usually used in management commands to select the collection requested by the user.

Attributes#

ALL #

Global dictionary storing all the collections.

ALL_TESSERACT_LANGS #

Global list with all the available OCR languages.

Classes#

Collection #

Model for managing collection resources: SQL databases, ES indexes, object storage.

Accepts additional settings for switching off processing, switching on periodic sync of the dataset, OCR language list, index-level settings.

The collection name is restricted to a very simple format and used directly to obtain: a PG database name, an ES index name, and two folders on disk: one directly under the DATA_DIR, with the initial dataset, and another one directly under the BLOB_DIR, used to store all the binary data for snoop.data.models.Blob objects. All these names and paths are retrieved as properties on the Collection object.

Attributes#
db_alias property readonly #

Name of SQL database alias for this collection.

Identical to db_name above.

Todo

investigate merging this property and db_name.

db_name property readonly #

Name of SQL database for this collection.

es_index property readonly #

Name of elasticsearch index for this collection.

queue_name property readonly #

Name of message queue for this collection.

Methods#
__init__(self, name, process = False, sync = False, **opt) special #

Initialize object.

Exceptions:

Type Description
AssertionError

if OCR language configuration is incorrect.

Source code in snoop/data/collections.py
def __init__(self, name, process=False, sync=False, **opt):
    """Initialize object.

    Raises:
        AssertionError: if OCR language configuration is incorrect.
    """

    self.name = name
    self.process = process
    self.sync = sync and process
    self.ocr_languages = opt.get('ocr_languages', [])
    self.max_result_window = opt.get('max_result_window', 10000)
    self.refresh_interval = opt.get('refresh_interval', "1s")
    self.opt = opt

    for lang_grp in self.ocr_languages:
        assert lang_grp.strip() != ''
        for lang in lang_grp.split('+'):
            assert lang in ALL_TESSERACT_LANGS, \
                f'language code "{lang}" is not available'

    # parse default table heads: different variante
    table_headers = self.opt.get('default_table_header', '').strip()
    variant_list = [[col.strip() for col in variant.split(':')] for variant in table_headers.split(';')]
    self.default_table_head_by_len = {
        len(variant): variant
        for variant in variant_list
        if len(variant) > 1
    }
    self.explode_table_rows = self.opt.get('explode_table_rows', False)
__repr__(self) special #

String representation for a Collection.

Source code in snoop/data/collections.py
def __repr__(self):
    """String representation for a Collection.
    """

    return f"<Collection {self.name} process={self.process} sync={self.sync}>"
migrate(self) #

Run django migrate on this collection's database.

Source code in snoop/data/collections.py
def migrate(self):
    """Run `django migrate` on this collection's database."""

    management.call_command('migrate', '--database', self.db_alias)
mount_blobs_root(self, readonly = True) #

Mount the whole blob root directory under a temporary path, using s3-fuse.

Another temporary directory is created to store the cache.

Source code in snoop/data/collections.py
@contextmanager
def mount_blobs_root(self, readonly=True):
    """Mount the whole blob root directory under a temporary path, using s3-fuse.

    Another temporary directory is created to store the cache."""

    mount_mode = 'ro' if readonly else 'rw'
    yield get_mount(
        mount_name=f'{self.name}-{mount_mode}-blobs',
        bucket=self.name,
        mount_mode=mount_mode,
        **(self.blobs_s3_connection_settings),
    )
mount_collections_root(self, readonly = True) #

Mount the whole collections root directory under a temporary path, using s3-fuse.

Another temporary directory is created to store the cache.

Source code in snoop/data/collections.py
@contextmanager
def mount_collections_root(self, readonly=True):
    """Mount the whole collections root directory under a temporary path, using s3-fuse.

    Another temporary directory is created to store the cache."""

    mount_mode = 'ro' if readonly else 'rw'
    yield get_mount(
        mount_name=f'{self.name}-{mount_mode}-collections',
        bucket=self.name,
        mount_mode=mount_mode,
        access_key=settings.SNOOP_COLLECTIONS_MINIO_ACCESS_KEY,
        secret_key=settings.SNOOP_COLLECTIONS_MINIO_SECRET_KEY,
        address=settings.SNOOP_COLLECTIONS_MINIO_ADDRESS,
    )
set_current(self) #

Creates context where this collection is the current one.

Running this is required to access any of the collection's data from inside database tables.

Source code in snoop/data/collections.py
@contextmanager
def set_current(self):
    """Creates context where this collection is the current one.

    Running this is required to access any of the collection's data from inside database tables.
    """

    old = getattr(threadlocal, 'collection', None)
    assert old in (None, self), \
        "There is already a current collection"
    try:
        threadlocal.collection = self
        logger.debug("WITH collection = %s BEGIN", self)
        yield
    finally:
        logger.debug("WITH collectio = %s END", self)
        assert threadlocal.collection is self, \
            "Current collection has changed!"
        threadlocal.collection = old
        # this causes some tests with rollbacks to fail
        # if old is None:
        #     close_old_connections()

CollectionsRouter #

Django database router.

Uses the current collection's .db_alias to decide what database to route the reads and writes to.

Methods#
allow_migrate(self, db, app_label, model_name = None, **hints) #

Snoop models not allowed in 'default'; other models not allowed in collection_* databases

Source code in snoop/data/collections.py
def allow_migrate(self, db, app_label, model_name=None, **hints):
    """
    Snoop models not allowed in 'default'; other models not allowed
    in collection_* databases
    """
    if db == 'default':
        return (app_label not in self.snoop_app_labels)
    else:
        return (app_label in self.snoop_app_labels)

Functions#

all_collection_dbs() #

List all the collection databases by asking postgres.

Source code in snoop/data/collections.py
def all_collection_dbs():
    """List all the collection databases by asking postgres.
    """
    with connection.cursor() as conn:
        conn.execute('SELECT datname FROM pg_database WHERE datistemplate = false')
        return [name for (name,) in conn.fetchall() if name.startswith('collection_')]

create_databases() #

Go through snoop.data.collections.ALL and create databases that don't exist yet.

Source code in snoop/data/collections.py
def create_databases():
    """Go through [snoop.data.collections.ALL][] and create databases that don't exist yet."""

    dbs = all_collection_dbs()
    for col in ALL.values():
        if col.db_name not in dbs:
            logger.info(f'Creating database {col.db_name}')
            with connection.cursor() as conn:
                conn.execute(f'CREATE DATABASE "{col.db_name}"')

create_es_indexes() #

Create elasticsearch indexes for everything in snoop.data.collections.ALL

Source code in snoop/data/collections.py
def create_es_indexes():
    """Create elasticsearch indexes for everything in [snoop.data.collections.ALL][]"""

    from snoop.data import indexing
    for col in ALL.values():
        with col.set_current():
            if not indexing.index_exists():
                logger.info(f'Creating index {col.es_index}')
                indexing.create_index()
            indexing.update_mapping()

create_roots() #

Creates a root directory (bucket) for the collection in the blob directory.

Also creates a root document entry for the input data, so we have something to export.

Source code in snoop/data/collections.py
def create_roots():
    """Creates a root directory (bucket) for the collection in the blob directory.

    Also creates a root document entry for the input data, so we have something to export.
    """

    from .models import Directory

    for col in ALL.values():
        with transaction.atomic(using=col.db_alias), col.set_current():
            if settings.BLOBS_S3.bucket_exists(col.name):
                logger.info('found bucket %s', col.name)
            else:
                logger.info('creating bucket %s', col.name)
                settings.BLOBS_S3.make_bucket(col.name)
                settings.BLOBS_S3.put_object(col.name, 'tmp/dummy', io.BytesIO(b"hello"), length=5)

            root = Directory.root()
            if not root:
                root = Directory.objects.create()
                logger.info(f'Creating root document {root} for collection {col.name}')

current() #

Get back the Collection that was set in the Collections.set_current() context manager.

Raises if not called from inside the Collections.set_current() context.

Source code in snoop/data/collections.py
def current():
    """Get back the Collection that was set in the `Collections.set_current()` context manager.

    Raises if not called from inside the `Collections.set_current()` context.
    """
    col = getattr(threadlocal, 'collection', None)
    assert col is not None, "There is no current collection set"
    return col

drop_db(db_name) #

Run the SQL DROP DATABASE SQL command.

Source code in snoop/data/collections.py
def drop_db(db_name):
    """Run the SQL `DROP DATABASE SQL` command.
    """
    logger.warning('DROPPPING SQL DATABASE %s', db_name)
    with connection.cursor() as conn:
        # https://dba.stackexchange.com/questions/11893/force-drop-db-while-others-may-be-connected/11895#11895
        # stop new connections to db
        conn.execute(f'ALTER DATABASE "{db_name}" CONNECTION LIMIT 0;')
        conn.execute(f"UPDATE pg_database SET datallowconn = 'false' WHERE datname = '{db_name}';")
        # delete existing connections to db
        conn.execute(f"SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = '{db_name}';")
        # drop database
        conn.execute(f'DROP DATABASE "{db_name}";')
    logger.info('SQL DATABASE %s DROPPED', db_name)

from_object(obj) #

Get the collection from an instance of an object.

Source code in snoop/data/collections.py
def from_object(obj):
    """Get the collection from an instance of an object."""

    db_alias = obj._state.db
    assert db_alias.startswith('collection_')
    return ALL[db_alias.split('_', 1)[1]]

migrate_databases() #

Run database migrations for everything in snoop.data.collections.ALL

Source code in snoop/data/collections.py
def migrate_databases():
    """Run database migrations for everything in [snoop.data.collections.ALL][]"""

    for col in ALL.values():
        try:
            logger.info(f'Migrating database {col.db_name}')
            col.migrate()
        except Exception as e:
            logger.exception(e)
            logger.error("Failed to migrate database {col.db_name}")
            raise