diff --git a/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg_common.py b/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg_common.py index 98db275e754c0f..b74c096d0798e8 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg_common.py @@ -1,6 +1,6 @@ import logging from dataclasses import dataclass, field -from typing import Dict, List, Optional +from typing import Any, Dict, List, Optional from pydantic import Field, validator from pyiceberg.catalog import Catalog, load_catalog @@ -59,7 +59,7 @@ class IcebergSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin) default=None, description="Iceberg Stateful Ingestion Config." ) # The catalog configuration is using a dictionary to be open and flexible. All the keys and values are handled by pyiceberg. This will future-proof any configuration change done by pyiceberg. - catalog: Dict[str, Dict[str, str]] = Field( + catalog: Dict[str, Dict[str, Any]] = Field( description="Catalog configuration where to find Iceberg tables. Only one catalog specification is supported. The format is the same as [pyiceberg's catalog configuration](https://py.iceberg.apache.org/configuration/), where the catalog name is specified as the object name and attributes are set as key-value pairs.", ) table_pattern: AllowDenyPattern = Field( diff --git a/metadata-ingestion/tests/unit/test_iceberg.py b/metadata-ingestion/tests/unit/test_iceberg.py index 5df7a2f3aa944a..c8c6c6ac8a85d3 100644 --- a/metadata-ingestion/tests/unit/test_iceberg.py +++ b/metadata-ingestion/tests/unit/test_iceberg.py @@ -1,6 +1,6 @@ import uuid from decimal import Decimal -from typing import Any, Optional +from typing import Any, Dict, List, Optional import pytest from pydantic import ValidationError @@ -124,6 +124,34 @@ def test_config_for_tests(): with_iceberg_source() +def test_config_support_nested_dicts(): + """ + Test that Iceberg source supports nested dictionaries inside its configuration, as allowed by pyiceberg. + """ + catalog = { + "test": { + "type": "rest", + "nested_dict": { + "nested_key": "nested_value", + "nested_array": ["a1", "a2"], + "subnested_dict": {"subnested_key": "subnested_value"}, + }, + } + } + test_config = IcebergSourceConfig(catalog=catalog) + assert isinstance(test_config.catalog["test"]["nested_dict"], Dict) + assert test_config.catalog["test"]["nested_dict"]["nested_key"] == "nested_value" + assert isinstance(test_config.catalog["test"]["nested_dict"]["nested_array"], List) + assert test_config.catalog["test"]["nested_dict"]["nested_array"][0] == "a1" + assert isinstance( + test_config.catalog["test"]["nested_dict"]["subnested_dict"], Dict + ) + assert ( + test_config.catalog["test"]["nested_dict"]["subnested_dict"]["subnested_key"] + == "subnested_value" + ) + + @pytest.mark.parametrize( "iceberg_type, expected_schema_field_type", [