Entities

snoop.data.analyzers.entities #

Tasks for entity extraction and language detection.

Attributes#

ENTITIES_MIN_SPEED_BPS #

Minimum reference speed for this task. Saved as 10% of the Average Success Speed in the Admin UI. The timeout is calculated from this value, the request file size, and the corresponding TIMEOUT_BASE constant.

ENTITIES_SUPPORTED_LANGUAGE_CODES #

Supported two-letter language codes for entity extraction.

ENTITIES_TIMEOUT_BASE #

Minimum number of seconds to wait for this service.

ENTITIES_TIMEOUT_MAX #

Maximum number of seconds to wait for this service. For entities, this corresponds to roughly 5 MB of text per request at the minimum speed.

This should be double the timeout of the service itself, to account for a queue in front of it.
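
As a rough sketch of how these values combine (the same expression appears in get_entity_results below; apart from the 1200-second maximum shown in the signatures further down, the numbers here are invented placeholders, not the real settings):

# Hypothetical values -- only the 1200-second maximum matches the defaults shown below.
ENTITIES_TIMEOUT_BASE = 60          # assumed minimum wait, in seconds
ENTITIES_TIMEOUT_MAX = 1200         # maximum wait, in seconds
ENTITIES_MIN_SPEED_BPS = 4 * 1024   # assumed reference speed, in bytes per second

def entities_timeout(request_size_bytes):
    """Scale the timeout with the request size, capped at the maximum."""
    return min(ENTITIES_TIMEOUT_MAX,
               int(ENTITIES_TIMEOUT_BASE + request_size_bytes / ENTITIES_MIN_SPEED_BPS))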

LANGUAGE_CODE_MAP #

Maps ISO 639-1 codes used by Polyglot to ISO 639-2 codes used by Tesseract. See tesseract-ocr.github.io/tessdoc/Data-Files-in-different-versions.html
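
The map has roughly this shape (a three-entry excerpt for illustration; the real table covers all supported languages):

# Illustrative excerpt -- the module's actual map is much longer.
LANGUAGE_CODE_MAP = {
    'en': 'eng',  # Polyglot (ISO 639-1) -> Tesseract (ISO 639-2)
    'de': 'deu',
    'ro': 'ron',
}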

MAX_ENTITY_COUNT_PER_DOC #

Stop looking at entities in a document after this many distinct results.

MAX_ENTITY_TEXT_LENGTH #

Truncate entities after this length.

MAX_LANGDETECT_DOC_READ #

Max text length to read when running language detection.

TRANSLATION_MIN_SPEED_BPS #

Minimum reference speed used for translation tasks.

TRANSLATION_SUPPORTED_LANGUAGES #

Languages that can be translated to one another, including code and name. Copied verbatim from the LibreTranslate API /languages route.

TRANSLATION_TIMEOUT_BASE #

Minimum number of seconds to wait for translation service.

TRANSLATION_TIMEOUT_MAX #

Maximum number of seconds to wait for translation service.

Functions#

call_nlp_server(endpoint, data_dict, timeout = 1200) #

Calls the NLP server with data at an endpoint.

Sends a POST request with a JSON payload to the specified endpoint of the NLP service. The two endpoints currently in use are language_detection and entity_extraction. The data_dict argument is a dictionary containing the request; it must contain a key text and may optionally contain a key language if the language is already known. The response of the service is JSON, which is decoded before returning.

Parameters:

Name Type Description Default
endpoint

The name of the server endpoint that's called.

required
data_dict

A dictionary containing data for a post request.

required

Returns:

Type Description

The parsed JSON response of the server.

Exceptions:

Type Description
ConnectionError

If the server was not found

RuntimeError

If the server returns with an unexpected result.

NotImplementedError

If the server is not able to process the request.

Source code in snoop/data/analyzers/entities.py
def call_nlp_server(endpoint, data_dict, timeout=ENTITIES_TIMEOUT_MAX):
    """Calls the nlp server with data at an endpoint

    Sends a request to the nlp service with a specified endpoint and a data load as json.
    The two endpoints are `language_detection` and `entity extraction` at the moment.
    The `data_dict` is a dictionary which contains the request. It needs to contain a key `text`
    and can optionally also contain a key `language`, if the language is already specified.
    The response of the service will be JSON, which is decoded before returning.

    Args:
        endpoint: The name of the server endpoint that's called.
        data_dict: A dictionary containing data for a post request.

    Returns:
        The parsed JSON-response of the server

    Raises:
        ConnectionError: If the server was not found
        RuntimeError: If the server returns with an unexpected result.
        NotImplementedError: If the server is not able to process the request.
    """
    url = f'{settings.SNOOP_NLP_URL}/{endpoint}'
    resp = requests.post(url, json=data_dict, timeout=timeout)

    if resp.status_code != 200 or resp.headers['Content-Type'] != 'application/json':
        raise SnoopTaskBroken(resp.text, 'nlp_http_' + str(resp.status_code))
    return resp.json()
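
A minimal usage sketch, assuming the NLP service is reachable at settings.SNOOP_NLP_URL (the sample texts are invented):

# Language detection: the response carries the detected language code.
resp = call_nlp_server('language_detection', {'text': 'Guten Tag, wie geht es Ihnen?'})
detected = resp['language']

# Entity extraction, optionally pinning the language instead of letting the service detect it.
result = call_nlp_server('entity_extraction',
                         {'text': 'Jane Doe works at Acme Corp.', 'language': 'en'})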

call_translate_server(endpoint, data_dict, timeout = 1200) #

Calls the translation server with the given arguments.

Source code in snoop/data/analyzers/entities.py
def call_translate_server(endpoint, data_dict, timeout=TRANSLATION_TIMEOUT_MAX):
    """Calls theh translation server with given arguments."""

    url = f'{settings.TRANSLATION_URL}/{endpoint}'
    resp = requests.post(url, json=data_dict, timeout=timeout)

    if resp.status_code != 200 or resp.headers['Content-Type'] != 'application/json':
        raise SnoopTaskBroken(resp.text, 'translate_http_' + str(resp.status_code))
    return resp.json()
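
A usage sketch mirroring how translate below builds its request (the text is invented; the q/source/target fields follow the LibreTranslate API):

# Hypothetical call against the LibreTranslate-compatible /translate endpoint.
resp = call_translate_server('translate',
                             {'q': 'Bonjour tout le monde', 'source': 'fr', 'target': 'en'})
translated = resp.get('translatedText', '').strip()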

can_extract_entities(lang) #

Checks if we can extract entities from this language.

Source code in snoop/data/analyzers/entities.py
def can_extract_entities(lang):
    """Checks if we can extract entities from this language."""
    return lang in ENTITIES_SUPPORTED_LANGUAGE_CODES

can_translate(lang) #

Checks if we can translate this language.

Source code in snoop/data/analyzers/entities.py
def can_translate(lang):
    """Checks if we can translate this language."""
    return lang in TRANSLATION_SUPPORTED_LANGUAGE_CODES

clean_entity_text(text) #

Try and sanitize the entity text a little bit.

Does the following:
- limits text length
- replaces newlines with spaces
- folds multiple spaces
- strips spaces at start/end

Source code in snoop/data/analyzers/entities.py
def clean_entity_text(text):
    """Try and sanitize the entity text a little bit.

    Does the following:
        - limit text length
        - replaces newlines with spaces
        - folds multiple spaces
        - strips spaces at start/end
    """

    text = text[:MAX_ENTITY_TEXT_LENGTH]
    text = text.replace('\n', ' ')
    text = text.replace('\t', ' ')
    text = text.replace('\r', ' ')
    text = text.replace('  ', ' ')
    text = text.strip()
    return text
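
For example (illustrative input):

clean_entity_text('  Acme\n Corp  ')   # -> 'Acme Corp'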

create_db_entries(ents) #

Creates all database entries related to an entity.

get_or_create is used for the entity, so that each entity appears only once in the database. The Hit stores the information about where that entity was found.

Parameters:

Name Type Description Default
ents

A list of dicts with the following keys:
- entity: dict containing the entity text and type as strings.
- model: the name of the model used
- language: language code of the model used
- text_source: 'text' or the OCR text source
- digest: the Digest object the entity was found in.

required

Returns:

Type Description

The IDs of the created entity hit objects (if any).

Source code in snoop/data/analyzers/entities.py
@transaction.atomic
def create_db_entries(ents):
    """Creates all Database entries which are related to an entity.

    `get_or_create` is used for the entity, so that every entity is only once in
    the database. In the Hit, the information where that entity was found is stored.
    Args:
        ents: A list of dicts with the following keys:
            - entity: dict containing entity text and type as Strings.
            - model: the name of the used model
            - language: language code of the used model
            - text_source: 'text' or the OCR text source
            - digest: The digest object where the entity was found in.

    Returns:
        The Ids of the created entity objects(if any).
    """

    def lock_table(model):
        col = collections.current()
        with connections[col.db_alias].cursor() as cursor:
            query = f'LOCK TABLE {model._meta.db_table}'
            log.info('collection %s: %s', col.name, query)
            cursor.execute(query)

    for ent in ents:
        ent['engine'] = 'polyglot' if ent['model'].startswith('polyglot_') else 'spacy'

    # Translate type from the 20+ Spacy types and the 3 Polyglot types into our 5 types.
    for ent in ents:
        ent['entity']['type'] = translate_entity_type(ent['entity']['type'])
    # Exclude entities with no translated type
    ents = [e for e in ents if e['entity']['type']]

    lock = False
    ent_types = {}
    # get new types to insert
    sorted_ent_types = sorted(set(e['entity']['type'] for e in ents))
    # if we have new types, lock table
    if not all(models.EntityType.objects.filter(type=_type).exists()
               for _type in sorted_ent_types):
        lock = True
        lock_table(models.EntityType)
    # after a potentially blocking table lock, do a new get/create
    for _type in sorted_ent_types:
        ent_types[_type], _ = models.EntityType.objects.get_or_create(type=_type)

    ent_items = {}
    sorted_ent_items = sorted(set([(e['entity']['text'], e['entity']['type']) for e in ents]))
    if lock or not all(models.Entity.objects.filter(type=ent_types[_type], entity=_text).exists()
                       for _text, _type in sorted_ent_items):
        lock = True
        lock_table(models.Entity)
    for _text, _type in sorted_ent_items:
        ent_items[(_text, _type)], _ = models.Entity.objects.get_or_create(
            entity=_text,
            type=ent_types[_type],
        )

    lang_models = {}
    sorted_lang_models = sorted(set([(e['language'], e['engine'], e['model']) for e in ents]))
    if lock or not all(models.LanguageModel.objects
                       .filter(language_code=_lang, engine=_eng, model_name=_mod)
                       .exists()
                       for _lang, _eng, _mod in sorted_lang_models):
        lock = True
        lock_table(models.LanguageModel)
    for _lang, _eng, _mod in sorted_lang_models:
        lang_models[(_lang, _eng, _mod)], _ = models.LanguageModel.objects.get_or_create(
            language_code=_lang,
            engine=_eng,
            model_name=_mod,
        )

    hit_list = []
    for e in ents:
        ent_item = ent_items[(e['entity']['text'], e['entity']['type'])]
        ent_lang_model = lang_models[(e['language'], e['engine'], e['model'])]

        hit, _ = models.EntityHit.objects.get_or_create(
            entity=ent_item,
            model=ent_lang_model,
            text_source=e['text_source'],
            start=e['entity']['start'],
            end=e['entity']['end'],
            digest=e['digest'],
        )
        hit_list.append(hit)

    return [hit.pk for hit in hit_list]
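
A sketch of one input element, with invented values; some_digest stands in for a models.Digest instance:

# Hypothetical input -- field values are made up for illustration.
ents = [{
    'entity': {'text': 'Acme Corp', 'type': 'ORG', 'start': 120, 'end': 129},
    'model': 'spacy_en_core_web_sm',   # assumed model name
    'language': 'en',
    'text_source': 'text',
    'digest': some_digest,
}]
hit_ids = create_db_entries(ents)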

detect_language(blob) #

Task that runs language detection

Source code in snoop/data/analyzers/entities.py
@snoop_task('entities.detect_language', version=1, queue='entities')
@returns_json_blob
def detect_language(blob):
    """Task that runs language detection"""

    if not collections.current().nlp_language_detection_enabled:
        raise SnoopTaskBroken('not enabled', 'nlp_lang_detection_not_enabled')

    digest = models.Digest.objects.get(blob=blob)
    digest_data = digest.result.read_json()
    texts = [digest_data.get('text', "")] + list(digest_data.get('ocrtext', {}).values())
    texts = [t[:MAX_LANGDETECT_DOC_READ].strip() for t in texts if len(t.strip()) > 1]
    texts = "\n\n".join(texts).strip()[:MAX_LANGDETECT_DOC_READ]

    timeout = min(TRANSLATION_TIMEOUT_MAX,
                  int(TRANSLATION_TIMEOUT_BASE + len(texts) / TRANSLATION_MIN_SPEED_BPS))

    lang = call_nlp_server('language_detection', {'text': texts}, timeout)['language']
    return {'lang': lang}

get_entity_results(blob, language = None, translation_result_pk = None) #

Gets all entities and the language for all text sources in a digest.

For each text source in the digest, builds a request dict and asks the NLP service to extract entities; optionally, the language can be specified. Returns the collected responses from the service.

Parameters:

Name Type Description Default
blob

The Blob of the document to process. The different types of text sources will be extracted from the digest and sent to the NLP service to perform entity recognition.

required
language

an optional language code, telling the service which language to use in order to process the text, instead of figuring that out by itself.

None
translation_result_pk

blob containing JSON with extra text sources to process.

None

Returns:

Type Description

The responses from the calls to the NLP Server for all text sources.

Source code in snoop/data/analyzers/entities.py
@snoop_task('entities.get_entity_results', version=3, queue='entities')
@returns_json_blob
def get_entity_results(blob, language=None, translation_result_pk=None):
    """ Gets all entities and the language for all text sources in a digest.

    Creates a dict from a string and requests entity extraction from the nlp service
    optionally, the language can be specified.
    returns the dict with the collected responses from the service.

    Args:
        digest_id: digest ID for the document to process. The different types of text sources will be
            extracted from the digest and send to the nlp service to perform entity recognition.
        language: an optional language code, telling the service which language
            to use in order to process the text, instead of figuring that out by
            itself.
        translation_result_pk: blob containing JSON with extra text sources to process.

    Returns:
        The responses from the calls to the NLP Server for all text sources.
    """
    if not current_collection().nlp_entity_extraction_enabled \
            or not can_extract_entities(language):
        raise SnoopTaskBroken('entity extraction disabled', 'nlp_entity_extraction_disabled')

    digest = models.Digest.objects.get(blob=blob)
    timeout = min(ENTITIES_TIMEOUT_MAX,
                  int(ENTITIES_TIMEOUT_BASE + blob.size / ENTITIES_MIN_SPEED_BPS))
    text_limit = collections.current().nlp_text_length_limit

    digest_data = digest.result.read_json()

    text_sources = {}

    if digest_data.get('text'):
        text_sources['text'] = digest_data.get('text', '')[:text_limit]
    tesseract_lang_code = LANGUAGE_CODE_MAP.get(language)
    if not tesseract_lang_code:
        log.warning(f'No OCR Language code found for language: {language}')
        # keep all texts if there is no matching language code
        tesseract_lang_code = ""
    if digest_data.get('ocrtext') and tesseract_lang_code in current_collection().ocr_languages:
        for k, v in digest_data.get('ocrtext').items():
            if not k == f'tesseract_{tesseract_lang_code}':
                continue
            if v:
                text_sources[k] = v[:text_limit]
    elif digest_data.get('ocrtext'):
        # no OCR language matches the detected language, so we only
        # keep the first OCR text
        first_ocr = list(digest_data.get('ocrtext').items())[0]
        text_sources[first_ocr[0]] = first_ocr[1][:text_limit]

    if translation_result_pk:
        log.info('loaded language data')
        lang_result_json = models.Blob.objects.get(pk=translation_result_pk).read_json()
        if lang_result_json.get('translated-text'):
            for k, v in lang_result_json.get('translated-text').items():
                if v:
                    text_sources[k] = v[:text_limit]

    collected_responses = []
    for source, text in text_sources.items():
        response = {'entities': None, 'language': None, 'source': source}
        data = {'text': text}
        if settings.EXTRACT_ENTITIES:
            if language:
                data['language'] = language
                response['language'] = language
            response['entities'] = call_nlp_server('entity_extraction', data, timeout)

        collected_responses.append(response)

    return collected_responses
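
Based on the loop above, each element of the returned list has roughly this shape (entity values are invented):

# Shape of one collected response, as assembled in the loop above.
{
    'source': 'text',               # or an OCR / translated text source key
    'language': 'en',               # only set when a language was passed in
    'entities': {                   # raw response from the entity_extraction endpoint
        'language': 'en',
        'model': 'en_core_web_sm',  # assumed model name
        'entities': [
            {'text': 'Jane Doe', 'type': 'PERSON', 'start': 0, 'end': 8},
        ],
    },
}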

process_results(digest, entity_result_parts) #

Processes entity extraction and language detection for one document.

This function receives the collected results from the service, creates the database entries, and retrieves the IDs of all entities from the database. It also builds a list of all entity types that occur in the document.

Parameters:

Name Type Description Default
digest

The digest object for which entity extraction is done.

required
entity_result_parts

The entity results for all text sources of the document.

required

Returns:

Type Description
results

A dictionary listing the texts of all found entities under the key entity, as well as one key per entity type containing a list of all entities of that type.

Source code in snoop/data/analyzers/entities.py
def process_results(digest, entity_result_parts):
    """Processes entity extraction and language detection for one document.

    This functions receives the collected results from the service, the database entries are created
    and the IDs of all entities are received from the database.
    Also, a list of all entity types which occur in the document is created.

    Args:
        digest: The digest object for which entity extraction is done.
        entity_result: The entity results for a single text source.

    Returns:
        results: A dictionary containing all found entities, the IDs of all
        unique entities that were found, as well as keys for all entity types
        containing a list of all entities with that type.
    """
    for entity_result in entity_result_parts:
        # truncate number of entities returned
        entity_result['entities']['entities'] = \
            entity_result['entities']['entities'][:MAX_ENTITY_COUNT_PER_DOC]

        for e in entity_result['entities']['entities']:
            e['text'] = clean_entity_text(e['text'])

    ent_ids = list(set(
        create_db_entries(
            sum(
                [
                    [
                        {
                            'entity': entity,
                            'model': entity_result['entities']['model'],
                            'language': entity_result['entities']['language'],
                            'text_source': entity_result['source'],
                            'digest': digest,
                        }
                        for entity in entity_result['entities']['entities']
                    ]
                    for entity_result in entity_result_parts
                ],
                start=[]
            )
        )
    ))
    log.info('Saved %s entity hit IDs', len(ent_ids))

    rv = defaultdict(list)
    for entity_result in entity_result_parts:
        # rv['lang'].append(entity_result['entities']['language'])
        rv['entity'] += [
            k['text']
            for k in entity_result['entities']['entities']
            if k['type']
        ]

        # clone into sub-fields, ordered by type
        for k in entity_result['entities']['entities']:
            entity_type = k['type']
            if not entity_type:
                continue
            rv[f'entity-type.{entity_type}'].append(k['text'])

    # delete empty keys
    for k in list(rv.keys()):
        if not rv[k]:
            del rv[k]

    # remove DefaultDict wrapper
    rv = dict(rv)

    # fold together the various languages, pick the first
    # if rv['lang']:
    #     # rv['lang'] = list(set(rv['lang']))
    #     rv['lang'] = rv['lang'][0]
    return rv
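
The resulting dictionary groups entity texts under a flat entity key and under one key per translated type, for example (values invented):

# Illustrative return value.
{
    'entity': ['Jane Doe', 'Acme Corp'],
    'entity-type.person': ['Jane Doe'],
    'entity-type.organization': ['Acme Corp'],
}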

translate(blob, lang) #

Task that runs language detection and machine translation.

Receives text from one document, concatenated from all sources (text and OCR).

Returns a JSON Blob with detected language, as well as any translations done.

Source code in snoop/data/analyzers/entities.py
@snoop_task('entities.translate', version=1, queue='translate')
@returns_json_blob
def translate(blob, lang):
    """Task that runs language detection and machine translation.

    Receives text from one document, concatenated from all sources (text and OCR).

    Returns a JSON Blob with detected language, as well as any translations done."""
    if not collections.current().translation_enabled:
        raise SnoopTaskBroken('translation not enabled', 'nlp_translate_not_enabled')

    _txt_limit = collections.current().translation_text_length_limit
    digest = models.Digest.objects.get(blob=blob)
    digest_data = digest.result.read_json()
    tesseract_lang_code = LANGUAGE_CODE_MAP.get(lang)
    texts = [digest_data.get('text', "")]
    if not tesseract_lang_code:
        log.warning(f'No OCR Language code found for language: {lang}')
        # keep all texts if there is no matching language code
        tesseract_lang_code = ""
    if tesseract_lang_code in current_collection().ocr_languages:
        texts += [
            text[1] for text in list(digest_data.get('ocrtext', {}).items())
            if text[0] == f'tesseract_{tesseract_lang_code}'
        ]
    else:
        # keep first ocr if no ocr language matches detected language
        all_ocr_texts = [text[1] for text in list(digest_data.get('ocrtext', {}).items())]
        if all_ocr_texts:
            texts += [all_ocr_texts[0]]
    texts = [t[:_txt_limit].strip() for t in texts if len(t.strip()) > 1]
    texts = "\n\n".join(texts).strip()[:MAX_LANGDETECT_DOC_READ]

    timeout = min(TRANSLATION_TIMEOUT_MAX,
                  int(TRANSLATION_TIMEOUT_BASE + len(texts) / TRANSLATION_MIN_SPEED_BPS))

    rv = {'lang': lang}

    texts = texts[:_txt_limit]
    rv['translated-text'] = {}
    rv['translated-from'] = []
    rv['translated-to'] = []
    for target in settings.TRANSLATION_TARGET_LANGUAGES:
        if target == lang:
            log.warning("skipping translation from %s into %s", lang, target)
            continue
        tr_source = lang or 'auto'
        log.info('translating length=%s from %s to %s', len(texts), tr_source, target)
        tr_args = {'q': texts, 'source': tr_source, 'target': target}
        tr_text = call_translate_server('translate', tr_args, timeout)
        tr_text = tr_text.get('translatedText', '').strip()
        if tr_text and len(tr_text) > 1:
            rv['translated-text'][f'translated_{tr_source}_to_{target}'] = tr_text
            rv['translated-from'].append(tr_source)
            rv['translated-to'].append(target)
        else:
            log.warning("failed translation from %s into %s", lang, target)

    return rv
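
The returned JSON blob has a shape along these lines (language codes and text are invented):

# Illustrative return value for a document detected as German and translated into English.
{
    'lang': 'de',
    'translated-text': {'translated_de_to_en': 'Good day, how are you?'},
    'translated-from': ['de'],
    'translated-to': ['en'],
}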

translate_entity_type(old_type) #

Translate from Spacy or Polyglot entity types into our types.

Here are all the types:
- location
- organization
- event
- person
- money

Returns None if the old type is not interesting and the entity can be discarded.

Source code in snoop/data/analyzers/entities.py
def translate_entity_type(old_type):
    """Translate from Spacy or Polyglot entity types into our types.

    Here are all the types:
        - location
        - organization
        - event
        - person
        - money

    Returns None if old type is not interesting and entity can be discarded.
    """

    LOC = 'location'
    ORG = 'organization'
    EVT = 'event'
    PER = 'person'
    MON = 'money'

    tr = {
        # SPACY
        # =====
        # 'CARDINAL': 'number',  # Numerals that do not fall under another type
        # 'DATE': 'date',  # Absolute or relative dates or periods
        # 'DATETIME',
        'EVENT': EVT,  # Named hurricanes, battles, wars, sports events, etc.
        'FAC': LOC,
        'FACILITY': LOC,  # Buildings, airports, highways, bridges, etc.
        'GPE': LOC,  # Countries, cities, states
        # 'LANGUAGE': 'language',  # Any named language
        # 'LAW': 'law',  # Named documents made into laws
        'LOC': LOC,
        'LOCATION': LOC,  # Non-GPE locations, mountain ranges, bodies of water
        'MONEY': MON,  # Monetary values, including unit
        'NAT_REL_POL': ORG,
        'NORP': ORG,  # Nationalities or religious or political groups
        # 'NUMERIC_VALUE',
        # 'ORDINAL',
        # 'ORDINAL': LOC,  # “first”, “second”
        'ORG': ORG,
        'ORGANIZATION': ORG,  # Companies, agencies, institutions, etc.
        # 'PERCENT': LOC,  # Percentage (including “%”)
        # 'PERIOD',
        'PER': PER,
        'PERSON': PER,  # People, including fictional
        'PRODUCT': ORG,  # Vehicles, weapons, foods, etc. (Not services)
        # 'QUANTITY',
        # 'QUANTITY': 'number',  # Measurements, as of weight or distance
        # 'TIME': 'date',  # Times smaller than a day
        'WORK': ORG,  # WORK OF ART: titles of books, songs, etc.
        'WORK_OF_ART': ORG,

        # POLYGLOT
        # ========
        'I-LOC': LOC,
        'I-ORG': ORG,
        'I-PER': PER,
    }
    return tr.get(old_type, None)
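
For example:

translate_entity_type('GPE')     # -> 'location'
translate_entity_type('I-PER')   # -> 'person'
translate_entity_type('DATE')    # -> None, so the entity is discarded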