Adding readme, adding finalize and create methods, splitting them into their own modules
kylemann16 committed Nov 14, 2024
1 parent ce241f2 commit f9d921c
Showing 11 changed files with 300 additions and 60 deletions.
46 changes: 46 additions & 0 deletions README.md
@@ -0,0 +1,46 @@
### StacEm
A STAC directory created from USGS WESM JSON. This document walks you through the usage of this Python project, `stacem`.

#### STAC Structure
In this project, we create a straightforward STAC structure, similar to a normal directory tree you might use on your personal computer.

At the coarsest layer, we have the `Catalog`, which is written to `catalog.json` in your base directory. This `Catalog` organizes all of our projects underneath it and houses the links to each one.

Each project, along with the coarse metadata associated with it, is stored in a `Collection` within the project directory, in a file called `collection.json`. This `Collection` describes the project itself with key information like the date range of its creation and the bounding box of the data within it.

Each `Collection` will have a list of links to the individual STAC `Items`, the lowest level of this structure. Here, an `Item` references a specific `laz` file and its accompanying `xml` document. Using the metadata from the WESM file, as well as information gathered from `pdal info` run over the point cloud, we create a much more granular metadata file, with a more accurate bounding box and details about the point cloud itself.
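As a rough illustration, here is a minimal `pystac` sketch of such an `Item` (not the project's actual code; the bounding box values and the `count` property are hypothetical stand-ins for what `pdal info` reports):

```python
from datetime import datetime, timezone

import pystac

# Hypothetical values, as found in the `metadata` block of `pdal info` output.
md = {"minx": -149.9, "miny": 61.1, "maxx": -149.7, "maxy": 61.2, "count": 12345678}
bbox = [md["minx"], md["miny"], md["maxx"], md["maxy"]]

item = pystac.Item(
    id="USGS_LPC_Anchorage_Lidar_63236780",
    geometry={  # a simple rectangle matching the bbox
        "type": "Polygon",
        "coordinates": [[
            [bbox[0], bbox[1]], [bbox[2], bbox[1]],
            [bbox[2], bbox[3]], [bbox[0], bbox[3]],
            [bbox[0], bbox[1]],
        ]],
    },
    bbox=bbox,
    datetime=datetime.now(timezone.utc),
    properties={"count": md["count"]},
)

# The laz file and its accompanying xml document become the Item's assets.
item.add_asset("data", pystac.Asset(href="USGS_LPC_Anchorage_Lidar_63236780.laz"))
item.add_asset("metadata", pystac.Asset(href="USGS_LPC_Anchorage_Lidar_63236780.xml"))
```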

The directory structure will look like this:

```
./
- catalog.json
- AK_ANCHORAGE_2015
  - collection.json
  - USGS_LPC_Anchorage_Lidar_63236780
    - USGS_LPC_Anchorage_Lidar_63236780.json
  - USGS_LPC_Anchorage_Lidar_63716807
    - USGS_LPC_Anchorage_Lidar_63716807.json
  ...
  - USGS_LPC_Anchorage_Lidar_63716806
    - USGS_LPC_Anchorage_Lidar_63716806.json
```

#### Usage
To create this structure, we'll use two Python modules: `stacem_create` and `stacem_finalize`.

##### stacem_create
The `Create` module builds our structure from the ground up, starting with the leaf nodes. `Create` iterates through each project and processes all of the point clouds found at the `lpc` link key of the WESM JSON object. It plucks the necessary information out of the `pdal info` call, including stats about the point schema, the coordinate systems, and the bounding box. If something goes wrong with this part, we write any errors to the `errors` key of the STAC `Item` and continue publishing it, which makes problems with point cloud files much easier to track.
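Here is a minimal sketch of that error-capture behavior (the helper and the `pointcloud` property name are hypothetical; the project's actual logic lives in `src/usgs_stac/item.py`):

```python
import json
import subprocess

import pystac

def attach_info(item: pystac.Item, laz_path: str) -> pystac.Item:
    """Run `pdal info` over the file; on failure, record the error under
    the Item's `errors` key and keep publishing instead of aborting."""
    errors = []
    try:
        out = subprocess.run(["pdal", "info", laz_path, "--metadata"],
                             check=True, capture_output=True)
        item.properties["pointcloud"] = json.loads(out.stdout)["metadata"]
    except Exception as e:
        # Track problem files directly on the published STAC Item.
        errors.append(str(e.args))
    item.properties["errors"] = errors
    return item
```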

Once each `Item` has been created for a given project, a `Collection` is made with those `Items` as children, and a `collection.json` file is written to the local project directory.
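A minimal `pystac` sketch of that step, assuming a placeholder `items` list and made-up extent values:

```python
from datetime import datetime, timezone

import pystac

items: list = []  # the pystac.Items created above (placeholder)

collection = pystac.Collection(
    id="AK_ANCHORAGE_2015",
    description="Collection for a single WESM project.",
    extent=pystac.Extent(
        spatial=pystac.SpatialExtent([[-150.5, 61.0, -149.0, 61.5]]),
        temporal=pystac.TemporalExtent(
            [[datetime(2015, 1, 1, tzinfo=timezone.utc), None]]),
    ),
)
collection.add_items(items)

# Give the collection and its children local hrefs, then write collection.json.
collection.normalize_hrefs("./wesm_stac/AK_ANCHORAGE_2015")
collection.save_object(True, "./wesm_stac/AK_ANCHORAGE_2015/collection.json")
```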

##### stacem_finalize
The `Finalize` command walks through the directory structure we created in the `Create` step and collects all of the `Collections` into a list, to be added as children of our overarching `Catalog`. This `Catalog` is then written out to the base directory as a `catalog.json` file.

##### Updating
This project was created with the goal of continuously updating one structure. On a schedule selected by the user, this module can be rerun over a previously made structure, updating any key-value pairs that have been added, and checking the `ETag` values of point cloud files to determine whether a new run of `pdal info` is needed.
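A minimal sketch of that check (the key and function name are hypothetical; the project's version is `match_etag` in `src/usgs_stac/item.py`):

```python
from typing import Optional

import boto3

s3 = boto3.client("s3")

def needs_rerun(bucket: str, key: str, prev_etag: Optional[str]) -> bool:
    """Return True when `pdal info` should be rerun for this point cloud."""
    # The ETag from a HEAD request changes whenever the object's bytes change.
    etag = s3.head_object(Bucket=bucket, Key=key)["ETag"]
    if prev_etag is None:
        return True  # never processed before
    return etag != prev_etag

# Hypothetical usage against the public USGS lidar bucket:
# needs_rerun("usgs-lidar-public", "AK_ANCHORAGE_2015/some_file.laz", stored_etag)
```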

If nothing new is being added and no reprocessing occurs, then the `Create` module will effectively just be copying data from S3 to your local machine.

##### Pushing to S3
Once you've created your local structure, all you need to do is perform something akin to an `s3 sync` call, for example `aws s3 sync ./wesm_stac s3://usgs-lidar-public/wesm_stac`. `s3 sync` is the right tool for the job because it skips any unnecessary upload operations.
5 changes: 4 additions & 1 deletion environment.yml
@@ -10,4 +10,7 @@ dependencies:
- dask
- pyproj
- pystac
- boto3
- boto3
- pip:
  - requests
  - pystac[validation]
10 changes: 5 additions & 5 deletions pyproject.toml
@@ -1,5 +1,5 @@
[project]
name = "usgs_stac"
name = "stacem"
requires-python = ">=3.9"
description = "Create STAC from USGS WESM JSON url."
readme = "README.md"
@@ -10,8 +10,8 @@ dependencies = [ ]
dynamic = ["version"]

[project.urls]
homepage = "https://github.com/hobuinc/silvimetric"
repository = "https://github.com/hobuinc/silvimetric"
homepage = "https://github.com/hobuinc/stacem"
repository = "https://github.com/hobuinc/stacem"

[tool.setuptools]
package-dir = {"" = "src"}
@@ -25,8 +25,8 @@ requires = ["setuptools>=64.0"]
build-backend = "setuptools.build_meta"

[project.scripts]
silvimetric = "silvimetric.cli.cli:cli"

'stacem_create' = "usgs_stac.create:create"
'stacem_finalize' = "usgs_stac.finalize:finalize"

[tool.pytest.ini_options]
testpaths = [ "tests" ]
11 changes: 0 additions & 11 deletions src/usgs_stac/catalog.py
@@ -54,17 +54,6 @@ def save_local(self):
"""
p = os.path.join(self.dst, "catalog.json")

## Find the collections available in this dst directory
_, dirs, _ = next(os.walk(self.dst))
for d in dirs:
newp = os.path.join(self.dst, d)
_, _, files = next(os.walk(newp))
if 'collection.json' in files:
col_href = urljoin(self.href,
os.path.join(d, 'collection.json'))
link = pystac.Link(rel='child', target=col_href,
media_type='application/json', title=d)
self.catalog.add_link(link)


self.catalog.save_object(True, p)
52 changes: 52 additions & 0 deletions src/usgs_stac/create.py
@@ -0,0 +1,52 @@
import os
from dask.distributed import Client

from .catalog import MetaCatalog
from .metadata_common import (
    logger,
    DEFAULT_LOCAL_DST,
    DEFAULT_S3_BUCKET,
    DEFAULT_S3_PATH,
    DEFAULT_WESM_URL
)

def create():
    """
    Create command. Creates STAC from scratch or from a previously created
    STAC structure. If it has already been made before, then this command
    will update Items to reflect any new information that has been added.
    """
    client = Client()

    import argparse
    parser = argparse.ArgumentParser(add_help=True)
    parser.add_argument("--wesm_url", type=str, help="Url for WESM JSON",
        default=DEFAULT_WESM_URL)
    parser.add_argument("--local_dst", type=str, help="Destination directory",
        default=DEFAULT_LOCAL_DST)
    parser.add_argument("--s3_bucket", type=str, help="S3 destination bucket",
        default=DEFAULT_S3_BUCKET)
    parser.add_argument("--s3_path", type=str,
        help="S3 path within destination bucket", default=DEFAULT_S3_PATH)
    # argparse's type=bool would treat any non-empty string as True, so use
    # BooleanOptionalAction (--update / --no-update) instead.
    parser.add_argument("--update", action=argparse.BooleanOptionalAction,
        default=True, help="Update flag. If set, will update JSON objects "
        "that already exist with new values.")

    try:
        args = parser.parse_args()
        s3_dst = f"https://{args.s3_bucket}.s3.amazonaws.com/{args.s3_path}"

        logger.info(f"WESM url: {args.wesm_url}")
        logger.info(f"Local dest: {args.local_dst}")
        logger.info(f"Remote dest: {s3_dst}")
        logger.info(f"Updating: {args.update}")

        m = MetaCatalog(args.wesm_url, args.local_dst, s3_dst, args.update)
        logger.info(f"Dask Dashboard Link: {client.cluster.dashboard_link}")
        m.set_children()
    except Exception as e:
        logger.error(e.args)
        parser.print_help()
        exit(1)

if __name__ == '__main__':
    create()
111 changes: 111 additions & 0 deletions src/usgs_stac/finalize.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import os

import boto3
import pystac
import json
import dask

from dask.diagnostics import ProgressBar

from .metadata_common import logger, DEFAULT_LOCAL_DST, DEFAULT_S3_PATH

s3 = boto3.client('s3')

@dask.delayed
def process_one_s3(bucket, path):
    obj = s3.get_object(Bucket=bucket, Key=path)
    col_dict = json.loads(obj['Body'].read())
    c = pystac.Collection.from_dict(col_dict)
    return c

def s3_collections(bucket, prefix):
    count = 0
    pager = s3.get_paginator('list_objects_v2')
    it = pager.paginate(Bucket=bucket, Prefix=prefix)
    objs = it.search("Contents[?contains(Key, `collection.json`)][]")
    f_list = []

    for i in objs:
        path = i['Key']
        logger.info(f"Found collection {path}, {count}")
        f_list.append(process_one_s3(bucket, path))
        count = count + 1

    return f_list

def local_collections(dst):
    ## Find the collections available in this dst directory
    cols_list = []
    _, dirs, _ = next(os.walk(dst))
    for d in dirs:
        newp = os.path.join(dst, d)
        _, _, files = next(os.walk(newp))
        if 'collection.json' in files:
            cols_list.append(pystac.Collection.from_file(
                os.path.join(newp, 'collection.json')))

    return cols_list

def finalize():
    """
    This will create a STAC Catalog that references each of the child Collections
    present in the source directory that is passed in. By default this will look
    in the local directory, but if an s3 bucket and path are passed in, then it
    will use those instead.
    """
    import argparse
    parser = argparse.ArgumentParser(add_help=True)
    parser.add_argument("--local_path", type=str, help="Finalize will use this "
        "argument for location by default if no s3 bucket is supplied.",
        default=DEFAULT_LOCAL_DST)
    parser.add_argument("--s3_bucket", type=str, help="S3 destination bucket. "
        "Finalize will use this argument if it's supplied.")
    parser.add_argument("--s3_path", type=str,
        help="S3 path within destination bucket", default=DEFAULT_S3_PATH)
    parser.add_argument("--out_dst", type=str, help="Catalog destination path.",
        default="./catalog.json")

    args = parser.parse_args()

    logger.info(f"Local dest: {args.local_path}")
    logger.info(f"S3 Bucket: {args.s3_bucket}")
    logger.info(f"S3 Path: {args.s3_path}")
    logger.info(f"Output path: {args.out_dst}")

    try:
        cat = pystac.Catalog(id='WESM Catalog',
            description='Catalog representing WESM metadata and associated'
            ' point cloud files.')
        if args.s3_bucket is not None:
            future_cols = s3_collections(args.s3_bucket, args.s3_path)

            with ProgressBar():
                cols = dask.compute(*future_cols)
        else:
            if not os.path.isdir(args.local_path):
                raise ValueError("Invalid '--local_path'.")
            cols = local_collections(args.local_path)

        if not cols:
            raise Exception("No collections found.")

        # find the catalog path from the first collection in the list
        c1: pystac.Collection = cols[0]
        catalog_path = c1.get_root_link().href
        cat.add_children(cols)
        cat.set_self_href(catalog_path)
        # save_object's first positional argument is include_self_link, so the
        # destination must be passed as dest_href
        cat.save_object(dest_href=args.out_dst)

        logger.info(f"Done! Catalog saved to {args.out_dst}.")
    except Exception as e:
        logger.error(e.args)
        parser.print_help()
        exit(1)

if __name__ == '__main__':
    finalize()
15 changes: 11 additions & 4 deletions src/usgs_stac/item.py
@@ -67,7 +67,13 @@ def process(self):
            return self.link
        except Exception as e:
            logger.error(f"{self.id} failed with error {e.args}")
            return None
            try:
                self.errors.append(str(e.args))
                self.from_metadata()
                self.save_local()
                return self.link
            except Exception:
                return None

    def save_local(self):
        if self.item is not None:
Expand Down Expand Up @@ -98,6 +104,8 @@ def match_etag(self, item) -> bool:
        # if pc_path fails, default to previous item
        self.etag = prev_etag

        if prev_etag is None:
            return False
        if self.etag is None:
            return False
        elif prev_etag is not None and prev_etag == self.etag:
@@ -121,10 +129,9 @@ def get_properties(self):
"onemeter_reason": self.meta.onemeter_reason,
"lpc_category": self.meta.lpc_category,
"lpc_reason": self.meta.lpc_reason,
"ql": self.meta.ql
"ql": self.meta.ql,
"errors": self.errors
}
if self.errors:
properties["errors"] = self.errors
return properties

def add_assets(self, item):
38 changes: 0 additions & 38 deletions src/usgs_stac/main.py

This file was deleted.

5 changes: 5 additions & 0 deletions src/usgs_stac/metadata_common.py
@@ -12,6 +12,11 @@
from html.parser import HTMLParser
from requests.adapters import HTTPAdapter, Retry

DEFAULT_WESM_URL = 'https://apps.nationalmap.gov/lidar-explorer/lidar_ndx.json'
DEFAULT_S3_BUCKET = 'usgs-lidar-public'
DEFAULT_S3_PATH = 'wesm_stac'
DEFAULT_LOCAL_DST = './wesm_stac'

logger = logging.getLogger('wesm_stac')
ch = logging.StreamHandler(stream=sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')