Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP 84: Migrate GET ASSET EVENTS legacy API to fast API #43881

Merged
merged 44 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
cf5218c
AIP-84: Migrating GET Assets to fastAPI
amoghrajesh Nov 6, 2024
5a49280
matching response to legacy
amoghrajesh Nov 6, 2024
962572b
Adding unit tests - part 1
amoghrajesh Nov 8, 2024
428cb6c
Update airflow/api_fastapi/common/parameters.py
amoghrajesh Nov 8, 2024
a78d3cb
fixing the dag_ids filter
amoghrajesh Nov 8, 2024
882d20c
fixing the dag_ids filter
amoghrajesh Nov 8, 2024
25bb08e
Adding unit tests - part 2
amoghrajesh Nov 8, 2024
658479d
Merge branch 'main' into AIP84-get-asset-to-fastapi
amoghrajesh Nov 8, 2024
fa0cd23
fixing unit tests & updating parameter type
amoghrajesh Nov 8, 2024
dd791c2
review comments pierre
amoghrajesh Nov 8, 2024
06fa0a7
fixing last commit
amoghrajesh Nov 8, 2024
3bd803b
Merge branch 'main' into AIP84-get-asset-to-fastapi
amoghrajesh Nov 8, 2024
fc29d7d
Merge branch 'main' into AIP84-get-asset-to-fastapi
amoghrajesh Nov 8, 2024
7a97220
fixing unit tests
amoghrajesh Nov 9, 2024
8b6b09e
migrating get assets events endpoint to fastapi
vatsrahul1001 Nov 11, 2024
b7aaa9d
Merge branch 'main' into AIP84-get-asset-events
vatsrahul1001 Nov 11, 2024
b808895
fixing test response
vatsrahul1001 Nov 11, 2024
1e18c0f
Merge branch 'AIP84-get-asset-events' of github.com:astronomer/airflo…
vatsrahul1001 Nov 11, 2024
52939e5
Adding tests for filtering
vatsrahul1001 Nov 12, 2024
08198f9
Merge branch 'main' of github.com:astronomer/airflow into AIP84-get-a…
vatsrahul1001 Nov 12, 2024
bf68954
Merge branch 'AIP84-get-asset-to-fastapi' of github.com:astronomer/ai…
vatsrahul1001 Nov 12, 2024
674aff9
pushing changes from AIP84-get-asset-to-fastapi
vatsrahul1001 Nov 12, 2024
0fac5ed
pushing changes from AIP84-get-asset-to-fastapi
vatsrahul1001 Nov 12, 2024
991edfd
resoving conflicts with main
vatsrahul1001 Nov 12, 2024
f30956a
resoving conflicts with main
vatsrahul1001 Nov 12, 2024
255dc81
address review comments
vatsrahul1001 Nov 13, 2024
35ed49c
resoving conflicts with main
vatsrahul1001 Nov 13, 2024
1b84e09
fixing test parametrize
vatsrahul1001 Nov 13, 2024
d9f3d2f
Merge branch 'main' into AIP84-get-asset-events
vatsrahul1001 Nov 13, 2024
60b1f2d
updating sort params
vatsrahul1001 Nov 13, 2024
740cde0
Merge branch 'AIP84-get-asset-events' of github.com:astronomer/airflo…
vatsrahul1001 Nov 13, 2024
297d53a
Merge branch 'main' into AIP84-get-asset-events
vatsrahul1001 Nov 13, 2024
e51fee1
Merge branch 'main' into AIP84-get-asset-events
vatsrahul1001 Nov 13, 2024
4870d88
Merge branch 'main' into AIP84-get-asset-events
vatsrahul1001 Nov 13, 2024
7eba6d4
address review comments
vatsrahul1001 Nov 13, 2024
4c1f154
Merge branch 'main' into AIP84-get-asset-events
vatsrahul1001 Nov 13, 2024
a763033
address review comments
vatsrahul1001 Nov 13, 2024
ac74c18
Merge branch 'main' into AIP84-get-asset-events
vatsrahul1001 Nov 13, 2024
60cf12c
getting main latest
vatsrahul1001 Nov 14, 2024
cdc0b1d
Merge branch 'main' into AIP84-get-asset-events
vatsrahul1001 Nov 14, 2024
78a2c7e
Merge branch 'main' into AIP84-get-asset-events
vatsrahul1001 Nov 14, 2024
18124fa
removing http 401 and 403 as its now added in root router in #43932
vatsrahul1001 Nov 14, 2024
61612ee
Merge branch 'main' into AIP84-get-asset-events
vatsrahul1001 Nov 14, 2024
360cde5
Merge branch 'main' into AIP84-get-asset-events
vatsrahul1001 Nov 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/asset_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ def get_assets(
return asset_collection_schema.dump(AssetCollection(assets=assets, total_entries=total_entries))


@mark_fastapi_migration_done
@security.requires_access_asset("GET")
@provide_session
@format_parameters({"limit": check_limit})
Expand Down
97 changes: 96 additions & 1 deletion airflow/api_fastapi/common/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

from airflow.api_connexion.endpoints.task_instance_endpoint import _convert_ti_states
from airflow.models import Base, Connection
from airflow.models.asset import AssetModel, DagScheduleAssetReference, TaskOutletAssetReference
from airflow.models.asset import AssetEvent, AssetModel, DagScheduleAssetReference, TaskOutletAssetReference
from airflow.models.dag import DagModel, DagTag
from airflow.models.dagrun import DagRun
from airflow.models.dagwarning import DagWarning, DagWarningType
Expand Down Expand Up @@ -440,6 +440,86 @@ def to_orm(self, select: Select) -> Select:
)


class _AssetIdFilter(BaseParam[int]):
"""Filter on asset_id."""

def __init__(self, attribute: ColumnElement, skip_none: bool = True) -> None:
super().__init__(skip_none=skip_none)
self.attribute = attribute

def to_orm(self, select: Select) -> Select:
if self.value is None and self.skip_none:
return select
return select.where(self.attribute == self.value)

def depends(self, asset_id: int | None = None) -> _AssetIdFilter:
return self.set_value(asset_id)


class _SourceDagIdFilter(BaseParam[str]):
"""Filter on source_dag_id."""

def __init__(self, attribute: ColumnElement, skip_none: bool = True) -> None:
super().__init__(skip_none=skip_none)
self.attribute = attribute

def to_orm(self, select: Select) -> Select:
if self.value is None and self.skip_none:
return select
return select.where(self.attribute == self.value)

def depends(self, source_dag_id: str | None = None) -> _SourceDagIdFilter:
return self.set_value(source_dag_id)


class _SourceTaskIdFilter(BaseParam[str]):
"""Filter on source_task_id."""

def __init__(self, attribute: ColumnElement, skip_none: bool = True) -> None:
super().__init__(skip_none=skip_none)
self.attribute = attribute

def to_orm(self, select: Select) -> Select:
if self.value is None and self.skip_none:
return select
return select.where(self.attribute == self.value)

def depends(self, source_task_id: str | None = None) -> _SourceTaskIdFilter:
return self.set_value(source_task_id)


class _SourceRunIdFilter(BaseParam[str]):
"""filter on source_run_id."""

def __init__(self, attribute: ColumnElement, skip_none: bool = True) -> None:
super().__init__(skip_none=skip_none)
self.attribute = attribute

def to_orm(self, select: Select) -> Select:
if self.value is None and self.skip_none:
return select
return select.where(self.attribute == self.value)

def depends(self, source_run_id: str | None = None) -> _SourceRunIdFilter:
return self.set_value(source_run_id)


class _SourceMapIndexFilter(BaseParam[int]):
"""Filter on source_map_index."""

def __init__(self, attribute: ColumnElement, skip_none: bool = True) -> None:
super().__init__(skip_none=skip_none)
self.attribute = attribute

def to_orm(self, select: Select) -> Select:
if self.value is None and self.skip_none:
return select
return select.where(self.attribute == self.value)

def depends(self, source_map_index: int | None = None) -> _SourceMapIndexFilter:
return self.set_value(source_map_index)


class Range(BaseModel, Generic[T]):
"""Range with a lower and upper bound."""

Expand Down Expand Up @@ -537,3 +617,18 @@ def depends_float(
QueryAssetDagIdPatternSearch = Annotated[
_DagIdAssetReferenceFilter, Depends(_DagIdAssetReferenceFilter().depends)
]
QueryAssetIdFilter = Annotated[_AssetIdFilter, Depends(_AssetIdFilter(AssetEvent.asset_id).depends)]


QuerySourceDagIdFilter = Annotated[
_SourceDagIdFilter, Depends(_SourceDagIdFilter(AssetEvent.source_dag_id).depends)
]
QuerySourceTaskIdFilter = Annotated[
_SourceTaskIdFilter, Depends(_SourceTaskIdFilter(AssetEvent.source_task_id).depends)
]
QuerySourceRunIdFilter = Annotated[
_SourceRunIdFilter, Depends(_SourceRunIdFilter(AssetEvent.source_run_id).depends)
]
QuerySourceMapIndexFilter = Annotated[
_SourceMapIndexFilter, Depends(_SourceMapIndexFilter(AssetEvent.source_map_index).depends)
]
44 changes: 43 additions & 1 deletion airflow/api_fastapi/core_api/datamodels/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from datetime import datetime

from pydantic import BaseModel
from pydantic import BaseModel, Field, model_validator


class DagScheduleAssetReference(BaseModel):
Expand Down Expand Up @@ -64,3 +64,45 @@ class AssetCollectionResponse(BaseModel):

assets: list[AssetResponse]
total_entries: int


class DagRunAssetReference(BaseModel):
"""Serializable version of the DagRunAssetReference ORM SqlAlchemyModel."""
vatsrahul1001 marked this conversation as resolved.
Show resolved Hide resolved

run_id: str
dag_id: str
execution_date: datetime = Field(alias="logical_date")
start_date: datetime
end_date: datetime
state: str
data_interval_start: datetime
data_interval_end: datetime


class AssetEventResponse(BaseModel):
"""Asset event serializer for responses."""

id: int
asset_id: int
asset_uri: str
extra: dict | None = None
source_task_id: str | None = None
source_dag_id: str | None = None
source_run_id: str | None = None
source_map_index: int
created_dagruns: list[DagRunAssetReference]
timestamp: datetime

@model_validator(mode="before")
def rename_uri_to_asset_uri(cls, values):
"""Rename 'uri' to 'asset_uri' during serialization to match legacy response."""
if hasattr(values, "uri") and values.uri:
values.asset_uri = values.uri
return values
vatsrahul1001 marked this conversation as resolved.
Show resolved Hide resolved


class AssetEventCollectionResponse(BaseModel):
"""Asset collection response."""
vatsrahul1001 marked this conversation as resolved.
Show resolved Hide resolved

asset_events: list[AssetEventResponse]
total_entries: int
Loading