Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pull Request: TypeHinting and Readability #3

Merged
merged 9 commits into from
Apr 29, 2023
45 changes: 30 additions & 15 deletions xpublish_opendap/dap_xarray.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
"""Convert xarray.Datasets to OpenDAP datasets."""
import logging
from typing import (
Any,
)

import numpy as np
import opendap_protocol as dap
import xarray as xr

logger = logging.getLogger("api")
logger: logging.Logger = logging.getLogger("api")

dtype_dap = {
np.ubyte: dap.Byte,
Expand All @@ -16,16 +19,18 @@
np.float32: dap.Float32,
np.float64: dap.Float64,
np.str_: dap.String,
# Not a direct mapping
np.int64: dap.Float64,
np.int64: dap.Float64, # not a direct mapping
}
dtype_dap = {np.dtype(k): v for k, v in dtype_dap.items()}
dap_dtypes_dict: dict[np.dtype, dap.DAPAtom] = {
np.dtype(k): v for k, v in dtype_dap.items()
}
del dtype_dap


def dap_dtype(da: xr.DataArray):
"""Return a DAP type for the xr.DataArray."""
try:
return dtype_dap[da.dtype]
return dap_dtypes_dict[da.dtype]
except KeyError as e:
logger.warning(
f"Unable to match dtype for {da.name}. "
Expand All @@ -34,21 +39,31 @@ def dap_dtype(da: xr.DataArray):
return dap.String


def dap_attribute(key: str, value):
def dap_attribute(key: str, value: Any) -> dap.Attribute:
"""Create a DAP attribute."""
if isinstance(value, int):
dtype = dap.Int32
elif isinstance(value, float):
dtype = dap.Float64
else:
dtype = dap.String
return dap.Attribute(name=key, value=value, dtype=dtype)

return dap.Attribute(
name=key,
value=value,
dtype=dtype,
)
xaviernogueira marked this conversation as resolved.
Show resolved Hide resolved


def dap_dimension(da: xr.DataArray) -> dap.Array:
"""Transform an xarray dimension into a DAP dimension."""
encoded_da = xr.conventions.encode_cf_variable(da.variable)
dim = dap.Array(name=da.name, data=encoded_da.values, dtype=dap_dtype(encoded_da))
encoded_da: xr.DataArray = xr.conventions.encode_cf_variable(da.variable)

dim = dap.Array(
name=da.name,
data=encoded_da.values,
dtype=dap_dtype(encoded_da),
)

for key, value in encoded_da.attrs.items():
dim.append(dap_attribute(key, value))
Expand All @@ -58,32 +73,32 @@ def dap_dimension(da: xr.DataArray) -> dap.Array:

def dap_grid(da: xr.DataArray, dims: dict[str, dap.Array]) -> dap.Grid:
"""Transform an xarray DataArray into a DAP Grid."""
data_array = dap.Grid(
data_grid = dap.Grid(
name=da.name,
data=da.astype(da.encoding["dtype"]).data,
dtype=dap_dtype(da),
dimensions=[dims[dim] for dim in da.dims],
)

for key, value in da.attrs.items():
data_array.append(dap_attribute(key, value))
data_grid.append(dap_attribute(key, value))

return data_array
return data_grid


def dap_dataset(ds: xr.Dataset, name: str) -> dap.Dataset:
"""Create a DAP Dataset for an xarray Dataset."""
dataset = dap.Dataset(name=name)

dims = {}
dims: dict[str, dap.Array] = {}
for dim in ds.dims:
dims[dim] = dap_dimension(ds[dim])

dataset.append(*dims.values())

for var in ds.data_vars:
data_array = dap_grid(ds[var], dims)
dataset.append(data_array)
data_grid = dap_grid(ds[var], dims)
dataset.append(data_grid)

for key, value in ds.attrs.items():
dataset.append(dap_attribute(key, value))
Expand Down
39 changes: 24 additions & 15 deletions xpublish_opendap/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,34 @@
import cachey
import opendap_protocol as dap
import xarray as xr
from fastapi import APIRouter, Depends, Request
from fastapi import (
APIRouter,
Depends,
Request,
)
from fastapi.responses import StreamingResponse
from xpublish import Dependencies, Plugin, hookimpl
from xpublish import (
Dependencies,
Plugin,
hookimpl,
)

from .dap_xarray import dap_dataset
from xpublish_opendap import dap_xarray

logger = logging.getLogger("uvicorn")
logger: logging.Logger = logging.getLogger("uvicorn")


class OpenDapPlugin(Plugin):
"""Provide OpenDAP endpoints for Xpublish datasets."""
"""OpenDAP plugin for xpublish."""

name = "opendap"
name: str = "opendap"

dataset_router_prefix = "/opendap"
dataset_router_prefix: str = "/opendap"
dataset_router_tags: list[str] = ["opendap"]

@hookimpl
def dataset_router(self, deps: Dependencies):
"""Provide OpenDAP dataset router."""
def dataset_router(self, deps: Dependencies) -> APIRouter:
"""Create an OpenDAP router for xpublish."""
router = APIRouter(
prefix=self.dataset_router_prefix,
tags=self.dataset_router_tags,
Expand All @@ -37,13 +45,15 @@ def get_dap_dataset(
dataset_id: str,
ds: xr.Dataset = Depends(deps.dataset),
cache: cachey.Cache = Depends(deps.cache),
):
) -> dap.Dataset:
"""Get a dataset that has been translated to opendap."""
# get cached dataset if it exists
cache_key = f"opendap_dataset_{dataset_id}"
dataset = cache.get(cache_key)

# if not, convert the xarray dataset to opendap
if dataset is None:
dataset = dap_dataset(ds, dataset_id)
dataset = dap_xarray.dap_dataset(ds, dataset_id)

cache.put(cache_key, dataset, 99999)

Expand All @@ -52,14 +62,13 @@ def get_dap_dataset(
def dap_constraint(request: Request) -> str:
"""Parse DAP constraints from request."""
constraint = parse.unquote(request.url.components[3])

return constraint

@router.get(".dds")
def dds_response(
constraint=Depends(dap_constraint),
dataset: dap.Dataset = Depends(get_dap_dataset),
):
) -> StreamingResponse:
"""OpenDAP DDS response (types and dimension metadata)."""
return StreamingResponse(
dataset.dds(constraint=constraint),
Expand All @@ -70,7 +79,7 @@ def dds_response(
def das_response(
constraint=Depends(dap_constraint),
dataset: dap.Dataset = Depends(get_dap_dataset),
):
) -> StreamingResponse:
"""OpenDAP DAS response (attribute metadata)."""
return StreamingResponse(
dataset.das(constraint=constraint),
Expand All @@ -81,7 +90,7 @@ def das_response(
def dods_response(
constraint=Depends(dap_constraint),
dataset: dap.Dataset = Depends(get_dap_dataset),
):
) -> StreamingResponse:
"""OpenDAP dods response (data access)."""
return StreamingResponse(
dataset.dods(constraint=constraint),
Expand Down