Ingest subset of HLS data for EJ story #146
@freitagb @xhagrg @slesaad I think this work is probably custom enough that it won't lend itself well to the existing pipelines, so I am happy to start the work of manually copying and updating the metadata with some one-off code (unless you see a clearer path--please let me know if you do!). After we have the metadata ingested, do you think the COGs and metadata can be transferred to MCP using existing pipelines?
If there are custom filename changes, etc., we will need to prepare the files accordingly. However, if we can define the collection details and the COGs are in one of the UAH buckets, we should be able to transfer the data, create the STAC records, and create the collection to hold it using the current publication process.
Do we need to do this? Can we not rely on the STAC metadata generation process?
HLS EJ Subset Update:

The subset of HLS data uploaded to the UAH covid-eo-data bucket is now loaded in the delta backend staging stack. Both the Landsat (L30) and Sentinel (S30) collections have 2021 data in Louisiana and 2017 data in Puerto Rico (see the browser links below to zoom in on the locations). Note that the band combinations are different for the L30 and S30 collections. The load for these tiles is a little slow, but this will hopefully be fixed with the addition of a CRS header in the files soon.

Browser links:
- HLS S30
- HLS L30
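As a side note on the CRS comment above, here is a minimal sketch of how one might check whether a given COG in the bucket exposes CRS information (assuming rasterio is installed; the object key and profile name below are placeholders, not real paths):

```python
# Hypothetical check, not part of the ingest script: confirm whether a COG in the
# bucket carries a CRS, since missing projection info forces slower tile reads.
import boto3
import rasterio
from rasterio.session import AWSSession

bucket = "covid-eo-data"
key = "hlsl30-ej/2021/15RYP/example-band.tif"  # placeholder key for illustration only

aws_session = AWSSession(boto3.Session(profile_name="local-aws-profile-name"))  # placeholder profile
with rasterio.Env(aws_session):
    with rasterio.open(f"s3://{bucket}/{key}") as src:
        # src.crs is None when the file has no CRS header
        print(f"{key}: crs={src.crs}, transform={src.transform}")
```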
Ingest code
```python
# python3.8
import os
import json
import base64

import boto3
from botocore.exceptions import ClientError
from pystac.validation import validate_dict
from pypgstac import pypgstac


def get_secret(secret_name: str, profile_name: str = None) -> dict:
    """Retrieve secrets from AWS Secrets Manager

    Args:
        secret_name (str): name of aws secrets manager secret containing database connection secrets
        profile_name (str, optional): optional name of aws profile for use in debugger only

    Returns:
        secrets (dict): decrypted secrets in dict
    """
    # Create a Secrets Manager client
    if profile_name:
        session = boto3.session.Session(profile_name=profile_name)
    else:
        session = boto3.session.Session()
    client = session.client(service_name="secretsmanager")

    # In this sample we only handle the specific exceptions for the 'GetSecretValue' API.
    # See https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html
    # We rethrow the exception by default.
    try:
        get_secret_value_response = client.get_secret_value(SecretId=secret_name)
    except ClientError as e:
        raise e
    else:
        # Decrypts secret using the associated KMS key.
        # Depending on whether the secret is a string or binary, one of these fields will be populated.
        if "SecretString" in get_secret_value_response:
            return json.loads(get_secret_value_response["SecretString"])
        else:
            return json.loads(base64.b64decode(get_secret_value_response["SecretBinary"]))


def get_dsn_string(secret: dict) -> str:
    """Form database connection string from a dictionary of connection secrets

    Args:
        secret (dict): dictionary containing connection secrets including username, database name, host, and password

    Returns:
        dsn (str): full database data source name
    """
    try:
        return f"postgres://{secret['username']}:{secret['password']}@{secret['host']}:{secret['port']}/{secret['dbname']}"
    except Exception as e:
        raise e


# AWS
profile_name = "local-aws-profile-name"
secret_name = "the-secret-name"

# to walk bucket and manage metadata
s3 = boto3.resource("s3")

# to verify keys exist
session = boto3.Session(profile_name=profile_name)
s3_client = session.client("s3")

bucket_name = "covid-eo-data"
BUCKET = boto3.Session().resource("s3").Bucket(bucket_name)

# Collection specific config L30
collection_json_file = "docs-local/hls-l30-002-ej.json"
coll_prefix = "hlsl30-ej"
# Collection specific config S30
# collection_json_file = "docs-local/hls-s30-002-ej.json"
# coll_prefix = "hlss30-ej"

tmp_collection_file = collection_json_file.replace(".json", "-nd.json")
tmp_items_file = collection_json_file.replace(".json", "-items-nd.json")

# 2017 MGRS ID = 19QHA, 2021 MGRS ID = 15RYP
s3_prefixes = [f"{coll_prefix}/2017/19QHA", f"{coll_prefix}/2021/15RYP"]

dry_run = True

if __name__ == "__main__":
    with open(collection_json_file) as fl:
        collection = json.loads(fl.read())

    # Load connection info
    con_secrets = get_secret(secret_name, profile_name=profile_name)
    dsn = get_dsn_string(con_secrets)

    # Write collection to a temporary ndjson file
    with open(tmp_collection_file, "w") as f:
        f.write(f"{json.dumps(collection)}\n")
    print(f"Collection written to {tmp_collection_file}")

    if not dry_run:
        # Load collection into pgstac and remove temporary file
        pypgstac.load(
            table="collections",
            file=tmp_collection_file,
            dsn=dsn,
            method="insert_ignore",  # use insert_ignore to avoid overwriting existing collection, upsert to replace
        )
        os.remove(tmp_collection_file)

    # Now find items in S3 and update metadata references
    updated_stac_items = []
    for prefix in s3_prefixes:
        # Find all stac metadata
        stac_objs = [i for i in BUCKET.objects.filter(Prefix=prefix) if "_stac.json" in i.key]
        for stac_obj in stac_objs:
            updated_metadata_key = stac_obj.key.replace(".json", "-ej.json")
            print(f"Creating and verifying {updated_metadata_key=}")

            obj = s3.Object(stac_obj.bucket_name, stac_obj.key)
            data = obj.get()["Body"].read().decode("utf-8")
            item = json.loads(data)

            # Validate source item
            try:
                validate_dict(item)
            except Exception as e:
                print(f"WARNING source item is invalid {item['id']} exception={e}")
                raise

            # Get s3 path for item
            stac_metadata_filename = os.path.basename(obj.key)
            stac_item_prefix = f"{obj.key.replace(stac_metadata_filename, '')}"

            # Update item id
            updated_id = f"{item['id']}-ej"
            item["id"] = updated_id

            # Update collection references
            item["collection"] = collection["id"]

            # Update asset hrefs
            assets = item.get("assets")
            updated_assets = {}
            for asset_key in assets.keys():
                asset = assets[asset_key]
                asset_filename = os.path.basename(asset["href"])
                updated_prefix = os.path.join(stac_item_prefix, asset_filename)
                if "Contents" in s3_client.list_objects_v2(Bucket=bucket_name, Prefix=updated_prefix):
                    updated_href = f"s3://{bucket_name}/{updated_prefix}"
                    asset["href"] = updated_href
                    updated_assets[asset_key] = asset
                else:
                    print(f"WARNING {updated_prefix=} does not exist")
                    raise FileNotFoundError(f"{updated_prefix=} does not exist")

            # Update item links
            derived_from_link = next((link for link in item.get("links") if link["rel"] == "self"))
            derived_from_link["rel"] = "derived_from"
            cite_as_link = next((link for link in item.get("links") if link["rel"] == "cite-as"))
            item["links"] = [derived_from_link, cite_as_link]

            # Validate item
            try:
                validate_dict(item)
            except Exception as e:
                print(f"WARNING updated item is invalid {item['id']} exception={e}")
                raise

            # Append updated metadata to array for bulk pgstac update
            updated_stac_items.append(item)

            if not dry_run:
                # Upload updated metadata to s3
                print(f"Uploading {updated_metadata_key}")
                s3_client.put_object(
                    Body=json.dumps(item),
                    Bucket=bucket_name,
                    Key=updated_metadata_key,
                )
            del item

    # Write items to tmp ndjson file
    with open(tmp_items_file, "w") as f:
        f.write("\n".join([json.dumps(x) for x in sorted(updated_stac_items, key=lambda x: x["properties"]["datetime"])]))

    if not dry_run:
        # Load items into pgstac and then delete temp file
        pypgstac.load(
            table="items",
            file=tmp_items_file,
            dsn=dsn,
            method="insert_ignore",  # use insert_ignore to avoid overwriting existing items or upsert to replace
        )
        os.remove(tmp_items_file)

    print("fin.")
```
Epic
#89

Description
Ingest STAC metadata for a small subset of HLS data that have been uploaded directly to IMPACT-owned bucket(s) into new `hls(l|s)30-ej` collections to unblock the dashboard environmental justice story.

Strategy/notes

New collections
- Create new collections (`-ej`) with `dashboard:*` properties (a rough sketch of such a collection definition follows after this list).

New Items
- Walk the buckets in UAH, update the item ids (append `-ej`), and update the href for all assets to match the UAH bucket (this is necessary because item ids need to be unique for the entire pgstac catalog).

UAH account data location
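For reference, a minimal sketch of what the `-ej` collection definition read by the ingest script might contain. The id, title, license, extent values, and `dashboard:*` field names below are illustrative assumptions, not the actual collection record:

```python
# Illustrative only: a bare-bones collection record like the one the ingest script
# expects to find in docs-local/hls-l30-002-ej.json. All values are placeholders.
import json

collection = {
    "type": "Collection",
    "stac_version": "1.0.0",
    "id": "hls-l30-002-ej",  # assumed id pattern; use the real collection id
    "title": "HLS L30 subset for EJ story",  # placeholder title
    "description": "Subset of HLS L30 tiles copied to the UAH covid-eo-data bucket for the EJ story.",
    "license": "proprietary",  # placeholder license
    "extent": {
        "spatial": {"bbox": [[-180, -90, 180, 90]]},  # placeholder bbox
        "temporal": {"interval": [["2017-01-01T00:00:00Z", "2021-12-31T23:59:59Z"]]},
    },
    "links": [],
    # dashboard:* properties referenced in the strategy above; names and values are assumptions
    "dashboard:is_periodic": False,
    "dashboard:time_density": "day",
}

with open("docs-local/hls-l30-002-ej.json", "w") as f:
    json.dump(collection, f)
```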
Rationale
This temporary workaround has two purposes:

Acceptance Criteria:

Checklist:

Concept diagrams