Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(ingest/athena): Replace s3_staging_dir parameter in Athena source with query_result_location #7044

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 24 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/sql/athena.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from pyathena.model import AthenaTableMetadata
from sqlalchemy.engine.reflection import Inspector

from datahub.configuration.validate_field_rename import pydantic_renamed_field
from datahub.emitter.mcp_builder import DatabaseKey, gen_containers
from datahub.ingestion.api.decorators import (
SourceCapability,
Expand Down Expand Up @@ -50,17 +51,34 @@ class AthenaConfig(SQLAlchemyConfig):
default=3600,
description="Duration to assume the AWS Role for. Maximum of 43200 (12 hours)",
)
s3_staging_dir: str = pydantic.Field(
description="Staging s3 location where the Athena query results will be stored"
s3_staging_dir: Optional[str] = pydantic.Field(
default=None,
deprecated=True,
description="[deprecated in favor of `query_result_location`] S3 query location",
)
work_group: str = pydantic.Field(
description="The name of your Amazon Athena Workgroups"
)
catalog_name: str = pydantic.Field(
default="awsdatacatalog", description="Athena Catalog Name"
default="awsdatacatalog",
description="Athena Catalog Name",
)

query_result_location: str = pydantic.Field(
description="S3 path to the [query result bucket](https://docs.aws.amazon.com/athena/latest/ug/querying.html#query-results-specify-location) which should be used by AWS Athena to store results of the"
"queries executed by DataHub."
)

include_views = False # not supported for Athena
# overwrite default behavior of SQLAlchemyConfing
include_views: Optional[bool] = pydantic.Field(
default=False, description="Whether views should be ingested."
)

_s3_staging_dir_population = pydantic_renamed_field(
old_name="s3_staging_dir",
new_name="query_result_location",
print_warning=True,
)

def get_sql_alchemy_url(self):
return make_sqlalchemy_uri(
Expand All @@ -70,7 +88,8 @@ def get_sql_alchemy_url(self):
f"athena.{self.aws_region}.amazonaws.com:443",
self.database,
uri_opts={
"s3_staging_dir": self.s3_staging_dir,
# as an URI option `s3_staging_dir` is still used due to PyAthena
"s3_staging_dir": self.query_result_location,
"work_group": self.work_group,
"catalog_name": self.catalog_name,
"role_arn": self.aws_role_arn,
Expand Down
42 changes: 40 additions & 2 deletions metadata-ingestion/tests/unit/test_athena_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,22 @@


@pytest.mark.integration
def test_athena_uri():
def test_athena_config_query_location_old_plus_new_value_not_allowed():
from datahub.ingestion.source.sql.athena import AthenaConfig

with pytest.raises(ValueError):
AthenaConfig.parse_obj(
{
"aws_region": "us-west-1",
"s3_staging_dir": "s3://sample-staging-dir/",
"query_result_location": "s3://query_result_location",
"work_group": "test-workgroup",
}
)


@pytest.mark.integration
def test_athena_config_staging_dir_is_set_as_query_result():
from datahub.ingestion.source.sql.athena import AthenaConfig

config = AthenaConfig.parse_obj(
Expand All @@ -21,9 +36,32 @@ def test_athena_uri():
"work_group": "test-workgroup",
}
)

expected_config = AthenaConfig.parse_obj(
{
"aws_region": "us-west-1",
"query_result_location": "s3://sample-staging-dir/",
"work_group": "test-workgroup",
}
)

assert config.json() == expected_config.json()


@pytest.mark.integration
def test_athena_uri():
from datahub.ingestion.source.sql.athena import AthenaConfig

config = AthenaConfig.parse_obj(
{
"aws_region": "us-west-1",
"query_result_location": "s3://query-result-location/",
"work_group": "test-workgroup",
}
)
assert (
config.get_sql_alchemy_url()
== "awsathena+rest://@athena.us-west-1.amazonaws.com:443/?s3_staging_dir=s3%3A%2F%2Fsample-staging-dir%2F&work_group=test-workgroup&catalog_name=awsdatacatalog&duration_seconds=3600"
== "awsathena+rest://@athena.us-west-1.amazonaws.com:443/?s3_staging_dir=s3%3A%2F%2Fquery-result-location%2F&work_group=test-workgroup&catalog_name=awsdatacatalog&duration_seconds=3600"
)


Expand Down