Skip to content

Commit

Permalink
fix(ingest): hide deprecated path_spec option from config (datahub-pr…
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored and david-leifker committed Oct 6, 2022
1 parent 5388808 commit 25d3931
Show file tree
Hide file tree
Showing 6 changed files with 324 additions and 1,130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ class Config:

table_name: Optional[str] = Field(
default=None,
description="Display name of the dataset.Combination of named variableds from include path and strings",
description="Display name of the dataset.Combination of named variables from include path and strings",
)

enable_compression: bool = Field(
default=True,
description="Enable or disable processing compressed files. Currenly .gz and .bz files are supported.",
description="Enable or disable processing compressed files. Currently .gz and .bz files are supported.",
)

sample_files: bool = Field(
Expand Down
99 changes: 50 additions & 49 deletions metadata-ingestion/src/datahub/ingestion/source/s3/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
EnvBasedSourceConfigBase,
PlatformSourceConfigBase,
)
from datahub.configuration.validate_field_rename import pydantic_renamed_field
from datahub.ingestion.source.aws.aws_common import AwsConnectionConfig
from datahub.ingestion.source.aws.path_spec import PathSpec
from datahub.ingestion.source.aws.s3_util import get_bucket_name
Expand All @@ -20,18 +21,14 @@


class DataLakeSourceConfig(PlatformSourceConfigBase, EnvBasedSourceConfigBase):
path_specs: Optional[List[PathSpec]] = Field(
description="List of PathSpec. See below the details about PathSpec"
)
path_spec: Optional[PathSpec] = Field(
description="Path spec will be deprecated in favour of path_specs option."
path_specs: List[PathSpec] = Field(
description="List of PathSpec. See [below](#path-spec) the details about PathSpec"
)
platform: str = Field(
default="", description="The platform that this source connects to"
)
platform_instance: Optional[str] = Field(
default=None,
description="The instance of the platform that all assets produced by this recipe belong to",
# The platform field already exists, but we want to override the type/default/docs.
default="",
description="The platform that this source connects to (either 's3' or 'file'). "
"If not specified, the platform will be inferred from the path_specs.",
)
aws_config: Optional[AwsConnectionConfig] = Field(
default=None, description="AWS configuration"
Expand Down Expand Up @@ -64,51 +61,55 @@ class DataLakeSourceConfig(PlatformSourceConfigBase, EnvBasedSourceConfigBase):
description="Maximum number of rows to use when inferring schemas for TSV and CSV files.",
)

@pydantic.root_validator(pre=False)
def validate_platform(cls, values: Dict) -> Dict:
if not values.get("path_specs") and not values.get("path_spec"):
raise ValueError("Either path_specs or path_spec needs to be specified")
_rename_path_spec_to_plural = pydantic_renamed_field(
"path_spec", "path_specs", lambda path_spec: [path_spec]
)

if values.get("path_specs") and values.get("path_spec"):
@pydantic.validator("path_specs", always=True)
def check_path_specs_and_infer_platform(
cls, path_specs: List[PathSpec], values: Dict
) -> List[PathSpec]:
if len(path_specs) == 0:
raise ValueError("path_specs must not be empty")

# Check that all path specs have the same platform.
guessed_platforms = set(
"s3" if path_spec.is_s3 else "file" for path_spec in path_specs
)
if len(guessed_platforms) > 1:
raise ValueError(
"Either path_specs or path_spec needs to be specified but not both"
f"Cannot have multiple platforms in path_specs: {guessed_platforms}"
)
guessed_platform = guessed_platforms.pop()

if values.get("path_spec"):
logger.warning(
"path_spec config property is deprecated, please use path_specs instead of it."
# If platform is s3, check that they're all the same bucket.
if guessed_platform == "s3":
bucket_names = set(
get_bucket_name(path_spec.include) for path_spec in path_specs
)
if len(bucket_names) > 1:
raise ValueError(
f"All path_specs should reference the same s3 bucket. Got {bucket_names}"
)

# Ensure s3 configs aren't used for file sources.
if guessed_platform != "s3" and (
values.get("use_s3_object_tags") or values.get("use_s3_bucket_tags")
):
raise ValueError(
"Cannot grab s3 object/bucket tags when platform is not s3. Remove the flag or use s3."
)
values["path_specs"] = [values.get("path_spec")]

bucket_name: str = ""
for path_spec in values.get("path_specs", []):
if path_spec.is_s3:
platform = "s3"
else:
if values.get("use_s3_object_tags") or values.get("use_s3_bucket_tags"):
raise ValueError(
"cannot grab s3 tags for platform != s3. Remove the flag or use s3."
)

platform = "file"

if values.get("platform", ""):
if platform == "s3" and values["platform"] != platform:
raise ValueError("all path_spec should belong to the same platform")
else:
values["platform"] = platform
logger.debug(f'Setting config "platform": {values.get("platform")}')

if platform == "s3":
if bucket_name == "":
bucket_name = get_bucket_name(path_spec.include)
else:
if bucket_name != get_bucket_name(path_spec.include):
raise ValueError(
"all path_spec should reference the same s3 bucket"
)

return values
# Infer platform if not specified.
if values.get("platform") and values["platform"] != guessed_platform:
raise ValueError(
f"All path_specs belong to {guessed_platform} platform, but platform is set to {values['platform']}"
)
else:
logger.debug(f'Setting config "platform": {guessed_platform}')
values["platform"] = guessed_platform

return path_specs

@pydantic.root_validator()
def ensure_profiling_pattern_is_passed_to_profiling(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def test_delta_lake(pytestconfig, source_file, tmp_path, mock_time):
)


def test_data_lake_incorrect_config_raises_error(tmp_path, mock_time):
def test_delta_lake_incorrect_config_raises_error(tmp_path, mock_time):
config_dict = {}
config_dict["sink"] = {
"type": "file",
Expand Down
Loading

0 comments on commit 25d3931

Please sign in to comment.