fix(ingest): hide deprecated path_spec option from config #5944

Merged 10 commits on Oct 4, 2022
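This PR hides the deprecated singular `path_spec` option from the generated config docs and replaces the hand-rolled compatibility checks with a reusable `pydantic_renamed_field` helper: an old-style `path_spec` value is transparently wrapped in a list and moved to `path_specs`. Below is a minimal self-contained sketch of that rename pattern, assuming pydantic v1; `ToyConfig` and its validator are illustrative stand-ins, not datahub's actual `pydantic_renamed_field` implementation.

```python
# Sketch of the deprecated-field rename pattern this PR adopts (assumes
# pydantic v1; ToyConfig is illustrative, not datahub's real helper):
# accept the old singular `path_spec` key, wrap it in a list, and store
# it under the new plural `path_specs` key.
import warnings
from typing import Any, Dict, List

import pydantic


class ToyConfig(pydantic.BaseModel):
    path_specs: List[str]

    @pydantic.root_validator(pre=True)
    def _rename_path_spec_to_plural(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        if "path_spec" in values:
            if "path_specs" in values:
                raise ValueError("cannot specify both path_spec and path_specs")
            warnings.warn("path_spec is deprecated; use path_specs instead")
            # Mirrors the transform in the diff: lambda path_spec: [path_spec]
            values["path_specs"] = [values.pop("path_spec")]
        return values


# Old-style configs keep working and are normalized to the new shape:
cfg = ToyConfig.parse_obj({"path_spec": "s3://my-bucket/{table}/*.parquet"})
assert cfg.path_specs == ["s3://my-bucket/{table}/*.parquet"]
```

Because the rename runs in a `pre=True` root validator, it happens before field validation, so the rest of the model only ever sees `path_specs`.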
@@ -42,12 +42,12 @@ class Config:

     table_name: Optional[str] = Field(
         default=None,
-        description="Display name of the dataset.Combination of named variableds from include path and strings",
+        description="Display name of the dataset.Combination of named variables from include path and strings",
     )
 
     enable_compression: bool = Field(
         default=True,
-        description="Enable or disable processing compressed files. Currenly .gz and .bz files are supported.",
+        description="Enable or disable processing compressed files. Currently .gz and .bz files are supported.",
     )
 
     sample_files: bool = Field(
metadata-ingestion/src/datahub/ingestion/source/s3/config.py (50 additions & 49 deletions)
@@ -9,6 +9,7 @@
     EnvBasedSourceConfigBase,
     PlatformSourceConfigBase,
 )
+from datahub.configuration.validate_field_rename import pydantic_renamed_field
 from datahub.ingestion.source.aws.aws_common import AwsConnectionConfig
 from datahub.ingestion.source.aws.path_spec import PathSpec
 from datahub.ingestion.source.aws.s3_util import get_bucket_name
@@ -20,18 +21,14 @@


 class DataLakeSourceConfig(PlatformSourceConfigBase, EnvBasedSourceConfigBase):
-    path_specs: Optional[List[PathSpec]] = Field(
-        description="List of PathSpec. See below the details about PathSpec"
-    )
-    path_spec: Optional[PathSpec] = Field(
-        description="Path spec will be deprecated in favour of path_specs option."
+    path_specs: List[PathSpec] = Field(
+        description="List of PathSpec. See [below](#path-spec) the details about PathSpec"
     )
     platform: str = Field(
-        default="", description="The platform that this source connects to"
-    )
-    platform_instance: Optional[str] = Field(
-        default=None,
-        description="The instance of the platform that all assets produced by this recipe belong to",
+        # The platform field already exists, but we want to override the type/default/docs.
+        default="",
+        description="The platform that this source connects to (either 's3' or 'file'). "
+        "If not specified, the platform will be inferred from the path_specs.",
     )
     aws_config: Optional[AwsConnectionConfig] = Field(
         default=None, description="AWS configuration"
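The comment in the diff notes that `platform` already exists on the base class and is being redeclared to change its type, default, and docs. A short sketch of that field-override pattern, assuming pydantic v1 (`Base`/`Child` are hypothetical names, not datahub classes):

```python
# A subclass may redeclare an inherited field to override its default and
# description, as DataLakeSourceConfig does for `platform` (pydantic v1).
import pydantic


class Base(pydantic.BaseModel):
    platform: str = pydantic.Field(default="unset", description="generic platform")


class Child(Base):
    platform: str = pydantic.Field(default="", description="overridden default/docs")


assert Child().platform == ""
```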
@@ -64,51 +61,55 @@ class DataLakeSourceConfig(PlatformSourceConfigBase, EnvBasedSourceConfigBase):
description="Maximum number of rows to use when inferring schemas for TSV and CSV files.",
)

@pydantic.root_validator(pre=False)
def validate_platform(cls, values: Dict) -> Dict:
if not values.get("path_specs") and not values.get("path_spec"):
raise ValueError("Either path_specs or path_spec needs to be specified")
_rename_path_spec_to_plural = pydantic_renamed_field(
"path_spec", "path_specs", lambda path_spec: [path_spec]
)

if values.get("path_specs") and values.get("path_spec"):
@pydantic.validator("path_specs", always=True)
def check_path_specs_and_infer_platform(
cls, path_specs: List[PathSpec], values: Dict
) -> List[PathSpec]:
if len(path_specs) == 0:
raise ValueError("path_specs must not be empty")

# Check that all path specs have the same platform.
guessed_platforms = set(
"s3" if path_spec.is_s3 else "file" for path_spec in path_specs
)
if len(guessed_platforms) > 1:
raise ValueError(
"Either path_specs or path_spec needs to be specified but not both"
f"Cannot have multiple platforms in path_specs: {guessed_platforms}"
)
guessed_platform = guessed_platforms.pop()

if values.get("path_spec"):
logger.warning(
"path_spec config property is deprecated, please use path_specs instead of it."
# If platform is s3, check that they're all the same bucket.
if guessed_platform == "s3":
bucket_names = set(
get_bucket_name(path_spec.include) for path_spec in path_specs
)
if len(bucket_names) > 1:
raise ValueError(
f"All path_specs should reference the same s3 bucket. Got {bucket_names}"
)

# Ensure s3 configs aren't used for file sources.
if guessed_platform != "s3" and (
values.get("use_s3_object_tags") or values.get("use_s3_bucket_tags")
):
raise ValueError(
"Cannot grab s3 object/bucket tags when platform is not s3. Remove the flag or use s3."
)
values["path_specs"] = [values.get("path_spec")]

bucket_name: str = ""
for path_spec in values.get("path_specs", []):
if path_spec.is_s3:
platform = "s3"
else:
if values.get("use_s3_object_tags") or values.get("use_s3_bucket_tags"):
raise ValueError(
"cannot grab s3 tags for platform != s3. Remove the flag or use s3."
)

platform = "file"

if values.get("platform", ""):
if platform == "s3" and values["platform"] != platform:
raise ValueError("all path_spec should belong to the same platform")
else:
values["platform"] = platform
logger.debug(f'Setting config "platform": {values.get("platform")}')

if platform == "s3":
if bucket_name == "":
bucket_name = get_bucket_name(path_spec.include)
else:
if bucket_name != get_bucket_name(path_spec.include):
raise ValueError(
"all path_spec should reference the same s3 bucket"
)

return values
# Infer platform if not specified.
if values.get("platform") and values["platform"] != guessed_platform:
raise ValueError(
f"All path_specs belong to {guessed_platform} platform, but platform is set to {values['platform']}"
)
else:
logger.debug(f'Setting config "platform": {guessed_platform}')
values["platform"] = guessed_platform

return path_specs

@pydantic.root_validator()
def ensure_profiling_pattern_is_passed_to_profiling(
Expand Down
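The new `check_path_specs_and_infer_platform` validator consolidates the old per-spec loop: it guesses a platform for each path spec, rejects mixed platforms (and mixed s3 buckets), and only then checks or infers the top-level `platform` field. A standalone sketch of that behaviour, assuming pydantic v1; `ToyPathSpec` and `ToySourceConfig` are simplified stand-ins, not datahub's real models:

```python
# Standalone sketch of the validator's platform-inference logic (pydantic v1).
from typing import Dict, List

import pydantic


class ToyPathSpec(pydantic.BaseModel):
    include: str

    @property
    def is_s3(self) -> bool:
        return self.include.startswith("s3://")


class ToySourceConfig(pydantic.BaseModel):
    platform: str = ""  # declared before path_specs so it appears in `values` below
    path_specs: List[ToyPathSpec]

    @pydantic.validator("path_specs", always=True)
    def check_path_specs_and_infer_platform(
        cls, path_specs: List[ToyPathSpec], values: Dict
    ) -> List[ToyPathSpec]:
        if not path_specs:
            raise ValueError("path_specs must not be empty")

        # All path specs must agree on a single platform.
        guessed_platforms = {"s3" if ps.is_s3 else "file" for ps in path_specs}
        if len(guessed_platforms) > 1:
            raise ValueError(f"Cannot have multiple platforms: {guessed_platforms}")
        guessed_platform = guessed_platforms.pop()

        # An explicitly set platform must match the guess; otherwise infer it.
        if values.get("platform") and values["platform"] != guessed_platform:
            raise ValueError("platform conflicts with path_specs")
        values["platform"] = guessed_platform
        return path_specs


cfg = ToySourceConfig(path_specs=[ToyPathSpec(include="s3://bucket/data/*.csv")])
assert cfg.platform == "s3"  # inferred from the path spec

try:
    ToySourceConfig(
        path_specs=[
            ToyPathSpec(include="s3://bucket/a/*.csv"),
            ToyPathSpec(include="/local/b/*.csv"),
        ]
    )
except pydantic.ValidationError:
    print("mixed s3/file path_specs are rejected")
```

Mutating `values["platform"]` inside a field validator works here because pydantic v1 builds the final model from that accumulated dict, which is the same mechanism the real validator relies on.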
@@ -48,7 +48,7 @@ def test_delta_lake(pytestconfig, source_file, tmp_path, mock_time):
     )
 
 
-def test_data_lake_incorrect_config_raises_error(tmp_path, mock_time):
+def test_delta_lake_incorrect_config_raises_error(tmp_path, mock_time):
     config_dict = {}
     config_dict["sink"] = {
         "type": "file",