feat: copy core schema jsons in s3 and keep in sync with docdb #57

Merged (19 commits) on May 31, 2024

Commits (19)

- e16c97f feat: populate job copies original core jsons to original_metadata dir (helen-m-lin, May 21, 2024)
- 78dae8b feat: populate job overwrites core jsons with core fields from metada… (helen-m-lin, May 21, 2024)
- 77a5e4c test: add tests for utils.py (helen-m-lin, May 22, 2024)
- 3050959 test: add tests for populate_s3_with_metadata_files.py (helen-m-lin, May 22, 2024)
- c13424b feat: remove metadata_nd_overwrite from JobSettings since always True (helen-m-lin, May 22, 2024)
- 8bdec32 fix: bug in utils.is_dict_corrupt (helen-m-lin, May 23, 2024)
- 7b98b6b feat: populate job deletes corrupt core jsons after copy (helen-m-lin, May 23, 2024)
- d94b4c3 refactor: add copy_then_overwrite_core_json_files to utils (helen-m-lin, May 23, 2024)
- f919b74 feat: add check if /original_metadata folder already exists (helen-m-lin, May 23, 2024)
- 0c0a823 feat: aind_bucket_indexer will copy_then_overwrite_core_json_files wh… (helen-m-lin, May 24, 2024)
- 830a783 feat: aind_bucket_indexer will sync core metadata jsons on docdb updates (helen-m-lin, May 24, 2024)
- 46ef89d feat: sync_core_json_files and unit tests (helen-m-lin, May 24, 2024)
- d88e934 chore: docstrings and linters (helen-m-lin, May 24, 2024)
- 6353039 feat: add copy_original_md_subdir to IndexJobSettings (helen-m-lin, May 24, 2024)
- 2c6afdc feat: use last modified dates from s3 as date_stamps (helen-m-lin, May 24, 2024)
- 7938285 test: update unit tests (helen-m-lin, May 24, 2024)
- a5561c1 test: update unit test (helen-m-lin, May 24, 2024)
- 3f3a5c8 Merge branch 'main' into feat-55-core-schema-s3 (helen-m-lin, May 28, 2024)
- 9491b8e resolve merge conflicts (helen-m-lin, May 28, 2024)
1 change: 1 addition & 0 deletions src/aind_data_asset_indexer/__init__.py
@@ -1,2 +1,3 @@
"""Package"""

__version__ = "0.5.0"
117 changes: 94 additions & 23 deletions src/aind_data_asset_indexer/aind_bucket_indexer.py
@@ -18,14 +18,18 @@
build_docdb_location_to_id_map,
build_metadata_record_from_prefix,
compute_md5_hash,
copy_then_overwrite_core_json_files,
create_metadata_object_key,
does_s3_object_exist,
does_s3_prefix_exist,
download_json_file_from_s3,
get_dict_of_file_info,
get_s3_bucket_and_prefix,
get_s3_location,
is_record_location_valid,
iterate_through_top_level,
paginate_docdb,
sync_core_json_files,
upload_metadata_json_str_to_s3,
)

@@ -79,12 +83,11 @@ def _process_docdb_record(
None

"""
if not is_record_location_valid(
docdb_record, self.job_settings.s3_bucket
):
bucket = self.job_settings.s3_bucket
if not is_record_location_valid(docdb_record, bucket):
logging.warning(
f"Record location {docdb_record.get('location')} is not valid "
f"for bucket {self.job_settings.s3_bucket}!"
f"for bucket {bucket}!"
)
else:
s3_parts = get_s3_bucket_and_prefix(docdb_record["location"])
@@ -96,7 +99,8 @@
)
if not does_file_exist_in_s3:
logging.warning(
f"File not found in S3 at s3://{s3_bucket}/{object_key}! "
f"File not found in S3 at "
f"{get_s3_location(s3_bucket, object_key)}! "
f"Removing metadata record from DocDb."
)
db = docdb_client[self.job_settings.doc_db_db_name]
@@ -117,6 +121,39 @@
else s3_object_info["e_tag"].strip('"')
)
if record_md5_hash != s3_object_hash:
copy_exists_in_s3 = does_s3_prefix_exist(
s3_client=s3_client,
bucket=s3_bucket,
prefix=f"{prefix}/original_metadata",
)
if copy_exists_in_s3:
# if /original_metadata exists, then we only need to
# overwrite the top-level jsons of updated core_fields
sync_core_json_files(
metadata_json=record_as_json_str,
bucket=s3_bucket,
prefix=prefix,
s3_client=s3_client,
log_flag=True,
)
else:
# if /original_metadata does not exist, then we need
# to copy and overwrite all the core jsons using the
# new metadata.nd.json
copy_then_overwrite_core_json_files(
metadata_json=record_as_json_str,
bucket=s3_bucket,
prefix=prefix,
s3_client=s3_client,
log_flag=True,
copy_original_md_subdir=(
self.job_settings.copy_original_md_subdir
),
)
logging.info(
f"Uploading metadata record for: "
f"{docdb_record['location']}"
)
response = upload_metadata_json_str_to_s3(
bucket=s3_bucket,
prefix=prefix,
@@ -126,8 +163,8 @@
logging.info(response)
else:
logging.info(
f"Objects are same. Skipping saving to "
f"s3://{self.job_settings.s3_bucket}/{prefix}."
f"Metadata records are same. Skipping saving to "
f"{docdb_record['location']}."
)

def _dask_task_to_process_record_list(
@@ -190,6 +227,16 @@ def _process_prefix(
location_to_id_map: Dict[str, str],
):
"""
Processes a prefix in S3.
# If metadata record exists in S3 and DocDB, do nothing.
# If record is in S3 but not DocDb, then copy it to DocDb if the
# location in the metadata record matches the actual location.
# Otherwise, log a warning.
# If record does not exist in both DocDB and S3, build a new metadata
# file and save it to S3 (assume Lambda function will save to DocDB).
# In both cases above, we also copy the original core json files to a
# subfolder and ensure the top level core jsons are in sync with the
# metadata.nd.json in S3.

Parameters
----------
@@ -203,34 +250,35 @@

"""
# Check if metadata record exists
stripped_prefix = s3_prefix.strip("/")
location = f"s3://{self.job_settings.s3_bucket}/{stripped_prefix}"
bucket = self.job_settings.s3_bucket
location = get_s3_location(bucket=bucket, prefix=s3_prefix)
if location_to_id_map.get(location) is not None:
record_id = location_to_id_map.get(location)
else:
record_id = None
object_key = create_metadata_object_key(prefix=s3_prefix)
does_metadata_file_exist = does_s3_object_exist(
s3_client=s3_client,
bucket=self.job_settings.s3_bucket,
bucket=bucket,
key=object_key,
)
if does_metadata_file_exist:
# If record not in DocDb, then copy it to DocDb if the location
# in the metadata record matches the location the record lives in
# Otherwise, log a warning that the metadata record location does
# not make sense.
s3_full_location = get_s3_location(bucket, object_key)
if record_id is None:
json_contents = download_json_file_from_s3(
s3_client=s3_client,
bucket=self.job_settings.s3_bucket,
bucket=bucket,
object_key=object_key,
)
if json_contents:
# noinspection PyTypeChecker
if is_record_location_valid(
json_contents,
expected_bucket=self.job_settings.s3_bucket,
expected_bucket=bucket,
expected_prefix=s3_prefix,
):
db = docdb_client[self.job_settings.doc_db_db_name]
@@ -243,36 +291,60 @@
upsert=True,
)
logging.info(response.raw_result)
# ensure core jsons are synced with metadata.nd.json
copy_then_overwrite_core_json_files(
metadata_json=json.dumps(
json_contents, default=str
),
bucket=bucket,
prefix=s3_prefix,
s3_client=s3_client,
log_flag=True,
copy_original_md_subdir=(
self.job_settings.copy_original_md_subdir
),
)
else:
logging.warning(
f"Location field in record "
f"{json_contents.get('location')} does not match "
f"actual location of record "
f"s3://{self.job_settings.s3_bucket}/{s3_prefix}!"
f"actual location of record {location}!"
)
else:
logging.warning(
f"Unable to download file from S3!"
f" s3://{self.job_settings.s3_bucket}/{object_key}"
f"Unable to download file from S3 for: "
f"{s3_full_location}!"
)
else:
logging.info(
f"Record for s3://{self.job_settings.s3_bucket}/"
f"{object_key} already exists in DocDb. Skipping."
f"Metadata record for {s3_full_location} "
f"already exists in DocDb. Skipping."
)
else: # metadata.nd.json file does not exist in S3. Create a new one.
# Build a new metadata file, save it to S3 and save it to DocDb.
# Also copy the original core json files to a subfolder and then
# overwrite them with the new fields from metadata.nd.json.
new_metadata_contents = build_metadata_record_from_prefix(
bucket=self.job_settings.s3_bucket,
bucket=bucket,
prefix=s3_prefix,
metadata_nd_overwrite=True,
s3_client=s3_client,
)
if new_metadata_contents is not None:
# noinspection PyTypeChecker
copy_then_overwrite_core_json_files(
metadata_json=new_metadata_contents,
bucket=bucket,
prefix=s3_prefix,
s3_client=s3_client,
log_flag=True,
copy_original_md_subdir=(
self.job_settings.copy_original_md_subdir
),
)
logging.info(f"Uploading metadata record for: {location}")
s3_response = upload_metadata_json_str_to_s3(
metadata_json=new_metadata_contents,
bucket=self.job_settings.s3_bucket,
bucket=bucket,
prefix=s3_prefix,
s3_client=s3_client,
)
@@ -281,8 +353,7 @@
# then next index job will pick it up.
else:
logging.warning(
f"Was unable to build metadata record for: "
f"s3://{self.job_settings.s3_bucket}/{stripped_prefix}"
f"Unable to build metadata record for: {location}!"
)

def _dask_task_to_process_prefix_list(self, prefix_list: List[str]):
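For readers skimming the diff, the new branch in `_process_docdb_record` reduces to the sketch below. The helper names and keyword arguments are taken from the calls in the hunks above, and are assumed to live in `aind_data_asset_indexer.utils` like the helpers imported in `populate_s3_with_metadata_files.py`; the bucket, prefix, and record JSON are placeholders.

```python
import boto3

from aind_data_asset_indexer.utils import (
    copy_then_overwrite_core_json_files,
    does_s3_prefix_exist,
    sync_core_json_files,
)

s3_client = boto3.client("s3")
bucket = "example-bucket"  # placeholder
prefix = "ecephys_123456_2024-05-24_10-00-00"  # placeholder data asset prefix
record_as_json_str = "{}"  # DocDB record serialized to JSON (placeholder)

# Reached only when the DocDB record and the metadata.nd.json in S3 differ.
if does_s3_prefix_exist(
    s3_client=s3_client,
    bucket=bucket,
    prefix=f"{prefix}/original_metadata",
):
    # Originals were already preserved on a previous run; only re-write the
    # top-level core jsons whose fields changed in metadata.nd.json.
    sync_core_json_files(
        metadata_json=record_as_json_str,
        bucket=bucket,
        prefix=prefix,
        s3_client=s3_client,
        log_flag=True,
    )
else:
    # First time handling this prefix: copy the original core jsons into the
    # original_metadata/ subfolder, then overwrite them from metadata.nd.json.
    copy_then_overwrite_core_json_files(
        metadata_json=record_as_json_str,
        bucket=bucket,
        prefix=prefix,
        s3_client=s3_client,
        log_flag=True,
        copy_original_md_subdir="original_metadata",
    )
```

After either branch, the updated metadata.nd.json is uploaded to S3, so the top-level core jsons, metadata.nd.json, and the DocDB record stay in sync.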
8 changes: 3 additions & 5 deletions src/aind_data_asset_indexer/models.py
@@ -22,12 +22,10 @@ class IndexJobSettings(BaseSettings):
"set to None, then all records will be processed."
),
)
metadata_nd_overwrite: bool = Field(
default=False,
copy_original_md_subdir: str = Field(
default="original_metadata",
description=(
"If set to True, will ignore the metadata.nd.json file and use "
"the core schemas to build a new one. If set to False, then use"
"the metadata.nd.json file if it exists in S3."
"Subdirectory to copy original core schema json files to."
),
)

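The net effect on configuration: `metadata_nd_overwrite` is removed (overwriting is now always the behavior), and the new `copy_original_md_subdir` field controls where original core jsons are copied. A minimal illustration, assuming `s3_bucket` is the only other field that must be supplied and using a made-up bucket name:

```python
from aind_data_asset_indexer.models import IndexJobSettings

# Default: originals are copied to <prefix>/original_metadata/
settings = IndexJobSettings(s3_bucket="example-bucket")  # bucket name is a placeholder
print(settings.copy_original_md_subdir)  # original_metadata

# The subdirectory can be overridden per job if needed
custom_settings = IndexJobSettings(
    s3_bucket="example-bucket",
    copy_original_md_subdir="original_metadata_v2",  # hypothetical override
)
```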
4 changes: 3 additions & 1 deletion src/aind_data_asset_indexer/populate_aind_buckets.py
@@ -17,7 +17,9 @@


class PopulateAindBucketsJob:
"""Job to populate a list of aind buckets with metadata.nd.json files."""
"""Job to populate a list of aind buckets with metadata json files
and copy original core schema jsons to a subfolder.
"""

def __init__(self, job_settings: PopulateAindBucketsJobSettings):
"""Class constructor."""
39 changes: 28 additions & 11 deletions src/aind_data_asset_indexer/populate_s3_with_metadata_files.py
@@ -1,4 +1,5 @@
"""Module to handle populating s3 bucket with metadata files."""

import argparse
import logging
import os
@@ -13,6 +14,8 @@
from aind_data_asset_indexer.models import IndexJobSettings
from aind_data_asset_indexer.utils import (
build_metadata_record_from_prefix,
copy_then_overwrite_core_json_files,
get_s3_location,
iterate_through_top_level,
upload_metadata_json_str_to_s3,
)
@@ -27,12 +30,12 @@ class AindPopulateMetadataJsonJob:
1) Crawl through an S3 bucket
2) Look inside each prefix that adheres to data asset naming convention
3) If the name is a data asset name, then it will look inside the prefix
4.0) If there is no metadata.nd.json file, then it will create one by using
any of the core json files it finds.
4.1) If the metadata_nd_overwrite option is set to False, then it will pass
a data asset if there is already a metadata.nd.json in that folder. If set
to True, then it will write a new metadata.nd.json file even if one already
exists.
4) It will create a metadata.nd.json by using any of the core json files
it finds. Any existing metadata.nd.json will be overwritten.
5.1) The contents of any existing core json files will be copied to
/original_metadata/{core_schema}.{date_stamp}.json.
5.2) The core json files will be overwritten with the new fields from
metadata.nd.json or deleted if they are not found in metadata.nd.json.
"""

def __init__(self, job_settings: IndexJobSettings):
@@ -42,6 +45,9 @@ def __init__(self, job_settings: IndexJobSettings):
def _process_prefix(self, prefix: str, s3_client: S3Client):
"""
For a given prefix, build a metadata record and upload it to S3.
Original core json files will be first copied to a subfolder,
and then overwritten with the new fields from metadata.nd.json,
or deleted if the new field is None.
Parameters
----------
prefix : str
@@ -52,25 +58,36 @@ def _process_prefix(self, prefix: str, s3_client: S3Client):
None

"""
bucket = self.job_settings.s3_bucket
location = get_s3_location(bucket=bucket, prefix=prefix)
md_record = build_metadata_record_from_prefix(
prefix=prefix,
s3_client=s3_client,
bucket=self.job_settings.s3_bucket,
metadata_nd_overwrite=self.job_settings.metadata_nd_overwrite,
bucket=bucket,
)
if md_record is not None:
copy_then_overwrite_core_json_files(
metadata_json=md_record,
bucket=bucket,
prefix=prefix,
s3_client=s3_client,
log_flag=True,
copy_original_md_subdir=(
self.job_settings.copy_original_md_subdir
),
)
logging.info(f"Uploading metadata record for: {location}")
# noinspection PyTypeChecker
response = upload_metadata_json_str_to_s3(
metadata_json=md_record,
bucket=self.job_settings.s3_bucket,
bucket=bucket,
prefix=prefix,
s3_client=s3_client,
)
logging.info(response)
else:
logging.warning(
f"Metadata record is None for "
f"s3://{self.job_settings.s3_bucket}/{prefix}!"
f"Unable to build metadata record for: {location}!"
)

def _dask_task_to_process_prefix_list(
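To make steps 4 through 5.2 of the updated docstring concrete, this sketch lists what a single prefix should contain after the populate job runs. The bucket, prefix, core schema names, and the exact date-stamp format are illustrative; only metadata.nd.json and the original_metadata/ subfolder name come from the code above.

```python
import boto3

s3_client = boto3.client("s3")
bucket = "example-bucket"  # placeholder
prefix = "ecephys_123456_2024-05-24_10-00-00"  # placeholder data asset prefix

response = s3_client.list_objects_v2(Bucket=bucket, Prefix=f"{prefix}/")
for obj in response.get("Contents", []):
    print(obj["Key"])

# Expected shape of the output (illustrative):
#   <prefix>/metadata.nd.json                            built or overwritten by the job
#   <prefix>/subject.json                                rewritten from metadata.nd.json
#   <prefix>/procedures.json                             rewritten from metadata.nd.json
#   <prefix>/original_metadata/subject.20240524.json     original copy, date-stamped
#   <prefix>/original_metadata/procedures.20240524.json  original copy, date-stamped
```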