Archives

snoop.data.analyzers.archives #

Tasks that unpack archives and return their structure and contents.

Tables are also implemented as archives, with each row being unpacked into a text file.

Functions#

archive_walk(path) #

Recursively generates simple dicts describing the directory listing of an unpacked archive.

Source code in snoop/data/analyzers/archives.py
def archive_walk(path):
    """Generates simple dicts with archive listing for the archive."""

    for entry in os.scandir(path):
        if entry.is_dir(follow_symlinks=False):
            yield {
                'type': 'directory',
                'name': entry.name,
                'children': list(archive_walk(entry.path)),
            }
        else:
            yield {
                'type': 'file',
                'name': entry.name,
                'path': entry.path
            }

call_readpst(pst_path, output_dir, **kw) #

Helper function that calls a readpst process.

Source code in snoop/data/analyzers/archives.py
def call_readpst(pst_path, output_dir, **kw):
    """Helper function that calls a `readpst` process."""
    try:
        subprocess.check_output([
            'readpst',
            '-D',
            '-M',
            '-e',
            '-o',
            str(output_dir),
            '-teajc',
            str(pst_path),
        ], stderr=subprocess.STDOUT)

    except subprocess.CalledProcessError:
        raise SnoopTaskBroken('readpst failed', 'readpst_error')
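
A minimal usage sketch, assuming readpst is installed and /data/outlook.pst is a hypothetical PST file:

import tempfile

with tempfile.TemporaryDirectory() as out_dir:
    # writes per-folder .eml output under out_dir; raises SnoopTaskBroken on failure
    call_readpst('/data/outlook.pst', out_dir)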

can_unpack_with_7z(blob) #

Check if the object can be unpacked with 7z. Checks both the mime type and the file extension guessed from it.

Source code in snoop/data/analyzers/archives.py
def can_unpack_with_7z(blob):
    """Check if the object can be unpacked with 7z. Will check both guessed extensions and mime type."""
    ext = mimetypes.guess_extension(blob.mime_type)
    return blob.mime_type in SEVENZIP_MIME_TYPES \
        or ext in SEVENZIP_ACCEPTED_EXTENSIONS
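
A quick sketch using a stand-in object instead of a real models.Blob; the result assumes 'application/zip' appears in the module's SEVENZIP_MIME_TYPES:

from types import SimpleNamespace

blob = SimpleNamespace(mime_type='application/zip')  # stand-in, not a real Blob
can_unpack_with_7z(blob)  # True, assuming zip is listed in SEVENZIP_MIME_TYPES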

check_recursion(listing, blob_pk) #

Raise an exception if the archive (identified by blob_pk) is contained in itself (in the given listing).

Source code in snoop/data/analyzers/archives.py
def check_recursion(listing, blob_pk):
    """Raise exception if archive (blob_pk) is contained in itself (listing)."""

    for item in listing:
        if item['type'] == 'file':
            if item['blob_pk'] == blob_pk:
                raise RuntimeError(f"Recursion detected, blob_pk={blob_pk}")

        elif item['type'] == 'directory':
            check_recursion(item['children'], blob_pk)
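
A small sketch with made-up blob primary keys:

listing = [
    {'type': 'directory', 'name': 'nested', 'children': [
        {'type': 'file', 'name': 'same-archive.zip', 'blob_pk': 'abc123'},
    ]},
]
check_recursion(listing, 'def456')  # fine: the archive is not inside itself
check_recursion(listing, 'abc123')  # raises RuntimeError: Recursion detected, blob_pk=abc123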

create_blobs(dirlisting, unlink = True) #

Create blobs for files in an archive listing created by archive_walk.

Parameters:

Name    Type  Description                                                        Default
unlink  bool  If enabled (default), delete files just after they are saved to   True
              blob storage.
Source code in snoop/data/analyzers/archives.py
def create_blobs(dirlisting, unlink=True):
    """Create blobs for files in archive listing created by [snoop.data.analyzers.archive_walk][].

    Args:
        - unlink: if enabled (default), will delete files just after they are saved to blob storage.
    """

    for entry in dirlisting:
        if entry['type'] == 'file':
            path = Path(entry['path'])
            entry['blob_pk'] = models.Blob.create_from_file(path).pk
            if unlink:
                os.unlink(path)
            del entry['path']
        else:
            create_blobs(entry['children'], unlink=unlink)
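
create_blobs mutates the listing in place: each file entry loses its 'path' key and gains a 'blob_pk'. A minimal sketch of the transformation, reusing the listing from the archive_walk example above (saving blobs requires a configured collection; the pk below is made up):

listing = list(archive_walk(tmp))   # tmp from the archive_walk sketch above
create_blobs(listing)               # files are saved to blob storage, then unlinked
# a file entry changes shape from:
#   {'type': 'file', 'name': 'a.txt', 'path': '<tmp>/docs/a.txt'}
# to:
#   {'type': 'file', 'name': 'a.txt', 'blob_pk': '9f8e7d...'}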

get_table_info(table_path, mime_type, mime_encoding) #

Returns a dict with the table's sheets, column names, row and column counts, pyexcel file type, and extra reader arguments.

Source code in snoop/data/analyzers/archives.py
def get_table_info(table_path, mime_type, mime_encoding):
    """Returns a dict with table sheets, column names, row and column counts, pyexcel type, extra arguments.
    """

    pyexcel_filetype = TABLE_MIME_TYPE_OPERATOR_TABLE[mime_type]
    TEXT_FILETYPES = ['csv', 'tsv', 'html']
    dialect = None
    extra_kw = dict()

    rv = {
        'sheets': [],
        'sheet-columns': {},
        'sheet-row-count': {},
        'sheet-col-count': {},
        'extra-kw': {},
        'text-mode': False,
        'pyexcel-filetype': pyexcel_filetype,
    }

    f1 = None
    f2 = None
    try:
        if pyexcel_filetype in TEXT_FILETYPES:
            f1 = open(table_path, 'rt', encoding=mime_encoding)
            f2 = open(table_path, 'rt', encoding=mime_encoding)
            rv['text-mode'] = True

            if pyexcel_filetype in ['csv', 'tsv']:
                dialect = guess_csv_settings(f2, mime_encoding)
                f2.seek(0)
                if dialect:
                    extra_kw['delimiter'] = dialect.delimiter
                    extra_kw['lineterminator'] = dialect.lineterminator
                    extra_kw['escapechar'] = dialect.escapechar
                    extra_kw['quotechar'] = dialect.quotechar
                    extra_kw['quoting'] = dialect.quoting
                    extra_kw['skipinitialspace'] = dialect.skipinitialspace

                    extra_kw['encoding'] = mime_encoding
                    rv['extra-kw'] = extra_kw
        else:
            f1 = open(table_path, 'rb')
            f2 = open(table_path, 'rb')
        sheets = list(
            pyexcel.iget_book(
                file_stream=f1,
                file_type=pyexcel_filetype,
                auto_detect_float=False,
                auto_detect_int=False,
                auto_detect_datetime=False,
                # skip_hidden_sheets=False,
            )
        )
        for sheet in sheets:
            # get rows and count them
            rows = pyexcel.iget_array(
                file_stream=f2,
                file_type=pyexcel_filetype,
                sheet_name=sheet.name,
                auto_detect_float=False,
                auto_detect_int=False,
                auto_detect_datetime=False,
                **extra_kw,
            )
            try:
                row_count = _get_row_count(rows)
                f2.seek(0)
            except Exception as e:
                log.error('cannot determine row count for table "%s"!', table_path)
                log.exception(e)
                return None

            rows = pyexcel.iget_array(
                file_stream=f2,
                file_type=pyexcel_filetype,
                sheet_name=sheet.name,
                auto_detect_float=False,
                auto_detect_int=False,
                auto_detect_datetime=False,
                **extra_kw,
            )
            try:
                row = list(next(rows))
                col_count = len(row)
            except StopIteration:
                row = []
                col_count = 0
                log.warning('no rows found for doc "%s"', table_path)
            colnames = sheet.colnames or collections.current().default_table_head_by_len.get(col_count) or []
            rv['sheets'].append(sheet.name)
            rv['sheet-columns'][sheet.name] = colnames
            rv['sheet-row-count'][sheet.name] = row_count
            rv['sheet-col-count'][sheet.name] = col_count
    finally:
        if f1:
            f1.close()
        if f2:
            f2.close()
        pyexcel.free_resources()

    return rv
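
For a hypothetical two-column CSV, the returned dict might look like this (sheet naming depends on pyexcel, and all values are illustrative):

info = get_table_info('/tmp/example.csv', 'text/csv', 'utf-8')
# info resembles:
# {'sheets': ['csv'],
#  'sheet-columns': {'csv': []},
#  'sheet-row-count': {'csv': 1000},
#  'sheet-col-count': {'csv': 2},
#  'extra-kw': {'delimiter': ',', 'quotechar': '"', 'encoding': 'utf-8', ...},
#  'text-mode': True,
#  'pyexcel-filetype': 'csv'}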

guess_csv_settings(file_stream, mime_encoding) #

Returns a csv.Dialect object if the file begins with CSV content, or None otherwise.

Source code in snoop/data/analyzers/archives.py
def guess_csv_settings(file_stream, mime_encoding):
    """Returns the csv.Dialect object if file contains start of CSV, or None otherwise."""

    GUESS_READ_LEN = 8192
    text = file_stream.read(GUESS_READ_LEN)
    if isinstance(text, bytes):
        mime_encoding = mime_encoding or 'latin-1'
        if mime_encoding.startswith('unknown'):
            mime_encoding = 'latin-1'

        text = text.decode(mime_encoding, errors='backslashreplace')
    try:
        return csv.Sniffer().sniff(text, CSV_DELIMITER_LIST)
    except csv.Error:
        return None
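
A usage sketch with in-memory streams; this assumes ',' is included in the module's CSV_DELIMITER_LIST:

import io

dialect = guess_csv_settings(io.StringIO('a,b,c\n1,2,3\n'), 'utf-8')
dialect.delimiter  # ','

guess_csv_settings(io.StringIO('just-one-word\n'), 'utf-8')  # None: no known delimiter found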

is_archive(blob) #

Checks if the mime type is a known archive or table type.

Source code in snoop/data/analyzers/archives.py
def is_archive(blob):
    """Checks if mime type is a known archive or table type.
    """

    return (
        is_table(blob)
        or (blob.mime_type in ARCHIVES_MIME_TYPES)
        or can_unpack_with_7z(blob)
    )

is_table(blob) #

Check if the blob is a table type.

LibMagic can't detect CSV and TSV, so we use Python's csv.Sniffer (via guess_csv_settings) to check, and overwrite the mime type if we find a match.

Source code in snoop/data/analyzers/archives.py
def is_table(blob):
    """Check if blob is table type.

    LibMagic can't detect CSV and TSV, so we use the python sniff module to check, and overwrite mime type
    if we found a match.
    """
    if blob.mime_type == 'text/plain':
        with blob.open() as f:
            dialect = guess_csv_settings(f, blob.mime_encoding)
        if not dialect:
            return False
        csv_delim = dialect.delimiter
        if csv_delim == '\t':
            blob.mime_type = 'text/tab-separated-values'
        else:
            blob.mime_type = 'text/csv'
        blob.save()
        return True
    return blob.mime_type in TABLE_MIME_TYPES

unarchive(blob) #

Task that extracts the children of an archive (or archive-like) file.

Runs on archives, email archives and any other file types that can contain another file (such as documents that embed images).

Source code in snoop/data/analyzers/archives.py
@snoop_task('archives.unarchive', version=4, queue='filesystem')
@returns_json_blob
def unarchive(blob):
    """Task to extract from an archive (or archive-looking) file its children.

    Runs on archives, email archives and any other file types that can contain another file (such as
    documents that embed images).
    """
    unpack_func = None
    if blob.mime_type in TABLE_MIME_TYPES:
        if not collections.current().unpack_tables_enabled:
            log.warning('unpacking tables disabled')
            return None
        unpack_func = unpack_table
    elif blob.mime_type in READPST_MIME_TYPES:
        unpack_func = call_readpst
    elif blob.mime_type in MBOX_MIME_TYPES:
        unpack_func = unpack_mbox
    elif blob.mime_type in PDF_MIME_TYPES:
        if not collections.current().unpack_pdf_enabled:
            log.warning('unpacking pdf disabled')
            return None
        unpack_func = unpack_pdf
    else:
        if can_unpack_with_7z(blob):
            return unarchive_7z(blob)
        else:
            log.warning('unarchive: unknown mime type')
            return None

    with blob.mount_path() as blob_path:
        with collections.current().mount_blobs_root(readonly=False) as blobs_root:
            base = Path(blobs_root) / 'tmp' / 'archives'
            base.mkdir(parents=True, exist_ok=True)
            with tempfile.TemporaryDirectory(prefix=blob.pk, dir=base) as temp_dir:
                t0 = time.time()
                log.info('starting unpack...')
                os.chdir(temp_dir)
                unpack_func(blob_path, temp_dir,
                            mime_type=blob.mime_type,
                            mime_encoding=blob.mime_encoding)
                log.info('unpack done in: %s seconds', time.time() - t0)
                if not os.listdir(temp_dir):
                    log.warning('extraction resulted in no files. exiting...')
                    return None

                t0 = time.time()
                log.info('starting archive listing...')
                listing = list(archive_walk(Path(temp_dir)))
                log.info('archive listing done in: %s seconds', time.time() - t0)

                t0 = time.time()
                log.info('creating archive blobs...')
                create_blobs(listing)
                log.info('create archive blobs done in: %s seconds', time.time() - t0)

    t0 = time.time()
    log.info('checking recursion archive blobs...')
    check_recursion(listing, blob.pk)
    log.info('check recursion done in: %s seconds', time.time() - t0)
    return listing

unarchive_7z(blob) #

Attempt to unarchive using a fuse mount; if that fails, just unpack the whole file.

Source code in snoop/data/analyzers/archives.py
def unarchive_7z(blob):
    """Attempt to unarchive using fuse mount; if that fails, just unpack whole file."""

    # for mounting, change the PWD into a temp dir, because
    # the fuse-7z mounting library sometimes enjoys a
    # large core dump on the PWD.
    with tempfile.TemporaryDirectory(prefix='unarchive-7z-pwd-') as pwd:
        os.chdir(pwd)
        return unarchive_7z_impl(blob)

unarchive_7z_impl(blob) #

Old method of unpacking archives: simply calling 7z on them.

Source code in snoop/data/analyzers/archives.py
def unarchive_7z_impl(blob):
    """Old method of unpacking archives: simply calling 7z on them."""

    with blob.mount_path() as blob_path:
        with collections.current().mount_blobs_root(readonly=False) as blobs_root:
            base = Path(blobs_root) / 'tmp' / 'archives'
            base.mkdir(parents=True, exist_ok=True)
            with tempfile.TemporaryDirectory(prefix=blob.pk, dir=base) as temp_dir:
                log.info('extracting %s into: %s', blob_path, temp_dir)
                unpack_7z(blob_path, temp_dir)

                listing = list(archive_walk(Path(temp_dir)))
                log.info('obtained listing with %s items.', len(listing))
                create_blobs(listing)
                log.info('created all blobs.')

    check_recursion(listing, blob.pk)
    return listing

unpack_7z(archive_path, output_dir) #

Helper function that calls a 7z process.

Source code in snoop/data/analyzers/archives.py
def unpack_7z(archive_path, output_dir):
    """Helper function that calls a `7z` process."""

    # the -mmt switch enables multithreading. But for decompression this only affects
    # bzip2 files: see https://sevenzip.osdn.jp/chm/cmdline/switches/method.htm

    # first try new 7z version (7zz)
    try:
        subprocess.check_output([
            '7zz',
            '-y',
            '-pp',
            f'-mmt={settings.UNARCHIVE_THREADS}',
            'x',
            str(archive_path),
            '-o' + str(output_dir),
        ], stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError:
        log.info('7zz extraction failed. Trying older 7z version')
        try:
            # if it fails try the older version
            subprocess.check_output([
                '7z',
                '-y',
                '-pp',
                'x',
                str(archive_path),
                '-o' + str(output_dir),
            ], stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError:
            raise SnoopTaskBroken("7z extraction failed", '7z_error')

unpack_mbox(mbox_path, output_dir, **kw) #

Split an MBOX file into individual emails.

Source code in snoop/data/analyzers/archives.py
def unpack_mbox(mbox_path, output_dir, **kw):
    """Split a MBOX into emails."""

    mbox_object = mailbox.mbox(mbox_path)

    for key in mbox_object.keys():
        message_bytes = mbox_object.get_bytes(key)
        hash = sha1(str(message_bytes).encode('utf-8')).hexdigest()
        eml_path = Path(output_dir) / hash[:2] / '{}.eml'.format(hash)
        eml_path.parent.mkdir(parents=True, exist_ok=True)
        with eml_path.open('wb') as f:
            f.write(message_bytes)
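
Each message lands under a two-character prefix of its SHA-1 hash; a sketch of the output layout with hypothetical paths and made-up hashes:

unpack_mbox('/data/inbox.mbox', '/tmp/out')
# /tmp/out/
#   3a/3a7bd3e2...c1.eml
#   9c/9c1ee4a0...7f.eml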

unpack_pdf(pdf_path, output_dir, **kw) #

Extract images from a PDF by calling pdfimages.

Source code in snoop/data/analyzers/archives.py
def unpack_pdf(pdf_path, output_dir, **kw):
    """Extract images from pdf by calling `pdfimages`."""

    try:
        subprocess.check_call(
            [
                'pdfimages',
                str(pdf_path),
                # '-all',  # only output common image types
                '-j', '-png', '-ccitt',
                '-p',
                'page',
            ],
            stderr=subprocess.STDOUT,
            cwd=output_dir,
        )

        # As per pdfimages help text, use
        # fax2tiff to re-create the tiff files from .ccitt, then get pngs from tiff.
        # Using its own tiff convertor outputs images with reversed color.
        fax = (Path(output_dir) / 'fax.tif')
        for ccitt in Path(output_dir).glob('*.ccitt'):
            params = ccitt.with_suffix('.params')
            png = ccitt.with_suffix('.png')

            with params.open('r') as f:
                params_text = f.read().strip()
            subprocess.check_call(f'fax2tiff {str(ccitt)} {params_text}',
                                  cwd=output_dir, shell=True)
            ccitt.unlink()
            params.unlink()

            subprocess.check_call(f'convert {str(fax)} {str(png)}',
                                  shell=True, cwd=output_dir)
            fax.unlink()

    except subprocess.CalledProcessError:
        raise SnoopTaskBroken("pdfimages extraction failed", 'pdfimages_error')

unpack_table(table_path, output_path, mime_type = None, mime_encoding = None, **kw) #

Unpack a table (CSV, Excel, etc.) into text files, one for each row.

Source code in snoop/data/analyzers/archives.py
def unpack_table(table_path, output_path, mime_type=None, mime_encoding=None, **kw):
    """Unpack table (csv, excel, etc.) into text files, one for each row."""

    output_path = Path(output_path)
    assert mime_type is not None
    pyexcel_filetype = TABLE_MIME_TYPE_OPERATOR_TABLE[mime_type]
    TEXT_FILETYPES = ['csv', 'tsv', 'html']
    dialect = None
    extra_kw = dict()
    f1 = None
    f2 = None
    try:
        if pyexcel_filetype in TEXT_FILETYPES:
            f1 = open(table_path, 'rt', encoding=mime_encoding)
            f2 = open(table_path, 'rt', encoding=mime_encoding)

            if pyexcel_filetype in ['csv', 'tsv']:
                dialect = guess_csv_settings(f2, mime_encoding)
                f2.seek(0)
                if dialect:
                    extra_kw['delimiter'] = dialect.delimiter
                    extra_kw['lineterminator'] = dialect.lineterminator
                    extra_kw['escapechar'] = dialect.escapechar
                    extra_kw['quotechar'] = dialect.quotechar
                    extra_kw['quoting'] = dialect.quoting
                    extra_kw['skipinitialspace'] = dialect.skipinitialspace

                    extra_kw['encoding'] = mime_encoding
        else:
            f1 = open(table_path, 'rb')
            f2 = open(table_path, 'rb')
        sheets = list(
            pyexcel.iget_book(
                file_stream=f1,
                file_type=pyexcel_filetype,
                auto_detect_float=False,
                auto_detect_int=False,
                auto_detect_datetime=False,
                # skip_hidden_sheets=False,
            )
        )
        for sheet in sheets:
            if sheet.name:
                sheet_output_path = output_path / sheet.name
            else:
                sheet_output_path = output_path
            # get rows and count them
            rows = pyexcel.iget_array(
                file_stream=f2,
                file_type=pyexcel_filetype,
                sheet_name=sheet.name,
                auto_detect_float=False,
                auto_detect_int=False,
                auto_detect_datetime=False,
                **extra_kw,
            )
            try:
                row_count = _get_row_count(rows)
                f2.seek(0)
            except Exception as e:
                log.error('cannot determine row count for table "%s"!', table_path)
                log.exception(e)
                raise SnoopTaskBroken('table read error', 'table_error')

            # split large tables, so our in-memory archive crawler doesn't crash.
            # only do the split for sizes bigger than 1.5X the limit, so we avoid
            # splitting relatively small tables.
            # Or do it if the table type isn't CSV or TSV.
            if row_count > int(1.5 * settings.TABLES_SPLIT_FILE_ROW_COUNT) \
                    or pyexcel_filetype not in TEXT_FILETYPES:
                log.info('splitting sheet "%s" with %s rows into pieces...', sheet.name, row_count)
                os.makedirs(str(sheet_output_path), exist_ok=True)

                for i in range(0, row_count, settings.TABLES_SPLIT_FILE_ROW_COUNT):
                    start_row = i
                    row_limit = settings.TABLES_SPLIT_FILE_ROW_COUNT
                    end_row = min(start_row + row_limit, row_count)
                    split_file_path = str(sheet_output_path / f'split-rows-{start_row}-{end_row}.csv')
                    log.info('writing file %s', split_file_path)
                    dest_mime_encoding = mime_encoding
                    if not dest_mime_encoding or dest_mime_encoding == 'binary':
                        dest_mime_encoding = 'utf-8'
                    pyexcel.isave_as(
                        file_stream=f2,
                        file_type=pyexcel_filetype,
                        sheet_name=sheet.name,
                        auto_detect_float=False,
                        auto_detect_int=False,
                        auto_detect_datetime=False,
                        start_row=start_row,
                        row_limit=row_limit,
                        dest_file_name=split_file_path,
                        dest_delimiter=':',
                        dest_encoding=dest_mime_encoding,
                        **extra_kw,
                    )
                    f2.seek(0)
                return

            # get row iterator again, now to read rows and explode them
            if collections.current().explode_table_rows:
                log.info('exploding rows from table...')
                os.makedirs(str(sheet_output_path), exist_ok=True)

                rows = pyexcel.iget_array(
                    file_stream=f2,
                    file_type=pyexcel_filetype,
                    sheet_name=sheet.name,
                    auto_detect_float=False,
                    auto_detect_int=False,
                    auto_detect_datetime=False,
                    **extra_kw,
                )
                for i, row in enumerate(rows):
                    row = list(row)
                    colnames = sheet.colnames or \
                        collections.current().default_table_head_by_len.get(len(row))
                    _do_explode_row(
                        i, row, sheet_output_path,
                        sheet_name=sheet.name, colnames=colnames,
                        mime_encoding=mime_encoding,
                    )
    finally:
        if f1:
            f1.close()
        if f2:
            f2.close()
        pyexcel.free_resources()
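
A usage sketch for a hypothetical large CSV. Assuming settings.TABLES_SPLIT_FILE_ROW_COUNT is 10000 and the file has 25000 rows, the sheet is split into the files below (possibly inside a per-sheet subdirectory, depending on pyexcel's sheet naming):

unpack_table('/data/big.csv', '/tmp/out',
             mime_type='text/csv', mime_encoding='utf-8')
# /tmp/out (or /tmp/out/<sheet-name>) then holds:
#   split-rows-0-10000.csv
#   split-rows-10000-20000.csv
#   split-rows-20000-25000.csv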