Improving storage performance (#1659)
* added expo generator for more fine-grained sleep

* deque is more memory and time efficient

* removing time wasters

* updating file metadata after upload
the metadata will be synced by a background worker spawned for
each file when a new upload URL is generated (see the client-side sketch after this list)

* trying to fix codeclimate complaints

* - added missing annotations
- using logging ensure_future
- reformatted code
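
For context, a client-side sketch of the flow described above (hypothetical names, aiohttp-based, not part of this commit): the caller asks storage for a presigned PUT link, uploads straight to S3/MinIO, and the background worker spawned in upload_link eventually notices the new object and refreshes its file_meta_data row.

    import aiohttp

    async def upload_through_presigned_link(put_url: str, payload: bytes) -> None:
        # put_url would come from DataStorageManager.upload_link(); completing this PUT
        # is the change the fire-and-forget metadata_file_updater task polls S3 for
        async with aiohttp.ClientSession() as session:
            async with session.put(put_url, data=payload) as resp:
                resp.raise_for_status()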

Co-authored-by: Andrei Neagu <[email protected]>
GitHK and Andrei Neagu authored Aug 3, 2020
1 parent a12817d commit f0e11ca
Showing 2 changed files with 129 additions and 83 deletions.
195 changes: 112 additions & 83 deletions services/storage/src/simcore_service_storage/dsm.py
@@ -7,6 +7,7 @@
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from collections import deque

import aiobotocore
import aiofiles
@@ -22,7 +23,9 @@
from s3wrapper.s3_client import S3Client
from servicelib.aiopg_utils import DBAPIError, PostgresRetryPolicyUponOperation
from servicelib.client_session import get_client_session
from servicelib.utils import fire_and_forget_task

from .utils import expo
from .datcore_wrapper import DatcoreWrapper
from .models import (
DatasetMetaData,
@@ -198,7 +201,7 @@ async def list_files(
Can filter upon regular expression (for now only on key: value pairs of the FileMetaData)
"""
- data = []
+ data = deque()
if location == SIMCORE_S3_STR:
async with self.engine.acquire() as conn:
query = sa.select([file_meta_data]).where(
@@ -234,87 +237,29 @@ async def list_files(
# there seems to be no project whatsoever for user_id
return []

-   # only keep files from non-deleted project --> This needs to be fixed
-   clean_data = []
+   # only keep files from non-deleted project
+   clean_data = deque()
    for dx in data:
        d = dx.fmd
-       if d.project_id in uuid_name_dict:
-           d.project_name = uuid_name_dict[d.project_id]
-           if d.node_id in uuid_name_dict:
-               d.node_name = uuid_name_dict[d.node_id]
-
-           d.raw_file_path = str(
-               Path(d.project_id) / Path(d.node_id) / Path(d.file_name)
-           )
-           d.display_file_path = d.raw_file_path
-           d.file_id = d.file_uuid
-           if d.node_name and d.project_name:
-               d.display_file_path = str(
-                   Path(d.project_name)
-                   / Path(d.node_name)
-                   / Path(d.file_name)
-               )
-               async with self.engine.acquire() as conn:
-                   query = (
-                       file_meta_data.update()
-                       .where(
-                           and_(
-                               file_meta_data.c.node_id == d.node_id,
-                               file_meta_data.c.user_id == d.user_id,
-                           )
-                       )
-                       .values(
-                           project_name=d.project_name,
-                           node_name=d.node_name,
-                           raw_file_path=d.raw_file_path,
-                           file_id=d.file_id,
-                           display_file_path=d.display_file_path,
-                       )
-                   )
-                   await conn.execute(query)
-           clean_data.append(dx)
-
-   data = clean_data
-
-   # same as above, make sure file is physically present on s3
-   clean_data = []
-   # FIXME: MaG: This is inefficient: Do this automatically when file is modified
-   session = aiobotocore.get_session()
-   async with session.create_client(
-       "s3",
-       endpoint_url=self.s3_client.endpoint_url,
-       aws_access_key_id=self.s3_client.access_key,
-       aws_secret_access_key=self.s3_client.secret_key,
-   ) as client:
-       responses = await asyncio.gather(
-           *[
-               client.list_objects_v2(
-                   Bucket=_d.bucket_name, Prefix=_d.object_name
-               )
-               for _d in [__d.fmd for __d in data]
-           ]
-       )
-       for dx, resp in zip(data, responses):
-           if "Contents" in resp:
-               clean_data.append(dx)
-               d = dx.fmd
-               d.file_size = resp["Contents"][0]["Size"]
-               d.last_modified = str(resp["Contents"][0]["LastModified"])
-               async with self.engine.acquire() as conn:
-                   query = (
-                       file_meta_data.update()
-                       .where(
-                           and_(
-                               file_meta_data.c.node_id == d.node_id,
-                               file_meta_data.c.user_id == d.user_id,
-                           )
-                       )
-                       .values(
-                           file_size=d.file_size,
-                           last_modified=d.last_modified,
-                       )
-                   )
-                   await conn.execute(query)
+       if d.project_id not in uuid_name_dict:
+           continue
+
+       d.project_name = uuid_name_dict[d.project_id]
+       if d.node_id in uuid_name_dict:
+           d.node_name = uuid_name_dict[d.node_id]
+
+       d.raw_file_path = str(
+           Path(d.project_id) / Path(d.node_id) / Path(d.file_name)
+       )
+       d.display_file_path = d.raw_file_path
+       d.file_id = d.file_uuid
+       if d.node_name and d.project_name:
+           d.display_file_path = str(
+               Path(d.project_name) / Path(d.node_name) / Path(d.file_name)
+           )
+       # once the data was sync to postgres metadata table at this point
+       clean_data.append(dx)

    data = clean_data

elif location == DATCORE_STR:
@@ -324,7 +269,7 @@

if uuid_filter:
_query = re.compile(uuid_filter, re.IGNORECASE)
- filtered_data = []
+ filtered_data = deque()
for dx in data:
d = dx.fmd
if _query.search(d.file_uuid):
@@ -334,7 +279,7 @@

if regex:
_query = re.compile(regex, re.IGNORECASE)
- filtered_data = []
+ filtered_data = deque()
for dx in data:
d = dx.fmd
_vars = vars(d)
@@ -344,7 +289,7 @@
break
return filtered_data

- return data
+ return list(data)

async def list_files_dataset(
self, user_id: str, location: str, dataset_id: str
@@ -468,9 +413,80 @@ async def upload_file_to_datcore(

# actually we have to query the master db

async def metadata_file_updater(
self,
file_uuid: str,
bucket_name: str,
object_name: str,
file_size: int,
last_modified: str,
max_update_retries: int = 50,
):
"""
Will retry max_update_retries to update the metadata on the file after an upload.
If it is not successfull it will exit and log an error.
Note: MinIO bucket notifications are not available with S3, that's why we have the
following hacky solution
"""
current_iteration = 0

session = aiobotocore.get_session()
async with session.create_client(
"s3",
endpoint_url=self.s3_client.endpoint_url,
aws_access_key_id=self.s3_client.access_key,
aws_secret_access_key=self.s3_client.secret_key,
) as client:
current_iteration += 1
continue_loop = True
sleep_generator = expo()
update_succeeded = False

while continue_loop:
result = await client.list_objects_v2(
Bucket=bucket_name, Prefix=object_name
)
sleep_amount = next(sleep_generator)
continue_loop = current_iteration <= max_update_retries

if "Contents" not in result:
logger.info("File '%s' was not found in the bucket", object_name)
await asyncio.sleep(sleep_amount)
continue

new_file_size = result["Contents"][0]["Size"]
new_last_modified = str(result["Contents"][0]["LastModified"])
if file_size == new_file_size or last_modified == new_last_modified:
logger.info("File '%s' did not change yet", object_name)
await asyncio.sleep(sleep_amount)
continue

# finally update the data in the database and exit
continue_loop = False

logger.info(
"Obtained this from S3: new_file_size=%s new_last_modified=%s",
new_file_size,
new_last_modified,
)

async with self.engine.acquire() as conn:
query = (
file_meta_data.update()
.where(file_meta_data.c.file_uuid == file_uuid)
.values(
file_size=new_file_size, last_modified=new_last_modified
)
) # primary key search is faster
await conn.execute(query)
update_succeeded = True
if not update_succeeded:
logger.error("Could not update file metadata for '%s'", file_uuid)

async def upload_link(self, user_id: str, file_uuid: str):
@retry(**postgres_service_retry_policy_kwargs)
- async def _execute_query():
+ async def _execute_query() -> Tuple[int, str]:
async with self.engine.acquire() as conn:
fmd = FileMetaData()
fmd.simcore_from_uuid(file_uuid, self.simcore_bucket_name)
@@ -484,11 +500,24 @@ async def _execute_query():
if exists is None:
ins = file_meta_data.insert().values(**vars(fmd))
await conn.execute(ins)
return fmd.file_size, fmd.last_modified

- await _execute_query()
+ file_size, last_modified = await _execute_query()

bucket_name = self.simcore_bucket_name
object_name = file_uuid

# a parallel task is started which will update the file's metadata
# once the upload has finished.
fire_and_forget_task(
self.metadata_file_updater(
file_uuid=file_uuid,
bucket_name=bucket_name,
object_name=object_name,
file_size=file_size,
last_modified=last_modified,
)
)
return self.s3_client.create_presigned_put_url(bucket_name, object_name)

async def copy_file_s3_s3(self, user_id: str, dest_uuid: str, source_uuid: str):
@@ -742,7 +771,7 @@ async def deep_copy_project_simcore_s3(
await conn.execute(ins)

async def delete_project_simcore_s3(
- self, user_id: str, project_id: str, node_id: Optional[str]=None
+ self, user_id: str, project_id: str, node_id: Optional[str] = None
) -> web.Response:
""" Deletes all files from a given node in a project in simcore.s3 and updated db accordingly.
If node_id is not given, then all the project files db entries are deleted.
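
fire_and_forget_task is imported from servicelib.utils but its body is not shown in this diff. Going by the commit notes ("using logging ensure_future"), it presumably wraps asyncio.ensure_future and logs failures; the sketch below is an assumption for illustration, not the actual servicelib code.

    import asyncio
    import logging

    logger = logging.getLogger(__name__)

    def fire_and_forget_task(coro) -> asyncio.Future:
        # schedule the coroutine in the background; the done-callback makes sure an
        # exception inside the task is logged instead of silently dropped
        future = asyncio.ensure_future(coro)

        def _log_exception(fut: asyncio.Future) -> None:
            try:
                fut.result()  # re-raises whatever the task ended with
            except Exception:  # pylint: disable=broad-except
                logger.exception("Error while running fire-and-forget task")

        future.add_done_callback(_log_exception)
        return future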
17 changes: 17 additions & 0 deletions services/storage/src/simcore_service_storage/utils.py
@@ -39,3 +39,20 @@ async def assert_enpoint_is_ok(

def is_url(location):
return bool(URL(str(location)).host)


def expo(base=1.2, factor=0.1, max_value=2):
    """Generator of exponentially growing sleep values, capped at max_value.
    Args:
        base: The mathematical base of the exponentiation operation
        factor: Factor to multiply the exponentiation by.
        max_value: The maximum value to yield; once reached, it is yielded from then on.
    """
    n = 0
    while True:
        a = factor * base ** n
        if max_value is None or a < max_value:
            yield a
            n += 1
        else:
            yield max_value
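
A quick sanity check of the generator above (not part of the diff): with the default arguments the sleep values grow by 20% per retry, and once factor * base**n reaches max_value the generator yields max_value from then on.

    from itertools import islice

    from simcore_service_storage.utils import expo

    print([round(v, 3) for v in islice(expo(), 6)])
    # -> [0.1, 0.12, 0.144, 0.173, 0.207, 0.249]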
