Convert sets and frozensets to lists before JSON-serializing the reported recipe.

json.dumps raises "TypeError: Object of type set is not JSON serializable" when
the redacted recipe contains a set or frozenset, so the recipe is normalized first.

NOTE(review): line breaks and indentation of this patch were reconstructed from a
whitespace-collapsed copy; added-line counts match the hunk headers, but confirm
the whitespace of the context lines against the target tree before applying.

diff --git a/metadata-ingestion/src/datahub/ingestion/reporting/datahub_ingestion_run_summary_provider.py b/metadata-ingestion/src/datahub/ingestion/reporting/datahub_ingestion_run_summary_provider.py
index 28def68ccf3f5..c143a8b49f4b7 100644
--- a/metadata-ingestion/src/datahub/ingestion/reporting/datahub_ingestion_run_summary_provider.py
+++ b/metadata-ingestion/src/datahub/ingestion/reporting/datahub_ingestion_run_summary_provider.py
@@ -146,12 +146,59 @@ def __init__(self, sink: Sink, report_recipe: bool, ctx: PipelineContext) -> Non
             aspect_value=source_info_aspect,
         )
 
+    @staticmethod
+    def _convert_sets_to_lists(obj: Any) -> Any:
+        """
+        Recursively converts all sets and frozensets to lists in a Python object.
+        Works with nested dictionaries, lists, tuples, sets, and frozensets.
+
+        Dict values are converted but dict keys are left untouched. Tuples are
+        rebuilt as tuples, which json.dumps already serializes as arrays.
+
+        Args:
+            obj: Any Python object that might contain sets
+
+        Returns:
+            The object with all sets converted to lists; element order of a
+            converted set follows set iteration order and is not sorted
+        """
+        if isinstance(obj, dict):
+            return {
+                key: DatahubIngestionRunSummaryProvider._convert_sets_to_lists(value)
+                for key, value in obj.items()
+            }
+        elif isinstance(obj, list):
+            return [
+                DatahubIngestionRunSummaryProvider._convert_sets_to_lists(element)
+                for element in obj
+            ]
+        elif isinstance(obj, (set, frozenset)):
+            return [
+                DatahubIngestionRunSummaryProvider._convert_sets_to_lists(element)
+                for element in obj
+            ]
+        elif isinstance(obj, tuple):
+            return tuple(
+                DatahubIngestionRunSummaryProvider._convert_sets_to_lists(element)
+                for element in obj
+            )
+        else:
+            return obj
+
     def _get_recipe_to_report(self, ctx: PipelineContext) -> str:
         assert ctx.pipeline_config
         if not self.report_recipe or not ctx.pipeline_config.get_raw_dict():
             return ""
         else:
-            return json.dumps(redact_raw_config(ctx.pipeline_config.get_raw_dict()))
+            redacted_recipe = redact_raw_config(ctx.pipeline_config.get_raw_dict())
+            # Sets must be converted first, otherwise json.dumps fails
+            # with a TypeError: Object of type set is not JSON serializable
+            converted_recipe = (
+                DatahubIngestionRunSummaryProvider._convert_sets_to_lists(
+                    redacted_recipe
+                )
+            )
+            return json.dumps(converted_recipe)
 
     def _emit_aspect(self, entity_urn: Urn, aspect_value: _Aspect) -> None:
         self.sink.write_record_async(
diff --git a/metadata-ingestion/tests/unit/reporting/test_datahub_ingestion_reporter.py b/metadata-ingestion/tests/unit/reporting/test_datahub_ingestion_reporter.py
index 749ea03a7f20a..2ab6208e2dcc6 100644
--- a/metadata-ingestion/tests/unit/reporting/test_datahub_ingestion_reporter.py
+++ b/metadata-ingestion/tests/unit/reporting/test_datahub_ingestion_reporter.py
@@ -1,3 +1,5 @@
+from typing import Any, Dict, List, Set, Tuple, Union
+
 import pytest
 
 from datahub.ingestion.reporting.datahub_ingestion_run_summary_provider import (
@@ -50,3 +52,144 @@ def test_default_config():
     typed_config = DatahubIngestionRunSummaryProviderConfig.parse_obj({})
     assert typed_config.sink is None
     assert typed_config.report_recipe is True
+
+
+def test_simple_set() -> None:
+    """Test conversion of a simple set"""
+    input_data: Set[int] = {1, 2, 3}
+    expected: List[int] = [1, 2, 3]
+    result = DatahubIngestionRunSummaryProvider._convert_sets_to_lists(input_data)
+    assert sorted(result) == sorted(expected)
+    assert isinstance(result, list)
+
+
+def test_nested_dict_with_sets() -> None:
+    """Test conversion of nested dictionary containing sets"""
+    input_data: Dict[str, Union[Set[int], Dict[str, Set[str]]]] = {
+        "set1": {1, 2, 3},
+        "dict1": {"set2": {"a", "b"}},
+    }
+    expected = {
+        "set1": [1, 2, 3],
+        "dict1": {"set2": ["a", "b"]},
+    }
+    result = DatahubIngestionRunSummaryProvider._convert_sets_to_lists(input_data)
+
+    def sort_nested_lists(d):
+        return {
+            k: (
+                sorted(v)
+                if isinstance(v, list)
+                else (sort_nested_lists(v) if isinstance(v, dict) else v)
+            )
+            for k, v in d.items()
+        }
+
+    assert sort_nested_lists(result) == sort_nested_lists(expected)
+
+
+def test_nested_lists_with_sets() -> None:
+    """Test conversion of nested lists containing sets"""
+    input_data = [{1, 2}, [{3, 4}, {5, 6}]]
+    expected = [[1, 2], [[3, 4], [5, 6]]]
+    result = DatahubIngestionRunSummaryProvider._convert_sets_to_lists(input_data)
+    assert [
+        sorted(x)
+        if isinstance(x, list) and len(x) > 0 and not isinstance(x[0], list)
+        else x
+        for x in result
+    ] == [
+        sorted(x)
+        if isinstance(x, list) and len(x) > 0 and not isinstance(x[0], list)
+        else x
+        for x in expected
+    ]
+
+
+def test_tuple_with_sets() -> None:
+    """Test conversion of tuples containing sets"""
+    input_data = (1, {2, 3}, 4)
+    expected = (1, [2, 3], 4)
+    result = DatahubIngestionRunSummaryProvider._convert_sets_to_lists(input_data)
+    assert (result[0], sorted(result[1]), result[2]) == (
+        expected[0],
+        sorted(expected[1]),
+        expected[2],
+    )
+    assert isinstance(result, tuple)
+
+
+def test_mixed_nested_structure() -> None:
+    """Test conversion of a complex nested structure"""
+    input_data = {
+        "simple_set": {1, 2, 3},
+        "nested_dict": {
+            "another_set": {"a", "b", "c"},
+            "mixed_list": [1, {2, 3}, {"x", "y"}],
+        },
+        "tuple_with_set": (1, {4, 5}, 6),
+        "list_of_sets": [{1, 2}, {3, 4}],
+    }
+    result = DatahubIngestionRunSummaryProvider._convert_sets_to_lists(input_data)
+
+    # Verify structure types
+    assert isinstance(result["simple_set"], list)
+    assert isinstance(result["nested_dict"]["another_set"], list)
+    assert isinstance(result["nested_dict"]["mixed_list"][1], list)
+    assert isinstance(result["nested_dict"]["mixed_list"][2], list)
+    assert isinstance(result["tuple_with_set"], tuple)
+    assert isinstance(result["tuple_with_set"][1], list)
+    assert isinstance(result["list_of_sets"][0], list)
+
+
+def test_non_set_data() -> None:
+    """Test that non-set data remains unchanged"""
+    input_data = {
+        "string": "hello",
+        "int": 42,
+        "float": 3.14,
+        "bool": True,
+        "none": None,
+        "list": [1, 2, 3],
+        "dict": {"a": 1, "b": 2},
+    }
+    result = DatahubIngestionRunSummaryProvider._convert_sets_to_lists(input_data)
+    assert result == input_data
+
+
+def test_empty_structures() -> None:
+    """Test handling of empty structures"""
+    input_data: Dict[
+        str, Union[Set[Any], Dict[Any, Any], List[Any], Tuple[Any, ...]]
+    ] = {"empty_set": set(), "empty_dict": {}, "empty_list": [], "empty_tuple": ()}
+    expected: Dict[
+        str, Union[List[Any], Dict[Any, Any], List[Any], Tuple[Any, ...]]
+    ] = {"empty_set": [], "empty_dict": {}, "empty_list": [], "empty_tuple": ()}
+    result = DatahubIngestionRunSummaryProvider._convert_sets_to_lists(input_data)
+    assert result == expected
+
+
+def test_json_serializable() -> None:
+    """Test that the converted structure is JSON serializable"""
+    import json
+
+    input_data = {
+        "set": {1, 2, 3},
+        "nested": {"set": {"a", "b"}},
+        "mixed": [1, {2, 3}, {"x"}],
+    }
+    result = DatahubIngestionRunSummaryProvider._convert_sets_to_lists(input_data)
+    try:
+        json.dumps(result)
+        serializable = True
+    except TypeError:
+        serializable = False
+    assert serializable
+
+
+def test_frozenset() -> None:
+    """Test conversion of frozensets, which json cannot serialize either"""
+    input_data = {"frozen": frozenset({1, 2, 3}), "nested": [frozenset({"a"})]}
+    result = DatahubIngestionRunSummaryProvider._convert_sets_to_lists(input_data)
+    assert sorted(result["frozen"]) == [1, 2, 3]
+    assert result["nested"] == [["a"]]