Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aggregation Extension #684

Merged
merged 23 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## [Unreleased] - TBD

### Added

* Add base support for the Aggregation extension [#684](https://github.com/stac-utils/stac-fastapi/pull/684)

### Changed

* moved `AsyncBaseFiltersClient` and `BaseFiltersClient` classes in `stac_fastapi.extensions.core.filter.client` submodule ([#704](https://github.com/stac-utils/stac-fastapi/pull/704))
Expand Down
1 change: 1 addition & 0 deletions stac_fastapi/api/stac_fastapi/api/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class ApiExtensions(enum.Enum):
query = "query"
sort = "sort"
transaction = "transaction"
aggregation = "aggregation"


class AddOns(enum.Enum):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""stac_api.extensions.core module."""

from .aggregation import AggregationExtension
from .context import ContextExtension
from .fields import FieldsExtension
from .filter import FilterExtension
Expand All @@ -9,6 +10,7 @@
from .transaction import TransactionExtension

__all__ = (
"AggregationExtension",
"ContextExtension",
"FieldsExtension",
"FilterExtension",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""Aggregation extension module."""

from .aggregation import AggregationExtension

__all__ = ["AggregationExtension"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
"""Aggregation Extension."""
from enum import Enum
from typing import List, Union

import attr
from fastapi import APIRouter, FastAPI

from stac_fastapi.api.models import CollectionUri, EmptyRequest
from stac_fastapi.api.routes import create_async_endpoint
from stac_fastapi.types.extension import ApiExtension

from .client import AsyncBaseAggregationClient, BaseAggregationClient
from .request import AggregationExtensionGetRequest, AggregationExtensionPostRequest


class AggregationConformanceClasses(str, Enum):
    """Conformance classes for the Aggregation extension.

    Members are ``str`` subclasses, so they compare equal to (and serialize as)
    the plain conformance URI they carry.

    See
    https://github.com/stac-api-extensions/aggregation
    """

    AGGREGATION = "https://api.stacspec.org/v0.3.0/aggregation"


@attr.s
class AggregationExtension(ApiExtension):
    """Aggregation Extension.

    The purpose of the Aggregation Extension is to provide an endpoint similar to
    the Search endpoint (/search), but which will provide aggregated information
    on matching Items rather than the Items themselves. This is highly influenced
    by the Elasticsearch and OpenSearch aggregation endpoint, but with a more
    regular structure for responses.

    The Aggregation extension adds several endpoints which allow the retrieval of
    available aggregation fields and aggregation buckets based on a search query:
        GET /aggregations
        POST /aggregations
        GET /collections/{collection_id}/aggregations
        POST /collections/{collection_id}/aggregations
        GET /aggregate
        POST /aggregate
        GET /collections/{collection_id}/aggregate
        POST /collections/{collection_id}/aggregate

    https://github.com/stac-api-extensions/aggregation/blob/main/README.md

    Attributes:
        GET: Request model used for the GET ``aggregate`` routes.
        POST: Request model used for the POST ``aggregate`` routes.
        client: Aggregation client whose ``get_aggregations`` and ``aggregate``
            methods back the routes.
        conformance_classes: Conformance classes provided by the extension
        router: Router the extension's routes are added to.
    """

    GET = AggregationExtensionGetRequest
    POST = AggregationExtensionPostRequest

    client: Union[AsyncBaseAggregationClient, BaseAggregationClient] = attr.ib(
        factory=BaseAggregationClient
    )

    # Built through a factory so every instance owns its own list; a literal
    # list passed as ``default=`` would be one object shared by all instances.
    conformance_classes: List[str] = attr.ib(
        factory=lambda: [AggregationConformanceClasses.AGGREGATION]
    )
    router: APIRouter = attr.ib(factory=APIRouter)

    def register(self, app: FastAPI) -> None:
        """Register the extension with a FastAPI application.

        Sets the router prefix from ``app.state.router_prefix``, adds the
        ``/aggregations`` and ``/aggregate`` routes (root and per-collection)
        and includes the router in ``app``.

        Args:
            app: target FastAPI application.

        Returns:
            None
        """
        self.router.prefix = app.state.router_prefix

        # Discovery routes: list the aggregations that are available.
        self.router.add_api_route(
            name="Aggregations",
            path="/aggregations",
            methods=["GET", "POST"],
            endpoint=create_async_endpoint(self.client.get_aggregations, EmptyRequest),
        )
        self.router.add_api_route(
            name="Collection Aggregations",
            path="/collections/{collection_id}/aggregations",
            methods=["GET", "POST"],
            endpoint=create_async_endpoint(self.client.get_aggregations, CollectionUri),
        )

        # Aggregate routes: GET and POST are registered separately because each
        # method uses its own request model.
        self.router.add_api_route(
            name="Aggregate",
            path="/aggregate",
            methods=["GET"],
            endpoint=create_async_endpoint(self.client.aggregate, self.GET),
        )
        self.router.add_api_route(
            name="Aggregate",
            path="/aggregate",
            methods=["POST"],
            endpoint=create_async_endpoint(self.client.aggregate, self.POST),
        )

        # NOTE(review): the collection-scoped routes reuse the same request
        # models as the root routes; confirm those models expose
        # ``collection_id`` so the path parameter reaches the client.
        self.router.add_api_route(
            name="Collection Aggregate",
            path="/collections/{collection_id}/aggregate",
            methods=["GET"],
            endpoint=create_async_endpoint(self.client.aggregate, self.GET),
        )
        self.router.add_api_route(
            name="Collection Aggregate",
            path="/collections/{collection_id}/aggregate",
            methods=["POST"],
            endpoint=create_async_endpoint(self.client.aggregate, self.POST),
        )
        app.include_router(self.router, tags=["Aggregation Extension"])
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
"""Aggregation extensions clients."""

import abc
from typing import List, Optional, Union

import attr
from geojson_pydantic.geometries import Geometry
from stac_pydantic.shared import BBox

from stac_fastapi.types.rfc3339 import DateTimeType

from .types import Aggregation, AggregationCollection


@attr.s
class BaseAggregationClient(abc.ABC):
    """Defines a pattern for implementing the STAC aggregation extension.

    Both methods return a static placeholder response (``example.org`` links);
    implementations are expected to override them.
    """

    def get_aggregations(
        self, collection_id: Optional[str] = None, **kwargs
    ) -> AggregationCollection:
        """Get the aggregations available for the given collection_id.

        If collection_id is None, returns the available aggregations over all
        collections.

        Args:
            collection_id: Optional collection identifier. Unused by this
                placeholder implementation.
            **kwargs: Additional request parameters. Unused here.

        Returns:
            An ``AggregationCollection`` advertising a single ``total_count``
            aggregation.
        """
        return AggregationCollection(
            type="AggregationCollection",
            aggregations=[Aggregation(name="total_count", data_type="integer")],
            links=[
                {
                    "rel": "root",
                    "type": "application/json",
                    "href": "https://example.org/",
                },
                {
                    "rel": "self",
                    "type": "application/json",
                    "href": "https://example.org/aggregations",
                },
            ],
        )

    def aggregate(
        self, collection_id: Optional[str] = None, **kwargs
    ) -> AggregationCollection:
        """Return the aggregation buckets for a given search result.

        Args:
            collection_id: Optional collection identifier. Unused by this
                placeholder implementation.
            **kwargs: Search and aggregation parameters. Unused here.

        Returns:
            An ``AggregationCollection`` with an empty ``aggregations`` list.
        """
        return AggregationCollection(
            type="AggregationCollection",
            aggregations=[],
            links=[
                {
                    "rel": "root",
                    "type": "application/json",
                    "href": "https://example.org/",
                },
                {
                    "rel": "self",
                    "type": "application/json",
                    "href": "https://example.org/aggregations",
                },
            ],
        )


@attr.s
class AsyncBaseAggregationClient(abc.ABC):
    """Defines an async pattern for implementing the STAC aggregation extension.

    Both coroutines return a static placeholder response (``example.org``
    links); implementations are expected to override them.
    """

    async def get_aggregations(
        self, collection_id: Optional[str] = None, **kwargs
    ) -> AggregationCollection:
        """Get the aggregations available for the given collection_id.

        If collection_id is None, returns the available aggregations over all
        collections.

        Args:
            collection_id: Optional collection identifier. Unused by this
                placeholder implementation.
            **kwargs: Additional request parameters. Unused here.

        Returns:
            An ``AggregationCollection`` advertising a single ``total_count``
            aggregation.
        """
        return AggregationCollection(
            type="AggregationCollection",
            aggregations=[Aggregation(name="total_count", data_type="integer")],
            links=[
                {
                    "rel": "root",
                    "type": "application/json",
                    "href": "https://example.org/",
                },
                {
                    "rel": "self",
                    "type": "application/json",
                    "href": "https://example.org/aggregations",
                },
            ],
        )

    async def aggregate(
        self,
        collection_id: Optional[str] = None,
        aggregations: Optional[Union[str, List[str]]] = None,
        collections: Optional[List[str]] = None,
        ids: Optional[List[str]] = None,
        bbox: Optional[BBox] = None,
        intersects: Optional[Geometry] = None,
        datetime: Optional[DateTimeType] = None,
        limit: Optional[int] = 10,
        **kwargs,
    ) -> AggregationCollection:
        """Return the aggregation buckets for a given search result.

        None of the parameters are read by this placeholder implementation;
        they document the arguments an overriding client receives.

        Args:
            collection_id: Optional collection identifier.
            aggregations: Requested aggregation name(s).
            collections: Collection ids to restrict the search to.
            ids: Item ids to restrict the search to.
            bbox: Bounding box search filter.
            intersects: Geometry search filter.
            datetime: Datetime or datetime-interval search filter.
            limit: Search limit (defaults to 10).
            **kwargs: Any additional request parameters.

        Returns:
            An ``AggregationCollection`` with an empty ``aggregations`` list.
        """
        return AggregationCollection(
            type="AggregationCollection",
            aggregations=[],
            links=[
                {
                    "rel": "root",
                    "type": "application/json",
                    "href": "https://example.org/",
                },
                {
                    "rel": "self",
                    "type": "application/json",
                    "href": "https://example.org/aggregations",
                },
            ],
        )
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""Request model for the Aggregation extension."""

from typing import List, Optional, Union

import attr

from stac_fastapi.extensions.core.filter.request import (
FilterExtensionGetRequest,
FilterExtensionPostRequest,
)
from stac_fastapi.types.search import BaseSearchGetRequest, BaseSearchPostRequest


@attr.s
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can filter be included in this? I'm not sure how the extensions interact.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I have tested with filter included on an implementation and it works great.

I am also not clear on how extensions interact. For instance, could collections, datetime, bbox, and intersects be excluded here since they are part of the core search? I'm inclined to keep them so the request class correlates directly to the extension spec.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think you should extend BaseSearchGetRequest and BaseSearchPostRequest and just add the aggregations param. You also pick up limit, which is ignored, but that's fine for a data model like this since there's a lot less duplication.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have changed it to be built off the base Search and Filter requests. This makes the implementation dependent on the Filter extension, but I don't think that is an issue.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jamesfisher-gis sorry I'm just realizing this now but I don't think we should do this, at least I don't see in the aggregate extension why the filter attributes should be enabled with the aggregations ones.

I think it will be up to the implementers to add the filter extension and also handle them in the client.

Copy link
Contributor Author

@jamesfisher-geo jamesfisher-geo Jun 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey. That makes sense.

I have a couple other small bug fixes for aggregation. I will submit a PR today that removes the Filter extension dependency.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe wait because I will submit a PR that takes care of #713 soon

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jamesfisher-gis in fact, go ahead because my PR is a no-go in fact 😓

class AggregationExtensionGetRequest(BaseSearchGetRequest, FilterExtensionGetRequest):
    """Aggregation Extension GET request model.

    Extends the base search and filter GET request models with the
    ``aggregations`` parameter.
    """

    # Requested aggregations as a single string; omitted -> None.
    aggregations: Optional[str] = attr.ib(default=None)


class AggregationExtensionPostRequest(BaseSearchPostRequest, FilterExtensionPostRequest):
    """Aggregation Extension POST request model.

    Extends the base search and filter POST request models with the
    ``aggregations`` parameter.
    """

    # Plain ``None`` default: this class is not decorated with ``@attr.s``, so an
    # ``attr.ib(...)`` placeholder would never be turned into a field and the
    # placeholder object itself would end up as the default value.
    aggregations: Optional[Union[str, List[str]]] = None
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"""Aggregation Extension types."""

from typing import Any, Dict, List, Literal, Optional, Union

from pydantic import Field
from typing_extensions import TypedDict

from stac_fastapi.types.rfc3339 import DateTimeType


class Bucket(TypedDict, total=False):
    """A STAC aggregation bucket.

    All keys are optional (``total=False``). TypedDict bodies may only declare
    key types: default values are not allowed by PEP 589 and never populate the
    resulting dict, so none are given here.
    """

    key: str
    data_type: str
    frequency: Optional[Dict]
    # NOTE(review): ``from`` is a Python keyword, so the key is declared as
    # ``_from``. A TypedDict has no alias mechanism; if the serialized key must
    # be ``"from"``, this needs the functional ``TypedDict(...)`` syntax instead.
    _from: Optional[Union[int, float]]
    to: Optional[Union[int, float]]


class Aggregation(TypedDict, total=False):
    """A STAC aggregation.

    All keys are optional (``total=False``). No default values are declared:
    PEP 589 does not allow them in a TypedDict body and they would not populate
    the resulting dict.
    """

    name: str
    data_type: str
    buckets: Optional[List[Bucket]]
    overflow: Optional[int]
    value: Optional[Union[str, int, DateTimeType]]


class AggregationCollection(TypedDict, total=False):
    """STAC Item Aggregation Collection.

    All keys are optional (``total=False``).
    """

    # Fixed discriminator value identifying the document type.
    type: Literal["AggregationCollection"]
    # Aggregation definitions or results carried by the collection.
    aggregations: List[Aggregation]
    # Link objects, kept as free-form dicts.
    links: List[Dict[str, Any]]
Loading