S3
snoop.data.s3
#
Module for managing S3-fuse mounts.
A limited number of mounts can be used at one time.
To decide what mounts are kept, we use a Least Recently Used caching strategy.
Functions#
adjust_s3_mounts(mount_name, old_info, bucket, mount_mode, password_file_path, address, target_path, logfile_path)
#
Implement a Least Recently Used cache for the S3 mounts.
- check all mount PIDs: if any process is dead, remove from list
- if mount exists in list, update timestamp, return
- if mount doesn't exist, then:
- create new mount, save PID
- create new entry with pid and timestamp
- if above mount limit, then send signals to unmount and stop
Source code in snoop/data/s3.py
def adjust_s3_mounts(mount_name, old_info,
                     bucket, mount_mode,
                     password_file_path, address, target_path, logfile_path):
    """Implement a Least Recently Used cache for the S3 mounts.

    - check all mount PIDs: if any process is dead, remove from list
    - if mount exists in list, update timestamp, return
    - if mount doesn't exist, then:
        - create new mount, save PID
        - create new entry with pid and timestamp
        - if above mount limit, then send signals to unmount and stop

    Args:
        mount_name: unique key identifying this mount in the info dict.
        old_info: mount metadata dict loaded from the mount-info file.
        bucket, mount_mode, password_file_path, address, target_path, logfile_path:
            passed through to `mount_s3fs()` when a new mount is created.

    Returns:
        The updated mount info dict, for the caller to persist.
    """

    def _clear_dead(info):
        # check all mount PIDs: if any process is dead, remove from list
        pids_alive = {p.pid for p in psutil.process_iter()}
        info = dict(info)
        for key, value in list(info.items()):
            if value['pid'] not in pids_alive:
                logger.info('old mount dead, key=%s', key)
                del info[key]
        return info

    # need to clear dead before checking, in case something died by itself
    new_info = _clear_dead(old_info)

    # if mount exists in list, update timestamp, return
    if mount_name in new_info:
        logger.debug('found old mount still alive: %s', mount_name)
        new_info[mount_name]['timestamp'] = timestamp()
        return new_info

    # create new mount
    logger.info('creating new mount: %s', mount_name)
    clean_makedirs(target_path)
    pid = mount_s3fs(bucket, mount_mode, password_file_path, address, target_path, logfile_path)

    # create new entry with pid and timestamp
    new_info[mount_name] = {
        'pid': pid, 'timestamp': timestamp(),
        'target': target_path,
    }

    # if above mount limit, then send signals to unmount and stop
    if len(new_info) > settings.SNOOP_S3FS_MOUNT_LIMIT:
        count_mounts_to_remove = len(new_info) - settings.SNOOP_S3FS_MOUNT_LIMIT
        mounts_sorted_by_timestamp = sorted(list(new_info.keys()), key=lambda x: new_info[x]['timestamp'])
        mounts_to_stop = mounts_sorted_by_timestamp[:count_mounts_to_remove]
        for _ in range(2):
            for mount in mounts_to_stop:
                pid = new_info[mount]['pid']
                target = new_info[mount]['target']
                logger.info('removing old mount: pid=%s target=%s', pid, target)
                try:
                    umount_s3fs(target, pid)
                except Exception as e:
                    logger.exception('failed to run "umount_s3fs" for target=%s (%s)', target, e)
            time.sleep(0.001)
        # BUG FIX: prune dead entries from `new_info`, not `old_info`.
        # Rebuilding from `old_info` dropped the entry for the mount created
        # just above, leaking an untracked (still running) s3fs process.
        new_info = _clear_dead(new_info)
    return new_info
clean_makedirs(path)
#
Helper function that works like os.makedirs(path, exist_ok=True),
but also takes care to remove any file that might be at the path instead of a folder.
Source code in snoop/data/s3.py
def clean_makedirs(path):
    """Helper function that works like `os.makedirs(path, exist_ok=True)`,
    but also takes care to remove any file that might be at the path instead of a folder.
    """
    MAX_ATTEMPTS = 3
    ATTEMPT_SLEEP = 0.05
    for attempt in range(MAX_ATTEMPTS):
        try:
            # directory already present: done
            if os.path.isdir(path):
                return
            # first shot: create the directory (parents included)
            try:
                os.makedirs(path, exist_ok=True)
                return
            except OSError:
                pass
            # a regular file is squatting on the path: remove it
            if os.path.exists(path) and not os.path.isdir(path):
                try:
                    os.remove(path)
                except Exception as e:
                    logger.exception(e)
                    if os.path.exists(path):
                        logger.error('os.remove() did not remove the file!')
            # second shot, now that the blocking file should be gone
            try:
                os.makedirs(path, exist_ok=True)
                return
            except OSError:
                assert os.path.exists(path), "dir not created after second os.makedirs()!"
                if not os.path.isdir(path):
                    os.makedirs(path, exist_ok=True)
                assert os.path.isdir(path), \
                    "dir not created after removing file and running os.makedirs()!"
        except Exception as e:
            logger.warning('retrying clean_makedirs() %s/%s %s', attempt, MAX_ATTEMPTS, str(e))
            time.sleep(ATTEMPT_SLEEP)
clear_mounts()
#
Unmount all S3 volumes and clear out the metadata and logs. Used when Celery process restarts.
Source code in snoop/data/s3.py
def clear_mounts():
    """Unmount all S3 volumes and clear out the metadata and logs.

    Used when Celery process restarts. Best-effort: first unmounts every
    target recorded in the worker's `mount-info.json`, then falls back to a
    filesystem scan with `find` so mounts are removed even when the info
    file is missing or corrupted.
    """
    worker_base_path = _get_worker_base_path()
    # nothing to clean if this worker never created its base directory
    if not os.path.isdir(worker_base_path):
        return
    mount_info_path = os.path.join(worker_base_path, 'mount-info.json')
    mount_info = {}
    if os.path.isfile(mount_info_path):
        # hold the exclusive lock while reading/removing the info file,
        # so we don't race other processes using the same file
        with open_exclusive(mount_info_path, 'a+') as f:
            f.seek(0)  # 'a+' opens positioned at EOF; rewind before reading
            info_str = f.read()
            logger.debug('read mount info: %s', info_str)
            if info_str:
                try:
                    mount_info = json.loads(info_str)
                except Exception as e:
                    # corrupted JSON: ignore; the `find` fallback below still runs
                    logger.debug('clear mounts info corrupted: %s', e)
            for mount_value in mount_info.values():
                umount_s3fs(mount_value['target'], mount_value['pid'])
            os.remove(mount_info_path)
    # in case json is kaputt, let's use `find` to try to unmount all targets
    targets = subprocess.check_output(
        (
            f'find "{worker_base_path}" '
            ' -mindepth 1 -maxdepth 3 -xdev -type d -name target'
        ),
        shell=True,
    ).decode('ascii').strip().splitlines()
    for target in targets:
        target = target.strip()
        if target:
            try:
                umount_s3fs(target)
            except Exception as e:
                # log for visibility, but still propagate the failure
                logger.warning('could not umount s3fs! %s', str(e))
                raise
    if not os.path.isdir(worker_base_path):
        return
    # keep using `find -xdev` to avoid deleting stuff inside mounts.
    subprocess.call(
        (
            f'find "{worker_base_path}" '
            ' -xdev -type f -delete'
        ),
        shell=True,
    )
    subprocess.call(
        (
            f'find "{worker_base_path}" '
            ' -xdev -type d -empty -delete'
        ),
        shell=True,
    )
get_mount(mount_name, bucket, mount_mode, access_key, secret_key, address)
#
Ensure requested S3fs is mounted, while also unmounting least recently used mounts over the limit.
Source code in snoop/data/s3.py
def get_mount(mount_name, bucket, mount_mode, access_key, secret_key, address):
    """Ensure requested S3fs is mounted, while also
    unmounting least recently used mounts over the limit.

    Args:
        mount_name: unique key for this mount, used for paths and LRU bookkeeping.
        bucket: S3 bucket to mount.
        mount_mode: s3fs mount mode option (e.g. read-only/read-write flags).
        access_key, secret_key: S3 credentials, written to a 0600 password file.
        address: S3 endpoint host[:port], used as http:// URL.

    Returns:
        The path of the mounted target directory.

    Raises:
        RuntimeError: if the mount does not show up within ~60 seconds.
    """
    worker_base_path = _get_worker_base_path()
    mount_info_path = os.path.join(worker_base_path, 'mount-info.json')
    base_path = os.path.join(worker_base_path, mount_name)
    target_path = os.path.join(base_path, 'target')
    logfile_path = os.path.join(base_path, 'mount-log.txt')
    password_file_path = os.path.join(base_path, 'password-file')
    clean_makedirs(base_path)

    # write password file; restrict permissions before the secret is written.
    # FIX: use os.chmod() instead of `subprocess.check_call("chmod 600 ...",
    # shell=True)` -- no shell interpolation of the path, no extra process.
    with open(password_file_path, 'wb') as pass_temp:
        os.chmod(password_file_path, 0o600)
        pass_str = (access_key + ':' + secret_key).encode('latin-1')
        pass_temp.write(pass_str)

    with open_exclusive(mount_info_path, 'a+') as f:
        f.seek(0)  # 'a+' opens positioned at EOF; rewind before reading
        old_info_str = f.read()
        logger.debug('read mount info: %s', old_info_str)
        if old_info_str:
            try:
                old_info = json.loads(old_info_str)
            except Exception as e:
                logger.warning('old mount info corrupted: %s', e)
                old_info = dict()
        else:
            old_info = dict()

        t0 = time.time()
        new_info = adjust_s3_mounts(
            mount_name, old_info,
            bucket, mount_mode, password_file_path, address, target_path, logfile_path
        )

        # rewrite the whole info file with the adjusted state
        f.seek(0)
        f.truncate()
        json.dump(new_info, f)

        # wait until the mount was done correctly before returning - in total about 60s
        for retry in range(60):
            if target_is_mounted(target_path):
                dt = round(time.time() - t0, 3)
                # FIX: use the module-level `logger` (as everywhere else in
                # this module), not the root `logging` module.
                logger.debug('mount %s working after %s sec', target_path, dt)
                break
            time.sleep(0.01 + 0.04 * retry)
        else:
            dt = round(time.time() - t0, 3)
            raise RuntimeError(f's3 mount did not start for {target_path} after {dt} sec!')
    return target_path
mount_s3fs(bucket, mount_mode, password_file, address, target, logfile)
#
Run subprocess to mount s3fs disk to target. Will wait until completed.
Source code in snoop/data/s3.py
def mount_s3fs(bucket, mount_mode, password_file, address, target, logfile):
    """Run subprocess to mount s3fs disk to target. Will wait until completed."""
    # unused options:
    # -o use_cache={cache} \\
    # -o multipart_copy_size=32 \\
    # don't use cache -- it downloads whole file when requested, which
    # does not work on very large archives (would need same amount of temp space)
    command = f"""
    s3fs \\
        -f \\
        -o {mount_mode} \\
        -o allow_other \\
        -o max_dirty_data=64 \\
        -o passwd_file={password_file} \\
        -o use_path_request_style \\
        -o url=http://{address} \\
        {bucket} {target} > {logfile} \\
        2>&1 & echo $!
    """
    logger.info('running s3fs process: %s', command)
    tracer.count("mount_s3fs_start")
    # the trailing `echo $!` prints the backgrounded s3fs pid
    pid = int(subprocess.check_output(command, shell=True))
    logger.info('s3fs process started with pid %s', pid)
    return pid
target_is_mounted(path)
#
Returns True if the path is a linux mount point
Source code in snoop/data/s3.py
def target_is_mounted(path):
    """Returns True if the path is a linux mount point"""
    # findmnt exits 0 only when the path is a mount point
    exit_code = subprocess.call(
        'findmnt ' + str(path),
        shell=True,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    return exit_code == 0
timestamp()
#
Returns current timestamp float for the mount LRU Cache.
Source code in snoop/data/s3.py
def timestamp():
    """Returns current timestamp float for the mount LRU Cache."""
    now = time.time()
    return now
umount_s3fs(target, pid = None)
#
Run subprocess to umount s3fs disk from target. Will wait until completed.
Source code in snoop/data/s3.py
def umount_s3fs(target, pid=None):
    """Run subprocess to umount s3fs disk from target. Will wait until completed.

    Tries a polite `umount`/`rmdir` first; if the mount survives, retries
    with SIGSTOP/SIGKILL on the s3fs process and a lazy unmount, backing off
    between attempts.

    Args:
        target: mount point directory to unmount and remove.
        pid: optional pid of the s3fs process backing the mount; if given
            and still alive, it gets killed too.

    Raises:
        RuntimeError: if the mount could not be removed after all retries.
    """
    def _pid_alive():
        # without a pid there is nothing to track; report "not alive"
        if pid:
            return pid in [p.pid for p in psutil.process_iter()]
        return False

    def _data_mounted():
        # a non-empty target dir is treated as "still mounted"; listdir
        # failures (missing dir, dead fuse transport) count as unmounted
        try:
            return bool(os.listdir(target))
        except Exception:
            return False

    # polite first attempt, no signals
    subprocess.run(f"umount {target}", shell=True, check=False)
    subprocess.run(f"rmdir {target}", shell=True, check=False)
    if _pid_alive():
        subprocess.run(f"kill {pid}", shell=True, check=False)
    if not _pid_alive() and not _data_mounted() and not os.path.isdir(target):
        return

    # escalate with retries and linear backoff
    for retry in range(10):
        subprocess.run(f"umount {target}", shell=True, check=False)
        subprocess.run(f"rmdir {target}", shell=True, check=False)
        if _pid_alive():
            # stop the process before killing it, so it can't do more work
            try:
                os.kill(pid, signal.SIGSTOP)
            except Exception as e:
                logger.exception('failed to send SIGSTOP to mount, pid=%s (%s)', pid, e)
            try:
                os.kill(pid, signal.SIGKILL)
            except Exception as e:
                logger.exception('failed to send SIGKILL to mount, pid=%s (%s)', pid, e)
        if _data_mounted():
            # lazy unmount (-l) detaches the fs even while it is busy
            subprocess.check_call(f"""
                umount {target} || umount -l {target} || true;
            """, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,)
        if not _data_mounted():
            try:
                os.rmdir(target)
            except Exception as e:
                logger.warning('Failed to os.rmdir() the target directory %s (%s)', target, e)
        if not _pid_alive() and not _data_mounted() and not os.path.isdir(target):
            tracer.count("umount_s3fs_success")
            return
        time.sleep(0.05 + 0.05 * retry)
    tracer.count("umount_s3fs_failed")
    raise RuntimeError(f'cannot remove old S3 mounts! target={target}, pid={pid}')