From f0e11ca16e73b8eb99318846189d86d882cb6517 Mon Sep 17 00:00:00 2001
From: Andrei Neagu
Date: Mon, 3 Aug 2020 12:23:28 +0200
Subject: [PATCH] Improving storage performance (#1659)

* added expo generator for more fine-grained sleep

* deque is more memory and time efficient

* removing time wasters

* updating file metadata after upload: the metadata will be synced by a
  background worker spawned for each file when a new upload url is
  generated

* trying to fix codeclimate complaints

* - added missing annotations
  - using logging ensure_future
  - reformatted code

Co-authored-by: Andrei Neagu
---
 .../src/simcore_service_storage/dsm.py   | 195 ++++++++++--------
 .../src/simcore_service_storage/utils.py |  17 ++
 2 files changed, 129 insertions(+), 83 deletions(-)

diff --git a/services/storage/src/simcore_service_storage/dsm.py b/services/storage/src/simcore_service_storage/dsm.py
index 8bb62efb76c..6dbab9e75f2 100644
--- a/services/storage/src/simcore_service_storage/dsm.py
+++ b/services/storage/src/simcore_service_storage/dsm.py
@@ -7,6 +7,7 @@
 from concurrent.futures import ThreadPoolExecutor
 from pathlib import Path
 from typing import Dict, List, Optional, Tuple
+from collections import deque
 
 import aiobotocore
 import aiofiles
@@ -22,7 +23,9 @@
 from s3wrapper.s3_client import S3Client
 from servicelib.aiopg_utils import DBAPIError, PostgresRetryPolicyUponOperation
 from servicelib.client_session import get_client_session
+from servicelib.utils import fire_and_forget_task
 
+from .utils import expo
 from .datcore_wrapper import DatcoreWrapper
 from .models import (
     DatasetMetaData,
@@ -198,7 +201,7 @@ async def list_files(
         Can filter upon regular expression (for now only on key: value pairs of the FileMetaData)
 
         """
-        data = []
+        data = deque()
         if location == SIMCORE_S3_STR:
             async with self.engine.acquire() as conn:
                 query = sa.select([file_meta_data]).where(
@@ -234,87 +237,29 @@
                     # there seems to be no project whatsoever for user_id
                     return []
 
-                # only keep files from non-deleted project --> This needs to be fixed
-                clean_data = []
+                # only keep files from non-deleted project
+                clean_data = deque()
                 for dx in data:
                     d = dx.fmd
-                    if d.project_id in uuid_name_dict:
-                        d.project_name = uuid_name_dict[d.project_id]
-                        if d.node_id in uuid_name_dict:
-                            d.node_name = uuid_name_dict[d.node_id]
-
-                        d.raw_file_path = str(
-                            Path(d.project_id) / Path(d.node_id) / Path(d.file_name)
-                        )
-                        d.display_file_path = d.raw_file_path
-                        d.file_id = d.file_uuid
-                        if d.node_name and d.project_name:
-                            d.display_file_path = str(
-                                Path(d.project_name)
-                                / Path(d.node_name)
-                                / Path(d.file_name)
-                            )
-                            async with self.engine.acquire() as conn:
-                                query = (
-                                    file_meta_data.update()
-                                    .where(
-                                        and_(
-                                            file_meta_data.c.node_id == d.node_id,
-                                            file_meta_data.c.user_id == d.user_id,
-                                        )
-                                    )
-                                    .values(
-                                        project_name=d.project_name,
-                                        node_name=d.node_name,
-                                        raw_file_path=d.raw_file_path,
-                                        file_id=d.file_id,
-                                        display_file_path=d.display_file_path,
-                                    )
-                                )
-                                await conn.execute(query)
-                        clean_data.append(dx)
+                    if d.project_id not in uuid_name_dict:
+                        continue
 
-                data = clean_data
+                    d.project_name = uuid_name_dict[d.project_id]
+                    if d.node_id in uuid_name_dict:
+                        d.node_name = uuid_name_dict[d.node_id]
 
-                # same as above, make sure file is physically present on s3
-                clean_data = []
-                # FIXME: MaG: This is inefficient: Do this automatically when file is modified
-                session = aiobotocore.get_session()
-                async with session.create_client(
-                    "s3",
-                    endpoint_url=self.s3_client.endpoint_url,
-                    aws_access_key_id=self.s3_client.access_key,
-                    aws_secret_access_key=self.s3_client.secret_key,
-                ) as client:
-                    responses = await asyncio.gather(
-                        *[
-                            client.list_objects_v2(
-                                Bucket=_d.bucket_name, Prefix=_d.object_name
-                            )
-                            for _d in [__d.fmd for __d in data]
-                        ]
+                    d.raw_file_path = str(
+                        Path(d.project_id) / Path(d.node_id) / Path(d.file_name)
                     )
-                    for dx, resp in zip(data, responses):
-                        if "Contents" in resp:
-                            clean_data.append(dx)
-                            d = dx.fmd
-                            d.file_size = resp["Contents"][0]["Size"]
-                            d.last_modified = str(resp["Contents"][0]["LastModified"])
-                            async with self.engine.acquire() as conn:
-                                query = (
-                                    file_meta_data.update()
-                                    .where(
-                                        and_(
-                                            file_meta_data.c.node_id == d.node_id,
-                                            file_meta_data.c.user_id == d.user_id,
-                                        )
-                                    )
-                                    .values(
-                                        file_size=d.file_size,
-                                        last_modified=d.last_modified,
-                                    )
-                                )
-                                await conn.execute(query)
+                    d.display_file_path = d.raw_file_path
+                    d.file_id = d.file_uuid
+                    if d.node_name and d.project_name:
+                        d.display_file_path = str(
+                            Path(d.project_name) / Path(d.node_name) / Path(d.file_name)
+                        )
+                    # the metadata is synced to the postgres table by a background worker after upload
+                    clean_data.append(dx)
 
                 data = clean_data
 
         elif location == DATCORE_STR:
@@ -324,7 +269,7 @@
 
         if uuid_filter:
             _query = re.compile(uuid_filter, re.IGNORECASE)
-            filtered_data = []
+            filtered_data = deque()
             for dx in data:
                 d = dx.fmd
                 if _query.search(d.file_uuid):
@@ -334,7 +279,7 @@
 
         if regex:
             _query = re.compile(regex, re.IGNORECASE)
-            filtered_data = []
+            filtered_data = deque()
             for dx in data:
                 d = dx.fmd
                 _vars = vars(d)
@@ -344,7 +289,7 @@
                         break
             return filtered_data
 
-        return data
+        return list(data)
 
     async def list_files_dataset(
         self, user_id: str, location: str, dataset_id: str
@@ -468,9 +413,80 @@ async def upload_file_to_datcore(
 
         # actually we have to query the master db
 
+    async def metadata_file_updater(
+        self,
+        file_uuid: str,
+        bucket_name: str,
+        object_name: str,
+        file_size: int,
+        last_modified: str,
+        max_update_retries: int = 50,
+    ):
+        """
+        Will retry up to max_update_retries times to update the metadata on the file after an upload.
+        If it is not successful, it will exit and log an error.
+
+        Note: MinIO bucket notifications are not available with S3, which is why we resort to
+        the following hacky polling solution
+        """
+        current_iteration = 0
+
+        session = aiobotocore.get_session()
+        async with session.create_client(
+            "s3",
+            endpoint_url=self.s3_client.endpoint_url,
+            aws_access_key_id=self.s3_client.access_key,
+            aws_secret_access_key=self.s3_client.secret_key,
+        ) as client:
+            continue_loop = True
+            sleep_generator = expo()
+            update_succeeded = False
+
+            while continue_loop:
+                current_iteration += 1
+                result = await client.list_objects_v2(
+                    Bucket=bucket_name, Prefix=object_name
+                )
+                sleep_amount = next(sleep_generator)
+                continue_loop = current_iteration <= max_update_retries
+
+                if "Contents" not in result:
+                    logger.info("File '%s' was not found in the bucket", object_name)
+                    await asyncio.sleep(sleep_amount)
+                    continue
+
+                new_file_size = result["Contents"][0]["Size"]
+                new_last_modified = str(result["Contents"][0]["LastModified"])
+                if file_size == new_file_size or last_modified == new_last_modified:
+                    logger.info("File '%s' did not change yet", object_name)
+                    await asyncio.sleep(sleep_amount)
+                    continue
+
+                # finally update the data in the database and exit
+                continue_loop = False
+
+                logger.info(
+                    "Obtained this from S3: new_file_size=%s new_last_modified=%s",
+                    new_file_size,
+                    new_last_modified,
+                )
+
+                async with self.engine.acquire() as conn:
+                    query = (
+                        file_meta_data.update()
+                        .where(file_meta_data.c.file_uuid == file_uuid)
+                        .values(
+                            file_size=new_file_size, last_modified=new_last_modified
+                        )
+                    )  # primary key search is faster
+                    await conn.execute(query)
+                    update_succeeded = True
+        if not update_succeeded:
+            logger.error("Could not update file metadata for '%s'", file_uuid)
+
     async def upload_link(self, user_id: str, file_uuid: str):
         @retry(**postgres_service_retry_policy_kwargs)
-        async def _execute_query():
+        async def _execute_query() -> Tuple[int, str]:
             async with self.engine.acquire() as conn:
                 fmd = FileMetaData()
                 fmd.simcore_from_uuid(file_uuid, self.simcore_bucket_name)
@@ -484,11 +500,24 @@ async def _execute_query():
             if exists is None:
                 ins = file_meta_data.insert().values(**vars(fmd))
                 await conn.execute(ins)
+            return fmd.file_size, fmd.last_modified
 
-        await _execute_query()
+        file_size, last_modified = await _execute_query()
 
         bucket_name = self.simcore_bucket_name
         object_name = file_uuid
+
+        # a parallel task is started which will update the metadata of the uploaded file
+        # once the upload has finished.
+        fire_and_forget_task(
+            self.metadata_file_updater(
+                file_uuid=file_uuid,
+                bucket_name=bucket_name,
+                object_name=object_name,
+                file_size=file_size,
+                last_modified=last_modified,
+            )
+        )
         return self.s3_client.create_presigned_put_url(bucket_name, object_name)
 
     async def copy_file_s3_s3(self, user_id: str, dest_uuid: str, source_uuid: str):
@@ -742,7 +771,7 @@ async def deep_copy_project_simcore_s3(
         await conn.execute(ins)
 
     async def delete_project_simcore_s3(
-        self, user_id: str, project_id: str, node_id: Optional[str]=None
+        self, user_id: str, project_id: str, node_id: Optional[str] = None
     ) -> web.Response:
         """ Deletes all files from a given node in a project in simcore.s3 and updates the db accordingly.
            If node_id is not given, then all the project files db entries are deleted.
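For orientation, the new upload flow in dsm.py boils down to: upload_link records the current (file_size, last_modified) pair, returns a presigned PUT URL, and fires a background metadata_file_updater that polls list_objects_v2 with the expo backoff until the uploaded object appears with changed metadata, then writes the new size and timestamp to the file_meta_data table. The following is a minimal, self-contained sketch of that poll-with-backoff pattern only; the names poll_then_update and check_upload_landed are hypothetical stand-ins and not part of the patch.

import asyncio
import logging
from typing import Awaitable, Callable

logger = logging.getLogger(__name__)


def expo(base=1.2, factor=0.1, max_value=2):
    # same backoff generator added in utils.py: 0.1, 0.12, 0.144, ... capped at max_value
    n = 0
    while True:
        a = factor * base ** n
        if max_value is None or a < max_value:
            yield a
            n += 1
        else:
            yield max_value


async def poll_then_update(
    check_upload_landed: Callable[[], Awaitable[bool]], max_update_retries: int = 50
) -> bool:
    """Polls until the upload is visible, sleeping per expo(); gives up after max_update_retries."""
    sleep_generator = expo()
    for _ in range(max_update_retries):
        if await check_upload_landed():
            return True  # here the real code updates file_meta_data by file_uuid
        await asyncio.sleep(next(sleep_generator))
    logger.error("Could not update file metadata: upload never became visible")
    return False


async def demo() -> None:
    calls = 0

    async def check_upload_landed() -> bool:
        # hypothetical stand-in for list_objects_v2 reporting a changed Size/LastModified
        nonlocal calls
        calls += 1
        return calls >= 5

    # fire-and-forget, as upload_link does via servicelib's fire_and_forget_task
    task = asyncio.ensure_future(poll_then_update(check_upload_landed))
    print(await task)  # awaited here only so the demo exits cleanly


if __name__ == "__main__":
    asyncio.run(demo())

In the patch, the "upload landed" check is list_objects_v2 reporting a Size/LastModified pair that differs from the one recorded when the presigned URL was created.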
diff --git a/services/storage/src/simcore_service_storage/utils.py b/services/storage/src/simcore_service_storage/utils.py
index 4e7bc86a56c..7eaaefc358b 100644
--- a/services/storage/src/simcore_service_storage/utils.py
+++ b/services/storage/src/simcore_service_storage/utils.py
@@ -39,3 +39,20 @@ async def assert_enpoint_is_ok(
 
 def is_url(location):
     return bool(URL(str(location)).host)
+
+
+def expo(base=1.2, factor=0.1, max_value=2):
+    """Generator for exponential backoff sleep intervals.
+    Args:
+        base: The mathematical base of the exponentiation operation
+        factor: Factor to multiply the exponentiation by.
+        max_value: The maximum value that will be yielded
+    """
+    n = 0
+    while True:
+        a = factor * base ** n
+        if max_value is None or a < max_value:
+            yield a
+            n += 1
+        else:
+            yield max_value
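With the default parameters the expo generator produces a slowly growing sequence of sleep intervals that saturates at max_value. A quick way to inspect it (illustrative snippet, assuming the package is importable as simcore_service_storage.utils in your environment):

from itertools import islice

# assumption: import path matches the module location introduced by this patch
from simcore_service_storage.utils import expo

# first eight delays with the defaults base=1.2, factor=0.1, max_value=2
print([round(delay, 3) for delay in islice(expo(), 8)])
# -> [0.1, 0.12, 0.144, 0.173, 0.207, 0.249, 0.299, 0.358]
# once 0.1 * 1.2**n reaches 2, the generator keeps yielding the cap of 2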