Refactor 52 log messages (#100)
* refactor: logs and scripts dir

---------

Co-authored-by: Helen Lin <[email protected]>
jtyoung84 and helen-m-lin authored Sep 20, 2024
1 parent 073faa6 commit 96817ca
Showing 12 changed files with 682 additions and 805 deletions.
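
Most of the diff swaps the repository's custom _log_message helper (and its log_flag parameter) for direct calls to Python's standard logging module, and it demotes raw API responses from INFO to DEBUG. Below is a minimal sketch of the before/after pattern; the location and response values are placeholders, not anything taken from the repository.

    import logging

    logging.basicConfig(level=logging.INFO)

    location = "s3://example-bucket/ecephys_123456/subject.json"  # placeholder
    response = {"HTTPStatusCode": 200}  # stand-in for a raw S3/DocDB reply

    # Before: every call site went through a wrapper gated by log_flag.
    def _log_message(message, log_level=logging.INFO, log_flag=True):
        if not log_flag:
            return
        logging.log(log_level, message)

    _log_message(message=f"Uploading new subject to {location}", log_flag=True)

    # After: call the stdlib logger directly and pick the level per message.
    # Progress messages stay at INFO; noisy raw responses drop to DEBUG.
    logging.info(f"Uploading new subject to {location}")
    logging.debug(response)
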
5 changes: 3 additions & 2 deletions Dockerfile
@@ -3,12 +3,13 @@ FROM python:3.10-slim
WORKDIR /app

ADD src ./src
ADD scripts ./scripts
ADD pyproject.toml .
ADD setup.py .

RUN apt-get update
RUN pip install . --no-cache-dir
RUN pip install awscli

RUN chmod +x ./src/aind_data_asset_indexer/run.sh
CMD ["./src/aind_data_asset_indexer/run.sh"]
RUN chmod +x ./scripts/run.sh
CMD ["./scripts/run.sh"]
File renamed without changes.
31 changes: 17 additions & 14 deletions src/aind_data_asset_indexer/aind_bucket_indexer.py
@@ -272,12 +272,14 @@ def _resolve_schema_information(
object_key = create_object_key(
prefix=prefix, filename=core_schema_file_name
)
common_kwargs["core_schema_info_in_root"] = (
get_dict_of_file_info(
s3_client=s3_client,
bucket=self.job_settings.s3_bucket,
keys=[object_key],
).get(object_key)
common_kwargs[
"core_schema_info_in_root"
] = get_dict_of_file_info(
s3_client=s3_client,
bucket=self.job_settings.s3_bucket,
keys=[object_key],
).get(
object_key
)
self._copy_file_from_root_to_subdir(**common_kwargs)
# If field is null, a file exists in the root folder, and
@@ -391,7 +393,7 @@ def _process_docdb_record(
response = collection.delete_one(
filter={"_id": docdb_record["_id"]}
)
logging.info(response.raw_result)
logging.debug(response.raw_result)
else: # There is a metadata.nd.json file in S3.
# Schema info in root level directory
s3_core_schema_info = get_dict_of_core_schema_file_info(
@@ -422,9 +424,9 @@ def _process_docdb_record(
)
db = docdb_client[self.job_settings.doc_db_db_name]
collection = db[self.job_settings.doc_db_collection_name]
fields_to_update["last_modified"] = (
datetime.utcnow().isoformat()
)
fields_to_update[
"last_modified"
] = datetime.utcnow().isoformat()
response = collection.update_one(
{"_id": docdb_record["_id"]},
{"$set": fields_to_update},
@@ -580,20 +582,22 @@ def _process_prefix(
]
if "_id" in json_contents:
# TODO: check is_dict_corrupt(json_contents)
logging.info(
f"Adding record to docdb for: {location}"
)
response = collection.update_one(
{"_id": json_contents["_id"]},
{"$set": json_contents},
upsert=True,
)
logging.info(response.raw_result)
logging.debug(response.raw_result)
cond_copy_then_sync_core_json_files(
metadata_json=json.dumps(
json_contents, default=str
),
bucket=bucket,
prefix=s3_prefix,
s3_client=s3_client,
log_flag=True,
copy_original_md_subdir=(
self.job_settings.copy_original_md_subdir
),
@@ -635,7 +639,6 @@ def _process_prefix(
bucket=bucket,
prefix=s3_prefix,
s3_client=s3_client,
log_flag=True,
copy_original_md_subdir=(
self.job_settings.copy_original_md_subdir
),
@@ -648,7 +651,7 @@ def _process_prefix(
prefix=s3_prefix,
s3_client=s3_client,
)
logging.info(s3_response)
logging.debug(s3_response)
# Assume Lambda function will move it to DocDb. If it doesn't,
# then next index job will pick it up.
else:
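
Across aind_bucket_indexer.py, raw DocDB and S3 responses now log at DEBUG, so a run at the default INFO level shows only the progress messages and the payloads appear when verbosity is raised. A small sketch of one way to make that switchable; the LOG_LEVEL environment variable is an assumption for illustration, not something this commit adds.

    import logging
    import os

    # Assumed convention: let an environment variable control verbosity so the
    # DEBUG-level raw_result payloads can be turned on when troubleshooting.
    level_name = os.getenv("LOG_LEVEL", "INFO").upper()
    logging.basicConfig(
        level=getattr(logging, level_name, logging.INFO),
        format="%(asctime)s %(levelname)s %(message)s",
    )

    logging.info("Adding record to docdb for: s3://example-bucket/ecephys_123456")
    logging.debug({"n": 1, "nModified": 1, "ok": 1.0})  # hidden unless LOG_LEVEL=DEBUG
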
5 changes: 3 additions & 2 deletions src/aind_data_asset_indexer/codeocean_bucket_indexer.py
@@ -278,7 +278,7 @@ def _process_codeocean_record(
{"$set": json_contents},
upsert=True,
)
logging.info(x.raw_result)
logging.debug(x.raw_result)
else:
logging.warning(
f"Unable to build metadata record for: {location}!"
@@ -363,10 +363,11 @@ def _dask_task_to_delete_record_list(self, record_list: List[str]):
db = docdb_client[self.job_settings.doc_db_db_name]
collection = db[self.job_settings.doc_db_collection_name]
try:
logging.info(f"Removing {len(record_list)} records")
response = collection.delete_many(
filter={"_id": {"$in": record_list}}
)
logging.info(response.raw_result)
logging.debug(response.raw_result)
except Exception as e:
logging.error(f"Error deleting records: {repr(e)}")
docdb_client.close()
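
The delete path above now announces the batch size at INFO before calling delete_many and keeps pymongo's raw server reply at DEBUG. A rough sketch of the same pattern against a standalone MongoDB instance; the connection string, database, and collection names are placeholders.

    import logging

    from pymongo import MongoClient

    logging.basicConfig(level=logging.INFO)

    # Placeholder connection details; the real job builds its client from settings.
    docdb_client = MongoClient("mongodb://localhost:27017")
    collection = docdb_client["metadata_index"]["data_assets"]

    record_list = ["abc-123", "def-456"]  # illustrative _id values
    try:
        logging.info(f"Removing {len(record_list)} records")
        response = collection.delete_many(filter={"_id": {"$in": record_list}})
        logging.debug(response.raw_result)  # full server reply only at DEBUG
    except Exception as e:
        logging.error(f"Error deleting records: {repr(e)}")
    docdb_client.close()
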
@@ -79,7 +79,6 @@ def _process_prefix(self, prefix: str, s3_client: S3Client):
bucket=bucket,
prefix=prefix,
s3_client=s3_client,
log_flag=True,
copy_original_md_subdir=(
self.job_settings.copy_original_md_subdir
),
@@ -92,7 +91,7 @@ def _process_prefix(self, prefix: str, s3_client: S3Client):
prefix=prefix,
s3_client=s3_client,
)
logging.info(response)
logging.debug(response)
else:
logging.warning(
f"Unable to build metadata record for: {location}!"
113 changes: 23 additions & 90 deletions src/aind_data_asset_indexer/utils.py
@@ -31,38 +31,6 @@
]


def _log_message(
message: str, log_level: int = logging.INFO, log_flag: bool = True
) -> None:
"""
Log a message using the given log level. If log_flag is False,
then it will not log anything.
Parameters
----------
message : str
log_level : int
Default is logging.INFO
log_flag : bool
Default is True
Returns
-------
None
"""
if not log_flag:
return
if log_level not in [
logging.DEBUG,
logging.INFO,
logging.WARNING,
logging.ERROR,
logging.CRITICAL,
]:
raise ValueError("Invalid log level")
logging.log(log_level, message)


def create_object_key(prefix: str, filename: str) -> str:
"""
For a given s3 prefix and filename, create the expected
@@ -656,7 +624,6 @@ def cond_copy_then_sync_core_json_files(
prefix: str,
s3_client: S3Client,
copy_original_md_subdir: str = "original_metadata",
log_flag: bool = False,
) -> None:
"""
For a given bucket and prefix
@@ -675,8 +642,6 @@
The prefix for the S3 object keys.
s3_client : S3Client
The S3 client object.
log_flag: bool
Flag indicating whether to log operations. Default is False.
copy_original_md_subdir : str
Subdirectory to copy original core schema json files to.
Default is 'original_metadata'.
@@ -692,27 +657,22 @@
prefix=prefix,
copy_subdir=copy_original_md_subdir,
):
_log_message(
message=(
"Copy of original metadata already exists at "
f"s3://{bucket}/{prefix}/{copy_original_md_subdir}"
),
log_flag=log_flag,
logging.warning(
"Copy of original metadata already exists at "
f"s3://{bucket}/{prefix}/{copy_original_md_subdir}"
)
else:
copy_core_json_files(
bucket=bucket,
prefix=prefix,
s3_client=s3_client,
copy_original_md_subdir=copy_original_md_subdir,
log_flag=log_flag,
)
sync_core_json_files(
metadata_json=metadata_json,
bucket=bucket,
prefix=prefix,
s3_client=s3_client,
log_flag=log_flag,
)


@@ -721,7 +681,6 @@ def copy_core_json_files(
prefix: str,
s3_client: S3Client,
copy_original_md_subdir: str,
log_flag: bool = False,
) -> None:
"""
For a given bucket and prefix, copy the core schema files to a
@@ -735,8 +694,6 @@
The prefix for the S3 object keys.
s3_client : S3Client
The S3 client object.
log_flag: bool
Flag indicating whether to log operations. Default is False.
copy_original_md_subdir : str
Subdirectory to copy original core schema json files to.
For example, 'original_metadata'.
@@ -766,23 +723,17 @@
filename=file_name.replace(".json", f".{date_stamp}.json"),
)
# Copy original core json files to /original_metadata
_log_message(
message=f"Copying {source} to {target} in s3://{bucket}",
log_flag=log_flag,
)
logging.info(f"Copying {source} to {target} in s3://{bucket}")
response = s3_client.copy_object(
Bucket=bucket,
CopySource={"Bucket": bucket, "Key": source},
Key=target,
)
_log_message(message=response, log_flag=log_flag)
logging.debug(response)
else:
_log_message(
message=(
f"Source file {source_location} does not exist. "
f"Skipping copy."
),
log_flag=log_flag,
logging.info(
f"Source file {source_location} does not exist. "
f"Skipping copy."
)


@@ -791,7 +742,6 @@ def sync_core_json_files(
bucket: str,
prefix: str,
s3_client: S3Client,
log_flag: bool = False,
) -> None:
"""
Sync the core schema files with the core fields from metadata.nd.json.
@@ -810,8 +760,6 @@
The prefix for the S3 object keys.
s3_client : S3Client
The S3 client object.
log_flag: bool
Flag indicating whether to log operations. Default is False.
Returns
-------
@@ -838,66 +786,51 @@
# Core schema jsons are created if they don't already exist.
# Otherwise, they are only updated if their contents are outdated.
if core_files_infos[object_key] is None:
_log_message(
message=(f"Uploading new {field_name} to {location}"),
log_flag=log_flag,
)
logging.info(f"Uploading new {field_name} to {location}")
response = upload_json_str_to_s3(
bucket=bucket,
object_key=object_key,
json_str=field_contents_str,
s3_client=s3_client,
)
_log_message(message=response, log_flag=log_flag)
logging.debug(response)
else:
s3_object_hash = core_files_infos[object_key]["e_tag"].strip(
'"'
)
core_field_md5_hash = compute_md5_hash(field_contents_str)
if core_field_md5_hash != s3_object_hash:
_log_message(
message=(
f"Uploading updated {field_name} to {location}"
),
log_flag=log_flag,
logging.info(
f"Uploading updated {field_name} to {location}"
)
response = upload_json_str_to_s3(
bucket=bucket,
object_key=object_key,
json_str=field_contents_str,
s3_client=s3_client,
)
_log_message(message=response, log_flag=log_flag)
logging.debug(response)
else:
_log_message(
message=(
f"{field_name} is up-to-date in {location}. "
f"Skipping."
),
log_flag=log_flag,
logging.info(
f"{field_name} is up-to-date in {location}. "
f"Skipping."
)
else:
# If a core field is None but the core json exists,
# delete the core json.
if core_files_infos[object_key] is not None:
_log_message(
message=(
f"{field_name} not found in metadata.nd.json for "
f"{prefix} but {location} exists! Deleting."
),
log_flag=log_flag,
logging.info(
f"{field_name} not found in metadata.nd.json for "
f"{prefix} but {location} exists! Deleting."
)
response = s3_client.delete_object(
Bucket=bucket, Key=object_key
)
_log_message(message=response, log_flag=log_flag)
logging.debug(response)
else:
_log_message(
message=(
f"{field_name} not found in metadata.nd.json for "
f"{prefix} nor in {location}! Skipping."
),
log_flag=log_flag,
logging.info(
f"{field_name} not found in metadata.nd.json for "
f"{prefix} nor in {location}! Skipping."
)


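
sync_core_json_files decides whether to re-upload a core schema file by comparing the S3 object's ETag with an MD5 hash of the new contents. Below is a self-contained sketch of that comparison, assuming a non-multipart upload (where the ETag is the hex MD5 of the object body) and the same JSON serialization on both sides; is_outdated is a hypothetical helper, not a function from utils.py.

    import hashlib
    import json

    def is_outdated(field_contents: dict, s3_e_tag: str) -> bool:
        """Return True if the core json stored in S3 differs from the metadata field."""
        field_contents_str = json.dumps(field_contents, default=str)
        core_field_md5_hash = hashlib.md5(
            field_contents_str.encode("utf-8")
        ).hexdigest()
        s3_object_hash = s3_e_tag.strip('"')  # S3 returns the ETag wrapped in quotes
        return core_field_md5_hash != s3_object_hash

    # Identical contents hash to the same value, so no upload would be needed.
    contents = {"subject_id": "123456"}
    etag = '"' + hashlib.md5(
        json.dumps(contents, default=str).encode("utf-8")
    ).hexdigest() + '"'
    print(is_outdated(contents, etag))  # False
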