snoop.data.s3 #

Module for managing S3-fuse mounts.

A limited number of mounts can be active at one time.

To decide which mounts are kept, we use a Least Recently Used (LRU) caching strategy.
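
As an illustration only (a minimal sketch; the real bookkeeping in this module also stores PIDs and target paths, and the limit comes from settings.SNOOP_S3FS_MOUNT_LIMIT), evicting by least-recent timestamp looks like this:

import time

# hypothetical mount table: each entry records when the mount was last requested
mounts = {
    'bucket-a': {'timestamp': time.time() - 300},
    'bucket-b': {'timestamp': time.time() - 10},
    'bucket-c': {'timestamp': time.time()},
}
MOUNT_LIMIT = 2  # hypothetical limit

if len(mounts) > MOUNT_LIMIT:
    oldest_first = sorted(mounts, key=lambda name: mounts[name]['timestamp'])
    for name in oldest_first[:len(mounts) - MOUNT_LIMIT]:
        print('would unmount:', name)  # the module calls umount_s3fs() here
        del mounts[name]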

Functions #

adjust_s3_mounts(mount_name, old_info, bucket, mount_mode, password_file_path, address, target_path, logfile_path) #

Implement a Least Recently Used (LRU) cache for the S3 mounts.

  • check all mount PIDs: if any process is dead, remove it from the list
  • if the mount exists in the list, update its timestamp and return
  • if the mount doesn't exist, then:
      • create a new mount and save its PID
      • create a new entry with the PID and timestamp
      • if above the mount limit, send signals to unmount and stop the oldest mounts
Source code in snoop/data/s3.py
def adjust_s3_mounts(mount_name, old_info,
                     bucket, mount_mode,
                     password_file_path, address, target_path, logfile_path):
    """Implement Least Recently used cache for the S3 mounts.

    - check all mount PIDs: if any process is dead, remove from list
    - if mount exists in list, update timestamp, return
    - if mount doesn't exist, then:
      - create new mount, save PID
      - create new entry with pid and timestamp
      - if above mount limit, then send signals to unmount and stop
    """

    def _clear_dead(info):
        # check all mount PIDs: if any process is dead, remove from list
        pids_alive = {p.pid for p in psutil.process_iter()}
        info = dict(info)
        for key, value in list(info.items()):
            if value['pid'] not in pids_alive:
                logger.info('old mount dead, key=%s', key)
                del info[key]
        return info

    # need to clear dead before checking, in case something died by itself
    new_info = _clear_dead(old_info)

    # if mount exists in list, update timestamp, return
    if mount_name in new_info:
        logger.debug('found old mount still alive: %s', mount_name)
        new_info[mount_name]['timestamp'] = timestamp()
        return new_info

    # create new mount
    logger.info('creating new mount: %s', mount_name)
    clean_makedirs(target_path)
    pid = mount_s3fs(bucket, mount_mode, password_file_path, address, target_path, logfile_path)

    # create new entry with pid and timestamp
    new_info[mount_name] = {
        'pid': pid, 'timestamp': timestamp(),
        'target': target_path,
    }

    # if above mount limit, then send signals to unmount and stop
    if len(new_info) > settings.SNOOP_S3FS_MOUNT_LIMIT:
        count_mounts_to_remove = len(new_info) - settings.SNOOP_S3FS_MOUNT_LIMIT
        mounts_sorted_by_timestamp = sorted(list(new_info.keys()), key=lambda x: new_info[x]['timestamp'])
        mounts_to_stop = mounts_sorted_by_timestamp[:count_mounts_to_remove]
        for _ in range(2):
            for mount in mounts_to_stop:
                pid = new_info[mount]['pid']
                target = new_info[mount]['target']
                logger.info('removing old mount: pid=%s target=%s', pid, target)

                try:
                    umount_s3fs(target, pid)
                except Exception as e:
                    logger.exception('failed to run "umount_s3fs" for target=%s (%s)', target, e)
            time.sleep(0.001)

        # drop entries for the mounts that were stopped above
        new_info = _clear_dead(new_info)

    return new_info
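
A hedged usage sketch (all argument values below are hypothetical; in the real module this function is called by get_mount() while holding the lock on mount-info.json, and it will actually spawn an s3fs process):

old_info = {}  # normally parsed from mount-info.json
new_info = adjust_s3_mounts(
    'collection-1', old_info,
    'my-bucket', 'ro',
    '/tmp/mounts/collection-1/password-file',
    'minio:9000',
    '/tmp/mounts/collection-1/target',
    '/tmp/mounts/collection-1/mount-log.txt',
)
# new_info maps mount names to {'pid': ..., 'timestamp': ..., 'target': ...}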

clean_makedirs(path) #

Helper function that works like os.makedirs(path, exist_ok=True), but also takes care to remove any file that might be at the path instead of a folder.

Source code in snoop/data/s3.py
def clean_makedirs(path):
    """Helper function that works like `os.makedirs(path, exist_ok=True)`,
    but also takes care to remove any file that might be at the path instead of a folder.
    """
    RETRIES = 3
    SLEEP = 0.05
    for retry in range(RETRIES):
        try:
            if os.path.isdir(path):
                return

            # try first makedirs, to get the parents
            try:
                os.makedirs(path, exist_ok=True)
                return
            except OSError:
                pass

            # if it's a normal file, remove that
            if os.path.exists(path) and not os.path.isdir(path):
                try:
                    os.remove(path)
                except Exception as e:
                    logger.exception(e)

            if os.path.exists(path):
                logger.error('os.remove() did not remove the file!')

            try:
                os.makedirs(path, exist_ok=True)
                return
            except OSError:
                assert os.path.exists(path), "dir not created after second os.makedirs()!"

                if not os.path.isdir(path):
                    os.makedirs(path, exist_ok=True)
                    assert os.path.isdir(path), \
                        "dir not created after removing file and running os.makedirs()!"
        except Exception as e:
            logger.warning('retrying clean_makedirs() %s/%s %s', retry, RETRIES, str(e))
            time.sleep(SLEEP)
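
For illustration, a small self-contained scenario this helper is designed to survive (the path is hypothetical):

import os
from snoop.data.s3 import clean_makedirs

path = '/tmp/demo/target'   # hypothetical path
os.makedirs('/tmp/demo', exist_ok=True)
open(path, 'w').close()     # a stray file sits where the directory should go
clean_makedirs(path)        # removes the file, then creates the directory
assert os.path.isdir(path)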

clear_mounts() #

Unmount all S3 volumes and clear out the metadata and logs. Used when the Celery worker process restarts.

Source code in snoop/data/s3.py
def clear_mounts():
    """Unmount all S3 volumes and clear out the metadata and logs.
    Used when Celery process restarts."""

    worker_base_path = _get_worker_base_path()
    if not os.path.isdir(worker_base_path):
        return

    mount_info_path = os.path.join(worker_base_path, 'mount-info.json')
    mount_info = {}
    if os.path.isfile(mount_info_path):
        with open_exclusive(mount_info_path, 'a+') as f:
            f.seek(0)
            info_str = f.read()

            logger.debug('read mount info: %s', info_str)
            if info_str:
                try:
                    mount_info = json.loads(info_str)
                except Exception as e:
                    logger.debug('clear mounts info corrupted: %s', e)
        for mount_value in mount_info.values():
            umount_s3fs(mount_value['target'], mount_value['pid'])
        os.remove(mount_info_path)
    # in case the json is corrupted, use `find` to try to unmount all targets
    targets = subprocess.check_output(
        (
            f'find "{worker_base_path}" '
            ' -mindepth 1 -maxdepth 3 -xdev -type d -name target'
        ),
        shell=True,
    ).decode('ascii').strip().splitlines()
    for target in targets:
        target = target.strip()
        if target:
            try:
                umount_s3fs(target)
            except Exception as e:
                logger.warning('could not umount s3fs! %s', str(e))
                raise
    if not os.path.isdir(worker_base_path):
        return
    # keep using `find -xdev` to avoid deleting stuff inside mounts.
    subprocess.call(
        (
            f'find "{worker_base_path}" '
            ' -xdev -type f -delete'
        ),
        shell=True,
    )
    subprocess.call(
        (
            f'find "{worker_base_path}" '
            ' -xdev -type d -empty -delete'
        ),
        shell=True,
    )
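
One plausible way to trigger this on worker start (a sketch assuming a Celery signal hook; the actual wiring lives elsewhere in snoop):

from celery.signals import worker_process_init

from snoop.data import s3

@worker_process_init.connect
def cleanup_stale_mounts(**kwargs):
    # drop any s3fs mounts left over from a previous worker process
    s3.clear_mounts()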

get_mount(mount_name, bucket, mount_mode, access_key, secret_key, address) #

Ensure the requested s3fs volume is mounted, unmounting the least recently used mounts if over the limit.

Source code in snoop/data/s3.py
def get_mount(mount_name, bucket, mount_mode, access_key, secret_key, address):
    """Ensure requested S3fs is mounted, while also
    unmounting least recently used mounts over the limit."""

    worker_base_path = _get_worker_base_path()
    mount_info_path = os.path.join(worker_base_path, 'mount-info.json')
    base_path = os.path.join(worker_base_path, mount_name)
    target_path = os.path.join(base_path, 'target')
    logfile_path = os.path.join(base_path, 'mount-log.txt')
    password_file_path = os.path.join(base_path, 'password-file')

    clean_makedirs(base_path)

    # write the s3fs password file with owner-only permissions
    with open(password_file_path, 'wb') as pass_temp:
        subprocess.check_call(f'chmod 600 "{password_file_path}"', shell=True)
        pass_str = (access_key + ':' + secret_key).encode('latin-1')
        pass_temp.write(pass_str)

    with open_exclusive(mount_info_path, 'a+') as f:
        f.seek(0)
        old_info_str = f.read()

        logger.debug('read mount info: %s', old_info_str)
        if old_info_str:
            try:
                old_info = json.loads(old_info_str)
            except Exception as e:
                logger.warning('old mount info corrupted: %s', e)
                old_info = dict()
        else:
            old_info = dict()

        t0 = time.time()
        new_info = adjust_s3_mounts(
            mount_name, old_info,
            bucket, mount_mode, password_file_path, address, target_path, logfile_path
        )

        f.seek(0)
        f.truncate()
        json.dump(new_info, f)

        # wait until the mount has come up before returning - up to ~70s in total
        for retry in range(60):
            if target_is_mounted(target_path):
                dt = round(time.time() - t0, 3)
                logger.debug('mount %s working after %s sec', target_path, dt)
                break
            time.sleep(0.01 + 0.04 * retry)
        else:
            dt = round(time.time() - t0, 3)
            raise RuntimeError(f's3 mount did not start for {target_path} after {dt} sec!')

    return target_path
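
A hedged example of calling it (credentials, names, and paths are placeholders):

import os
from snoop.data.s3 import get_mount

target = get_mount(
    mount_name='collection-1',
    bucket='my-bucket',
    mount_mode='ro',
    access_key='AKIA...',    # placeholder
    secret_key='secret',     # placeholder
    address='minio:9000',
)
# the bucket contents are now visible under the returned path
with open(os.path.join(target, 'some-file.txt'), 'rb') as f:  # hypothetical file
    data = f.read()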

mount_s3fs(bucket, mount_mode, password_file, address, target, logfile) #

Run a subprocess to mount the s3fs disk at target. Returns the PID of the background s3fs process; the mount may not be ready yet when this returns (get_mount polls target_is_mounted for that).

Source code in snoop/data/s3.py
def mount_s3fs(bucket, mount_mode, password_file, address, target, logfile):
    """Run subprocess to mount s3fs disk to target. Will wait until completed."""

    # unused options:
    #    -o use_cache={cache} \\
    #    -o multipart_copy_size=32 \\
    # don't use cache -- it downloads whole file when requested, which
    # does not work on very large archives (would need same amount of temp space)

    cmd_bash = f"""
    s3fs \\
        -f \\
        -o {mount_mode} \\
        -o allow_other \\
        -o max_dirty_data=64 \\
        -o passwd_file={password_file}  \\
        -o use_path_request_style  \\
        -o url=http://{address} \\
        {bucket} {target} > {logfile} \\
        2>&1 & echo $!
    """
    logger.info('running s3fs process: %s', cmd_bash)
    tracer.count("mount_s3fs_start")
    output = subprocess.check_output(cmd_bash, shell=True)
    pid = int(output)
    logger.info('s3fs process started with pid %s', pid)
    return pid
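
The trailing "& echo $!" backgrounds s3fs and prints its PID, which check_output captures. An equivalent approach without the shell round-trip would be subprocess.Popen (a sketch, not what the module does; arguments are hypothetical):

import subprocess

log = open('/tmp/mount-log.txt', 'wb')  # hypothetical log path
proc = subprocess.Popen(
    ['s3fs', '-f', '-o', 'ro', 'my-bucket', '/tmp/target'],  # hypothetical arguments
    stdout=log, stderr=subprocess.STDOUT,
)
pid = proc.pid  # no shell `echo $!` needed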

target_is_mounted(path) #

Returns True if the path is a Linux mount point.

Source code in snoop/data/s3.py
def target_is_mounted(path):
    """Returns True if the path is a linux mount point"""
    return 0 == subprocess.call(
        'findmnt ' + str(path),
        shell=True,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
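
The standard library offers os.path.ismount() for a similar check without spawning a process; its device-number heuristic and findmnt's mount-table lookup can disagree in edge cases (e.g. bind mounts), so this is an alternative sketch rather than a drop-in replacement:

import os

def target_is_mounted_stdlib(path):
    # alternative sketch using the standard library instead of findmnt
    return os.path.ismount(str(path))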

timestamp() #

Returns the current timestamp as a float, used by the mount LRU cache.

Source code in snoop/data/s3.py
def timestamp():
    """Returns current timestamp float for the mount LRU Cache."""

    return time.time()

umount_s3fs(target, pid = None) #

Run subprocess to umount s3fs disk from target. Will wait until completed.

Source code in snoop/data/s3.py
def umount_s3fs(target, pid=None):
    """Run subprocess to umount s3fs disk from target. Will wait until completed."""

    def _pid_alive():
        if pid:
            return pid in [p.pid for p in psutil.process_iter()]
        return False

    def _data_mounted():
        try:
            return bool(os.listdir(target))
        except Exception:
            return False

    subprocess.run(f"umount {target}", shell=True, check=False)
    subprocess.run(f"rmdir {target}", shell=True, check=False)
    if _pid_alive():
        subprocess.run(f"kill {pid}", shell=True, check=False)

    if not _pid_alive() and not _data_mounted() and not os.path.isdir(target):
        return

    for retry in range(10):
        subprocess.run(f"umount {target}", shell=True, check=False)
        subprocess.run(f"rmdir {target}", shell=True, check=False)

        if _pid_alive():
            # escalate: pause the s3fs process with SIGSTOP, then kill it outright
            try:
                os.kill(pid, signal.SIGSTOP)
            except Exception as e:
                logger.exception('failed to send SIGSTOP to mount, pid=%s (%s)', pid, e)

            try:
                os.kill(pid, signal.SIGKILL)
            except Exception as e:
                logger.exception('failed to send SIGKILL to mount, pid=%s (%s)', pid, e)

        if _data_mounted():
            # fall back to a lazy unmount if the normal one fails;
            # the trailing `|| true` keeps check_call from raising either way
            subprocess.check_call(f"""
                umount {target} || umount -l {target} || true;
            """, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

        if not _data_mounted():
            try:
                os.rmdir(target)
            except Exception as e:
                logger.warning('Failed to os.rmdir() the target directory %s (%s)', target, e)

        if not _pid_alive() and not _data_mounted() and not os.path.isdir(target):
            tracer.count("umount_s3fs_success")
            return

        time.sleep(0.05 + 0.05 * retry)

    tracer.count("umount_s3fs_failed")
    raise RuntimeError(f'cannot remove old S3 mounts! target={target}, pid={pid}')
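
Since the function raises after exhausting its retries, callers that must keep running wrap it in a try/except, as adjust_s3_mounts() does above; a minimal sketch (values are hypothetical):

import logging
from snoop.data.s3 import umount_s3fs

logger = logging.getLogger(__name__)

try:
    umount_s3fs('/tmp/mounts/collection-1/target', pid=1234)  # hypothetical values
except Exception:
    logger.exception('failed to unmount target; will retry on a later pass')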