Skip to content

Commit

Permalink
fix: tweak datahub-kafka requirement (#110)
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored Jan 30, 2024
1 parent bb11384 commit 680fe19
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 22 deletions.
2 changes: 1 addition & 1 deletion datahub-actions/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def get_long_description():
acryl_datahub_min_version = os.environ.get("ACRYL_DATAHUB_MIN_VERSION") or "0.12.1.2"

base_requirements = {
f"acryl-datahub[kafka]>={acryl_datahub_min_version}",
f"acryl-datahub[datahub-kafka]>={acryl_datahub_min_version}",
# Compatibility.
"typing_extensions>=3.7.4; python_version < '3.8'",
"mypy_extensions>=0.4.3",
Expand Down
8 changes: 6 additions & 2 deletions datahub-actions/src/datahub_actions/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,13 @@ class Pipeline:
_stats: PipelineStats = PipelineStats()

# Options
_retry_count: int = DEFAULT_RETRY_COUNT # Number of times a single event should be retried in case of processing error.
_retry_count: int = (
DEFAULT_RETRY_COUNT # Number of times a single event should be retried in case of processing error.
)
_failure_mode: FailureMode = DEFAULT_FAILURE_MODE
_failed_events_dir: str = DEFAULT_FAILED_EVENTS_DIR # The top-level path where failed events will be logged.
_failed_events_dir: str = (
DEFAULT_FAILED_EVENTS_DIR # The top-level path where failed events will be logged.
)

def __init__(
self,
Expand Down
8 changes: 5 additions & 3 deletions datahub-actions/src/datahub_actions/pipeline/pipeline_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,11 @@ def create_action_context(
) -> PipelineContext:
return PipelineContext(
pipeline_name,
AcrylDataHubGraph(DataHubGraph(datahub_config))
if datahub_config is not None
else None,
(
AcrylDataHubGraph(DataHubGraph(datahub_config))
if datahub_config is not None
else None
),
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ def __init__(
f"Following terms need server-side resolution {terms_needing_resolution} but a DataHub server wasn't provided. Either use fully qualified glossary term ids (e.g. urn:li:glossaryTerm:ec428203-ce86-4db3-985d-5a8ee6df32ba) or provide a datahub_api config in your recipe."
)
for term_identifier in terms_needing_resolution:
self.glossary_term_registry[
term_identifier
] = self._resolve_term_id_to_urn(term_identifier)
self.glossary_term_registry[term_identifier] = (
self._resolve_term_id_to_urn(term_identifier)
)
nodes_needing_resolution = [
d
for d in glossary_entities
Expand All @@ -54,9 +54,9 @@ def __init__(
f"Following term groups (glossary nodes) need server-side resolution {nodes_needing_resolution} but a DataHub server wasn't provided. Either use fully qualified glossary term ids (e.g. urn:li:glossaryTerm:ec428203-ce86-4db3-985d-5a8ee6df32ba) or provide a datahub_api config in your recipe."
)
for node_identifier in nodes_needing_resolution:
self.glossary_node_registry[
node_identifier
] = self._resolve_node_id_to_urn(node_identifier)
self.glossary_node_registry[node_identifier] = (
self._resolve_node_id_to_urn(node_identifier)
)

def _resolve_term_id_to_urn(self, term_identifier: str) -> Optional[str]:
assert self.graph
Expand Down
20 changes: 10 additions & 10 deletions datahub-actions/src/datahub_actions/utils/name_resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,10 @@ def get_entity_name(
self, entity_urn: Urn, datahub_graph: Optional[DataHubGraph]
) -> str:
if datahub_graph:
container_props: Optional[
ContainerPropertiesClass
] = datahub_graph.get_aspect(
entity_urn=str(entity_urn), aspect_type=ContainerPropertiesClass
container_props: Optional[ContainerPropertiesClass] = (
datahub_graph.get_aspect(
entity_urn=str(entity_urn), aspect_type=ContainerPropertiesClass
)
)
if container_props and container_props.name:
return container_props.name
Expand Down Expand Up @@ -200,9 +200,9 @@ def get_entity_name(
if user_properties and user_properties.displayName:
entity_name = user_properties.displayName

editable_properties: Optional[
CorpUserEditableInfoClass
] = datahub_graph.get_aspect(str(entity_urn), CorpUserEditableInfoClass)
editable_properties: Optional[CorpUserEditableInfoClass] = (
datahub_graph.get_aspect(str(entity_urn), CorpUserEditableInfoClass)
)
if editable_properties and editable_properties.displayName:
entity_name = editable_properties.displayName

Expand All @@ -229,9 +229,9 @@ def get_entity_name(
self, entity_urn: Urn, datahub_graph: Optional[DataHubGraph]
) -> str:
if datahub_graph:
dashboard_properties: Optional[
DashboardInfoClass
] = datahub_graph.get_aspect(str(entity_urn), DashboardInfoClass)
dashboard_properties: Optional[DashboardInfoClass] = (
datahub_graph.get_aspect(str(entity_urn), DashboardInfoClass)
)
if dashboard_properties and dashboard_properties.title:
return dashboard_properties.title

Expand Down

0 comments on commit 680fe19

Please sign in to comment.