Skip to content

Email #

Tasks that handle parsing e-mail.


dump_part(message, depends_on) #

Recursive function to extract text and attachments from multipart email.

For text/html multipart fragments we use Tika to extract the text.


Parameters:

Name Type Description Default

message: the multipart message. (required)

depends_on: dict with dependent functions; passed from the task function here to order the Tika processing (for text extraction) if needed. (required)

Source code in snoop/data/analyzers/
def dump_part(message, depends_on):
    """Recursive function to extract text and attachments from multipart email.

    For `text/html` multipart fragments we use Tika to extract the text.

        message: the multipart message.
        depends_on: dict with dependent functions; passed from the task function here to order the Tika
            processing (for text extraction) if needed.

    rv = {'headers': get_headers(message)}

    if message.is_multipart():
        rv['parts'] = [
            dump_part(part, depends_on)
            for part in message.get_payload()
        return rv

    content_type = message.get_content_type()

        payload_bytes = message.get_payload(decode=True)
    except:  # noqa: E722
        log.exception("Error getting email payload")
        raise SnoopTaskBroken("Error getting payload", "email_get_payload")

    if pgp.is_encrypted(payload_bytes):
        payload_bytes = pgp.decrypt(payload_bytes)
        rv['pgp'] = True

    if content_type == 'text/plain':
        result = chardet.detect(payload_bytes)
        charset = result.get('encoding') or 'latin1'
            rv['text'] = payload_bytes.decode(charset, errors='replace')
        except (UnicodeDecodeError, LookupError) as e:
            charset = 'latin1'
            rv['text'] = payload_bytes.decode(charset, errors='replace')

    if content_type in html.HTML_MIME_TYPES:
        with models.Blob.create() as writer:

        rmeta_blob = require_dependency(
            f'tika-html-{}', depends_on,
            lambda: tika.rmeta.laterz(writer.blob),

        if isinstance(rmeta_blob, models.Blob):
            with as f:
                rmeta_data = json.load(f)
            rv['text'] = rmeta_data[0].get('X-TIKA:content', "") if rmeta_data else ""
            log.warning('tika HTML for Email Text failed!')
        # Sometimes, even Tika leaves in some HTML tags...
        if rv['text']:
                rv['text'] = html.clean_str(rv['text'])
            except Exception as e:
                log.warning('HTML cleanup module failed: ' + str(e))

    if message.get_content_disposition():
        raw_filename = message.get_filename()
        if raw_filename:
            filename = read_header(raw_filename)

            with models.Blob.create() as writer:

            rv['attachment'] = {
                'name': filename,

    return rv

email_meta(email_data) #

Returns ready-to-index fields extracted from the emails.parse Task.

The fields include important email headers combined into fields, as well as a dump of all the email headers.

Source code in snoop/data/analyzers/
def email_meta(email_data):
    """Return ready-to-index fields built from the output of the emails.parse task.

    The fields include important email headers combined into fields, as well as a dump of all the email
    headers.

    Args:
        email_data: nested dict with a `headers` dict (header name -> list of values) and optional
            `text`, `pgp` and `parts` entries, where `parts` is a list of dicts of the same shape.
            A falsy value yields an empty result.

    Returns:
        dict of index fields. Every field whose value is empty or falsy (empty list, empty string,
        `None`, `False`) is removed from the result.
    """

    def walk(part_data):
        # Depth-first traversal: the part itself first, then each of its sub-parts.
        yield part_data
        for child in part_data.get('parts') or []:
            yield from walk(child)

    if not email_data:
        return {}

    headers = email_data['headers']

    # Collect the text of every part and remember whether any part was PGP-encrypted.
    text_bits = []
    has_pgp = False
    for part in walk(email_data):
        part_text = part.get('text')
        if part_text:
            text_bits.append(part_text)

        if part.get('pgp'):
            has_pgp = True

    # Index field -> email headers whose values are merged into that field.
    convert = {
        'to': ['To', 'Cc', 'Bcc', 'Resent-To', 'Resent-Cc', 'Resent-Bcc'],
        'to-direct': ['To', 'Resent-To'],
        'cc': ['Cc', 'Resent-Cc'],
        'bcc': ['Bcc', 'Resent-Bcc'],
        'from': ['From', 'Resent-From'],
        'message-id': ['Message-Id'],
        'thread-index': ['Thread-Index'],
        'in-reply-to': ['In-Reply-To', 'References', 'Original-Message-ID', 'Resent-Message-Id'],
    }

    ret = {}
    for target_field, source_headers in convert.items():
        all_values = []
        for header in source_headers:
            for val in headers.get(header, []):
                # A header value may span several lines; keep each distinct non-empty line once,
                # in order of first appearance.
                for line in val.strip().splitlines():
                    line = line.strip()
                    if line and line not in all_values:
                        all_values.append(line)

        ret[target_field] = all_values

    message_date = None
    message_raw_date = headers.get('Date', [None])[0]
    if message_raw_date:
        message_date = zulu(parse_date(message_raw_date))

    to_domains = [_extract_domain(to) for to in ret['to']]
    from_domains = [_extract_domain(f) for f in ret['from']]
    email_domains = list(set(to_domains + from_domains))

    ret.update({
        'email-domains': [d.lower() for d in email_domains if d],
        'subject': headers.get('Subject', [''])[0],
        'text': '\n\n'.join(text_bits).strip(),
        'pgp': has_pgp,
        'date': message_date,
        'email-header-key': list(set(headers)),
        # One "name=value" entry per header value; a flat comprehension avoids the quadratic
        # list concatenation of sum(..., start=[]).
        'email-header': [k + '=' + v for k in headers for v in headers[k]],
    })

    # delete empty values for all headers
    for k in list(ret.keys()):
        if not ret[k]:
            del ret[k]
    return ret

get_headers(message) #

Extract dict with headers from email message.

Source code in snoop/data/analyzers/
def get_headers(message):
    """Extract dict with headers from email message."""

    rv = defaultdict(list)

    for key in message.keys():
        for raw_header in message.get_all(key):

    return dict(rv)

iter_parts(message, numbers = []) #

Yields multipart messages into identifiable parts.

The numbers are the positions in each part of the tree.

Source code in snoop/data/analyzers/
def iter_parts(message, numbers=()):
    """Yields multipart messages into identifiable parts.

    The numbers are the positions in each part of the tree.

    Args:
        message: message object exposing `is_multipart()` and `get_payload()`.
        numbers: 1-based positions (as strings) of the ancestors of `message`; callers normally leave
            this at its default and let the recursion fill it in.

    Yields:
        `(position, part)` tuples for every non-multipart part, where `position` is the dotted path
        of 1-based indexes (e.g. `'2.1'`). A non-multipart top-level message yields position `''`.
    """
    if message.is_multipart():
        for n, part in enumerate(message.get_payload(), 1):
            yield from iter_parts(part, [*numbers, str(n)])
    else:
        # Only leaves are yielded; multipart containers are just descended into.
        yield '.'.join(numbers), message

lookup_other_encodings(name: str) -> CodecInfo #

Used to set ucs-2le as an alias of utf-16-le in the codec registry.

Used with codecs.register when importing this function.

Source code in snoop/data/analyzers/
def lookup_other_encodings(name: str) -> codecs.CodecInfo:
    """Used to set `ucs-2le` as an alias of `utf-16-le` in the codec registry.

    Meant to be passed to `codecs.register` as a search function.

    Since Python 3.9 the codec registry passes search functions a normalized encoding name (lower
    case, with hyphens and spaces converted to underscores), so the name is normalized here as well
    and both `ucs-2le` and `ucs_2le` are recognized.

    Args:
        name: the encoding name being looked up.

    Returns:
        The `CodecInfo` of `utf-16-le` when `name` is the `ucs-2le` alias, `None` for any other name.
    """
    normalized = name.lower().replace('-', '_').replace(' ', '_')
    if normalized == 'ucs_2le':
        return codecs.lookup('utf-16-le')
    return None

msg_to_eml(blob) #

Task to convert .msg emails into .eml.

Source code in snoop/data/analyzers/
def msg_to_eml(blob):
    """Task to convert `.msg` emails into `.eml`."""

    with tempfile.TemporaryDirectory() as temp_dir:
        msg_path = Path(temp_dir) / 'email.msg'
        eml_path = msg_path.with_suffix('.eml')

            with, need_seek=True) as f:
                    ['msgconvert', '--outfile', eml_path, msg_path],
        except subprocess.CalledProcessError as e:
            # This may as well be a non-permanent error, but we have no way to tell
            if e.output:
                output = e.output.decode('latin-1')
                output = "(no output)"
            raise SnoopTaskBroken('running msgconvert failed: ' + output,

        return models.Blob.create_from_file(eml_path)

parse(blob, **depends_on) #

Task function to parse emails into a dict with its structure.

Source code in snoop/data/analyzers/
@snoop_task('email.parse', version=3)
def parse(blob, **depends_on):
    """Task function to parse emails into a dict with its structure."""

    with as f:
        message_bytes =

    if message_bytes[:3] == BYTE_ORDER_MARK:
        message_bytes = message_bytes[3:]

    message = email.message_from_bytes(message_bytes)
    data = dump_part(message, depends_on)

    return data

parse_date(raw_date) #

Parse the date format inside emails, returning None if failed.

Source code in snoop/data/analyzers/
def parse_date(raw_date):
    """Parse the date format inside emails, returning `None` if failed.

    Args:
        raw_date: value of an email `Date` header.

    Returns:
        The `datetime` produced by `email.utils.parsedate_to_datetime`, or `None` when the value
        cannot be parsed (the failure is logged).
    """
    try:
        return email.utils.parsedate_to_datetime(raw_date)
    except (TypeError, ValueError) as e:
        # Unparsable input raises TypeError on older Python versions and ValueError since 3.10;
        # both mean "not a usable date".
        log.exception(f'error in parsing date: "{raw_date}"  {str(e)}')
        return None

read_header(raw_header) #

Parse multi-encoding header value.

Under RFC 822, headers can be encoded in more than one character encoding. This is needed to create header lines like Subject: トピック when you can't express Subject in the Japanese encoding. (In this documentation both are UTF-8, but in various datasets, older Windows Cyrillic encodings have this problem).

See email.header.make_header and email.header.decode_header.

Source code in snoop/data/analyzers/
def read_header(raw_header):
    """Parse multi-encoding header value.

    A single header value can contain several encoded words, each in its own character encoding.
    This is needed to create header lines like `Subject: トピック` when the header name and its value
    cannot be expressed in the same encoding.

    The value is decoded with `email.header.decode_header` and reassembled into one string with
    `email.header.make_header`.

    Args:
        raw_header: the header value as found in the message.

    Returns:
        The decoded header as `str`. If a chunk cannot be decoded (bad bytes for the declared
        charset, or a charset unknown to Python), `str(raw_header)` is returned unchanged.
    """
    try:
        return str(email.header.make_header(email.header.decode_header(raw_header)))
    except (UnicodeDecodeError, LookupError):
        # LookupError: the header declares a charset Python has no codec for.
        return str(raw_header)