Indexing
snoop.data.indexing
Elasticsearch index data mapping, settings, management and import/export.
We maintain an append-only document mapping with a number of wildcard fields. One example of a wildcard field mapping is the private tags field: a separate field is created for every user.
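For illustration only, a dynamic template along these lines could produce one keyword subfield per user under a private-tags object; the field name and mapping options here are hypothetical, not the module's actual CONFIG:

# Hypothetical sketch of a wildcard (dynamic template) mapping that
# treats every "private-tags.<username>" subfield as a keyword.
PRIVATE_TAGS_TEMPLATE = {
    "dynamic_templates": [
        {
            "private_tags": {
                "path_match": "private-tags.*",
                "mapping": {"type": "keyword"},
            },
        },
    ],
}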
Attributes
MAX_TEXT_FIELD_SIZE
Max length of string to pass as the `text` and `ocrtext.*` fields into Elasticsearch.
These values are loaded into memory all at once, so this constant puts a limit on that size.
The Elasticsearch default limit is 100 MB, but we configure a limit of 1800 MB. Still, anything longer than 100-200 MB takes a very long time to search and highlight using the default highlighter and analyzer.
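A minimal sketch of enforcing this cap before indexing; the clip_text helper is hypothetical, but the constant is the one documented above:

from snoop.data.indexing import MAX_TEXT_FIELD_SIZE

def clip_text(text):
    # Hypothetical helper: clip extracted text so the `text` field
    # stays within MAX_TEXT_FIELD_SIZE characters before indexing.
    return text[:MAX_TEXT_FIELD_SIZE]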
Functions
all_indices()
Return a list with all Elasticsearch indexes created for collections.
Source code in snoop/data/indexing.py
def all_indices():
    """Return a list with all Elasticsearch indexes created for collections."""
    indices = requests.get(f'{ES_URL}/_cat/indices?format=json').json()
    return [a['index'] for a in indices if not a['index'].startswith('.monitoring')]
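Example (a minimal sketch; assumes the snoop settings are loaded so that ES_URL points at a reachable cluster):

from snoop.data.indexing import all_indices

# List every collection index; .monitoring-* indexes are already filtered out.
for name in all_indices():
    print(name)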
bulk_index(items)
Index many documents provided as (id, body).
Source code in snoop/data/indexing.py
def bulk_index(items):
    """Index many documents provided as (id, body)."""
    if not items:
        return {'items': []}

    def generate_data(items):
        for _id, body in items:
            action = {
                "index": {
                    "_id": _id,
                }
            }
            yield (json.dumps(action) + '\n').encode()
            yield (json.dumps(body) + '\n').encode()

    es_index = collections.current().es_index
    r = requests.post(
        f'{ES_URL}/{es_index}/{DOCUMENT_TYPE}/_bulk',
        data=generate_data(items),
        headers={'Content-Type': 'application/x-ndjson'},
    )
    check_response(r)
    return r.json()
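Example (a sketch; assumes a current collection has already been selected through snoop.data.collections, and the document ids and bodies shown are illustrative):

from snoop.data.indexing import bulk_index

items = [
    ('doc-1', {'filename': 'report.pdf', 'text': 'quarterly numbers'}),
    ('doc-2', {'filename': 'notes.txt', 'text': 'meeting notes'}),
]
resp = bulk_index(items)
# The bulk API reports per-document results; check for partial failures.
failed = [item for item in resp['items'] if item['index'].get('error')]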
check_response(resp)
Raises an exception if the HTTP status code is not successful (2XX).
Source code in snoop/data/indexing.py
def check_response(resp):
    """Raises exception if HTTP status code is not successful (2XX status code)."""
    if 200 <= resp.status_code < 300:
        log.debug('Response: %r', resp)
    else:
        log.error('Response text: %s', resp.text)
        raise RuntimeError('Put request failed: %r' % resp)
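Example (a sketch; assumes ES_URL is importable from this module, as the listings above suggest):

import requests
from snoop.data.indexing import ES_URL, check_response

resp = requests.get(f'{ES_URL}/_cluster/health')
check_response(resp)  # raises RuntimeError on any non-2XX status
print(resp.json()['status'])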
create_index()
Create Elasticsearch index for current collection.
Source code in snoop/data/indexing.py
def create_index():
    """Create Elasticsearch index for current collection."""
    es_index = collections.current().es_index
    url = f'{ES_URL}/{es_index}'
    log.info("PUT %s", url)
    put_resp = put_json(url, CONFIG)
    check_response(put_resp)
delete_doc(id)
Deletes a single document from the current collection by its id.
Source code in snoop/data/indexing.py
def delete_doc(id):
    """Deletes a single document from the current collection by its id."""
    es_index = collections.current().es_index
    index_url = f'{ES_URL}/{es_index}'
    resp = requests.delete(f'{index_url}/{DOCUMENT_TYPE}/{id}')
    check_response(resp)
delete_index()
Delete the current collection's Elasticsearch index.
Source code in snoop/data/indexing.py
def delete_index():
    """Delete the current collection's Elasticsearch index."""
    es_index = collections.current().es_index
    delete_index_by_name(es_index)
delete_index_by_name(name)
Delete a whole Elasticsearch index.
Source code in snoop/data/indexing.py
def delete_index_by_name(name):
    """Delete a whole Elasticsearch index."""
    url = f'{ES_URL}/{name}'
    log.info("DELETE %s", url)
    delete_resp = requests.delete(url)
    log.debug('Response: %r', delete_resp)
export_index(stream = None)
Export a snapshot of the current collection index.
Source code in snoop/data/indexing.py
def export_index(stream=None):
    """Export a snapshot of the current collection index."""
    es_index = collections.current().es_index
    with snapshot_repo() as (repo, repo_path):
        snapshot = f'{repo}/{es_index}'
        log.info('Elasticsearch snapshot %r', snapshot)

        log.info('Create snapshot')
        snapshot_resp = put_json(snapshot, {
            'indices': es_index,
            'include_global_state': False,
        })
        check_response(snapshot_resp)

        while True:
            status_resp = requests.get(snapshot)
            check_response(status_resp)
            state = status_resp.json()['snapshots'][0]['state']
            log.debug('Snapshot state: %r', state)
            if state == 'SUCCESS':
                log.info('Snapshot created successfully')
                break
            time.sleep(1)

        log.info('Create tar archive')
        subprocess.run(
            'tar c *',
            cwd=repo_path,
            shell=True,
            check=True,
            stdout=stream,
        )
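Example (a sketch; assumes a current collection is selected, and writes the snapshot tar stream to a local file):

from snoop.data.indexing import export_index

# Stream the snapshot tar archive into a file on disk.
with open('collection-index.tar', 'wb') as f:
    export_index(stream=f)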
import_index(delete = False, stream = None)
Import an index snapshot into the current collection's index.
Source code in snoop/data/indexing.py
def import_index(delete=False, stream=None):
    """Import index snapshot into current collection index."""
    es_index = collections.current().es_index
    if delete:
        # delete_index() takes no arguments; it acts on the current collection
        delete_index()

    with snapshot_repo() as (repo, repo_path):
        log.info('Unpack tar archive')
        with tarfile.open(mode='r|*', fileobj=stream or sys.stdin.buffer) as tar:
            tar.extractall(repo_path)

        snapshots_resp = requests.get(f'{repo}/*')
        check_response(snapshots_resp)
        for s in snapshots_resp.json()['snapshots']:
            if s['state'] == 'SUCCESS':
                [snapshot_index] = s['indices']
                if snapshot_index != es_index:
                    continue
                snapshot = f'{repo}/{s["snapshot"]}'
                break
        else:
            raise RuntimeError(f"No snapshots for index {es_index}")

        log.info("Starting restore for index %r as %r", snapshot_index, es_index)
        restore = f'{snapshot}/_restore'
        restore_resp = post_json(restore, {
            'indices': es_index,
            'include_global_state': False,
            'include_aliases': False,
        })
        check_response(restore_resp)
        assert restore_resp.json()['accepted']

        status = f'{ES_URL}/{es_index}/_recovery'
        while True:
            status_resp = requests.get(status)
            check_response(status_resp)
            if not status_resp.json():
                log.debug("Waiting for restore to start")
                time.sleep(1)
                continue
            for shard in status_resp.json()[es_index]['shards']:
                stage = shard['stage']
                if stage != 'DONE':
                    log.debug("Shard %r stage=%r", shard['id'], stage)
                    time.sleep(1)
                    break
            else:
                log.info('Snapshot restored successfully')
                break
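Example (a sketch, mirroring the export example above; delete=True drops any existing index first):

from snoop.data.indexing import import_index

with open('collection-index.tar', 'rb') as f:
    import_index(delete=True, stream=f)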
index(id, data)
Index a single document in current collection.
Source code in snoop/data/indexing.py
def index(id, data):
    """Index a single document in current collection."""
    es_index = collections.current().es_index
    index_url = f'{ES_URL}/{es_index}'
    resp = put_json(f'{index_url}/{DOCUMENT_TYPE}/{id}', data)
    check_response(resp)
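Example (a sketch; the id and field names are illustrative):

from snoop.data.indexing import delete_doc, index

# Index one document under an explicit id, then remove it again.
index('doc-1', {'filename': 'report.pdf', 'text': 'quarterly numbers'})
delete_doc('doc-1')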
index_exists()
Check if current collection's Elasticsearch index exists.
Source code in snoop/data/indexing.py
def index_exists():
    """Check if current collection's Elasticsearch index exists."""
    es_index = collections.current().es_index
    head_resp = requests.head(f"{ES_URL}/{es_index}")
    return head_resp.status_code == 200
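Example (a common create-if-missing pattern using the functions documented here):

from snoop.data.indexing import create_index, index_exists

# Create the current collection's index only if it does not exist yet.
if not index_exists():
    create_index()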
post_json(url, data)
Helper method to send HTTP POST requests to Elasticsearch.
Source code in snoop/data/indexing.py
def post_json(url, data):
    """Helper method to send HTTP POST requests to Elasticsearch."""
    return requests.post(
        url,
        data=json.dumps(data),
        headers={'Content-Type': 'application/json'},
    )
put_json(url, data)
Helper method to send HTTP PUT requests to Elasticsearch.
Source code in snoop/data/indexing.py
def put_json(url, data):
    """Helper method to send HTTP PUT requests to Elasticsearch."""
    return requests.put(
        url,
        data=json.dumps(data),
        headers={'Content-Type': 'application/json'},
    )
snapshot_repo()
Context manager that creates an Elasticsearch snapshot repository for the current collection index.
Deletes the repository (and its files on disk) after the context finishes.
Source code in snoop/data/indexing.py
@contextmanager
def snapshot_repo():
    """Context manager that creates an Elasticsearch snapshot repository for the current collection index.

    Will delete the snapshot repo after the context finishes.
    """
    es_index = collections.current().es_index
    id = f'{es_index}-{datetime.utcnow().isoformat().lower()}'
    repo = f'{ES_URL}/_snapshot/{id}'
    repo_path = f'/opt/hoover/es-snapshots/{id}'

    log.info('Create snapshot repo')
    repo_resp = put_json(repo, {
        'type': 'fs',
        'settings': {
            'location': repo_path,
            'compress': True,
        },
    })
    check_response(repo_resp)

    try:
        yield (repo, repo_path)
    finally:
        log.info('Delete snapshot repo')
        delete_resp = requests.delete(repo)
        check_response(delete_resp)

        log.info('Remove repo files')
        shutil.rmtree(repo_path)
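Example (a sketch; export_index and import_index above are the real call sites for this context manager):

from snoop.data.indexing import snapshot_repo

with snapshot_repo() as (repo, repo_path):
    print('repository API endpoint:', repo)
    print('snapshot files written under:', repo_path)
# On exit, the repository is deleted and repo_path is removed from disk.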
update_mapping()
Update mapping and settings for current Elasticsearch index.
Source code in snoop/data/indexing.py
def update_mapping():
    """Update mapping and settings for current Elasticsearch index."""
    es_index = collections.current().es_index

    url = f'{ES_URL}/{es_index}/_mapping/{DOCUMENT_TYPE}'
    log.info("PUT %s", url)
    put_resp = put_json(url, MAPPINGS[DOCUMENT_TYPE])
    check_response(put_resp)

    index_settings = {
        "index": {
            "refresh_interval": collections.current().refresh_interval,
            "max_result_window": collections.current().max_result_window,
            "number_of_replicas": 0,
        }
    }
    url = f'{ES_URL}/{es_index}/_settings'
    log.info("PUT %s", url)
    put_resp = put_json(url, index_settings)
    check_response(put_resp)
update_refresh_interval(interval = None)
Override the current collection index refresh interval. Useful when bulk indexing, to switch to `-1` and back. When `None` is given as the argument (the default), the interval is reset to its default value (set by environment variable).
Source code in snoop/data/indexing.py
def update_refresh_interval(interval=None):
    """Override the current collection index refresh interval.

    Useful when bulk indexing, to switch to `-1` and back.
    When `None` is given as an argument (default argument)
    will reset to default value (set by environment variable)."""
    if interval is None:
        interval = collections.current().refresh_interval
    es_index = collections.current().es_index

    url = f'{ES_URL}/{es_index}/_mapping/{DOCUMENT_TYPE}'
    log.info("PUT %s", url)
    put_resp = put_json(url, MAPPINGS[DOCUMENT_TYPE])
    check_response(put_resp)

    index_settings = {
        "index": {
            "refresh_interval": interval,
        }
    }
    url = f'{ES_URL}/{es_index}/_settings'
    log.info("PUT %s", url)
    put_resp = put_json(url, index_settings)
    check_response(put_resp)
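Example (the intended bulk-indexing pattern, as a sketch under the same current-collection assumption; the items are illustrative):

from snoop.data.indexing import bulk_index, update_refresh_interval

items = [('doc-1', {'text': 'first'}), ('doc-2', {'text': 'second'})]

update_refresh_interval('-1')   # pause index refresh during the bulk load
try:
    bulk_index(items)
finally:
    update_refresh_interval()   # None resets to the configured default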