Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest): json serializable fix #12246

Merged
merged 2 commits into from
Jan 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,55 @@ def __init__(self, sink: Sink, report_recipe: bool, ctx: PipelineContext) -> Non
aspect_value=source_info_aspect,
)

@staticmethod
def _convert_sets_to_lists(obj: Any) -> Any:
"""
Recursively converts all sets to lists in a Python object.
Works with nested dictionaries, lists, and sets.

Args:
obj: Any Python object that might contain sets

Returns:
The object with all sets converted to lists
"""
if isinstance(obj, dict):
return {
key: DatahubIngestionRunSummaryProvider._convert_sets_to_lists(value)
for key, value in obj.items()
}
elif isinstance(obj, list):
return [
DatahubIngestionRunSummaryProvider._convert_sets_to_lists(element)
for element in obj
]
elif isinstance(obj, set):
return [
DatahubIngestionRunSummaryProvider._convert_sets_to_lists(element)
for element in obj
]
elif isinstance(obj, tuple):
return tuple(
DatahubIngestionRunSummaryProvider._convert_sets_to_lists(element)
for element in obj
)
else:
return obj

def _get_recipe_to_report(self, ctx: PipelineContext) -> str:
assert ctx.pipeline_config
if not self.report_recipe or not ctx.pipeline_config.get_raw_dict():
return ""
else:
return json.dumps(redact_raw_config(ctx.pipeline_config.get_raw_dict()))
redacted_recipe = redact_raw_config(ctx.pipeline_config.get_raw_dict())
# This is required otherwise json dumps will fail
# with a TypeError: Object of type set is not JSON serializable
converted_recipe = (
DatahubIngestionRunSummaryProvider._convert_sets_to_lists(
redacted_recipe
)
)
return json.dumps(converted_recipe)

def _emit_aspect(self, entity_urn: Urn, aspect_value: _Aspect) -> None:
self.sink.write_record_async(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Any, Dict, List, Set, Tuple, Union

import pytest

from datahub.ingestion.reporting.datahub_ingestion_run_summary_provider import (
Expand Down Expand Up @@ -50,3 +52,136 @@ def test_default_config():
typed_config = DatahubIngestionRunSummaryProviderConfig.parse_obj({})
assert typed_config.sink is None
assert typed_config.report_recipe is True


def test_simple_set() -> None:
"""Test conversion of a simple set"""
input_data: Set[int] = {1, 2, 3}
expected: List[int] = [1, 2, 3]
result = DatahubIngestionRunSummaryProvider._convert_sets_to_lists(input_data)
assert sorted(result) == sorted(expected)
assert isinstance(result, list)


def test_nested_dict_with_sets() -> None:
"""Test conversion of nested dictionary containing sets"""
input_data: Dict[str, Union[Set[int], Dict[str, Set[str]]]] = {
"set1": {1, 2, 3},
"dict1": {"set2": {"a", "b"}},
}
expected = {
"set1": [1, 2, 3],
"dict1": {"set2": ["a", "b"]},
}
result = DatahubIngestionRunSummaryProvider._convert_sets_to_lists(input_data)

def sort_nested_lists(d):
return {
k: (
sorted(v)
if isinstance(v, list)
else (sort_nested_lists(v) if isinstance(v, dict) else v)
)
for k, v in d.items()
}

assert sort_nested_lists(result) == sort_nested_lists(expected)


def test_nested_lists_with_sets() -> None:
"""Test conversion of nested lists containing sets"""
input_data = [{1, 2}, [{3, 4}, {5, 6}]]
expected = [[1, 2], [[3, 4], [5, 6]]]
result = DatahubIngestionRunSummaryProvider._convert_sets_to_lists(input_data)
assert [
sorted(x)
if isinstance(x, list) and len(x) > 0 and not isinstance(x[0], list)
else x
for x in result
] == [
sorted(x)
if isinstance(x, list) and len(x) > 0 and not isinstance(x[0], list)
else x
for x in expected
]


def test_tuple_with_sets() -> None:
"""Test conversion of tuples containing sets"""
input_data = (1, {2, 3}, 4)
expected = (1, [2, 3], 4)
result = DatahubIngestionRunSummaryProvider._convert_sets_to_lists(input_data)
assert (result[0], sorted(result[1]), result[2]) == (
expected[0],
sorted(expected[1]),
expected[2],
)
assert isinstance(result, tuple)


def test_mixed_nested_structure() -> None:
"""Test conversion of a complex nested structure"""
input_data = {
"simple_set": {1, 2, 3},
"nested_dict": {
"another_set": {"a", "b", "c"},
"mixed_list": [1, {2, 3}, {"x", "y"}],
},
"tuple_with_set": (1, {4, 5}, 6),
"list_of_sets": [{1, 2}, {3, 4}],
}
result = DatahubIngestionRunSummaryProvider._convert_sets_to_lists(input_data)

# Verify structure types
assert isinstance(result["simple_set"], list)
assert isinstance(result["nested_dict"]["another_set"], list)
assert isinstance(result["nested_dict"]["mixed_list"][1], list)
assert isinstance(result["nested_dict"]["mixed_list"][2], list)
assert isinstance(result["tuple_with_set"], tuple)
assert isinstance(result["tuple_with_set"][1], list)
assert isinstance(result["list_of_sets"][0], list)


def test_non_set_data() -> None:
"""Test that non-set data remains unchanged"""
input_data = {
"string": "hello",
"int": 42,
"float": 3.14,
"bool": True,
"none": None,
"list": [1, 2, 3],
"dict": {"a": 1, "b": 2},
}
result = DatahubIngestionRunSummaryProvider._convert_sets_to_lists(input_data)
assert result == input_data


def test_empty_structures() -> None:
"""Test handling of empty structures"""
input_data: Dict[
str, Union[Set[Any], Dict[Any, Any], List[Any], Tuple[Any, ...]]
] = {"empty_set": set(), "empty_dict": {}, "empty_list": [], "empty_tuple": ()}
expected: Dict[
str, Union[List[Any], Dict[Any, Any], List[Any], Tuple[Any, ...]]
] = {"empty_set": [], "empty_dict": {}, "empty_list": [], "empty_tuple": ()}
result = DatahubIngestionRunSummaryProvider._convert_sets_to_lists(input_data)
assert result == expected


def test_json_serializable() -> None:
"""Test that the converted structure is JSON serializable"""
import json

input_data = {
"set": {1, 2, 3},
"nested": {"set": {"a", "b"}},
"mixed": [1, {2, 3}, {"x"}],
}
result = DatahubIngestionRunSummaryProvider._convert_sets_to_lists(input_data)
try:
json.dumps(result)
serializable = True
except TypeError:
serializable = False
assert serializable
Loading