Skip to content

Commit

Permalink
Merge remote-tracking branch 'acryl/jj--add-structured-logging-to-ing…
Browse files Browse the repository at this point in the history
…estion' into jj--add-structured-logging-to-ingestion
  • Loading branch information
John Joyce authored and John Joyce committed Jul 3, 2024
2 parents d72da4a + 2dfef8d commit 6547a5a
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 76 deletions.
54 changes: 12 additions & 42 deletions metadata-ingestion/examples/recipes/redshift_to_datahub.dhub.yaml
Original file line number Diff line number Diff line change
@@ -1,47 +1,17 @@
---
# see https://datahubproject.io/docs/generated/ingestion/sources/redshift for complete documentation
source:
type: "redshift"
type: redshift
config:
# Coordinates
host_port: host:port
database: database_name
options:
connect_args:
sslmode: prefer
# Credentials
username: datahub
password: datahub
#include_tables: true
#include_views: true
#include_table_lineage: true
#default_schema: public
#table_lineage_mode: stl_scan_based
#include_copy_lineage: true
#start_time: 2020-12-15T20:08:23.091Z
#end_time: 2023-12-15T20:08:23.091Z
#profiling:
# enabled: true
# turn_off_expensive_profiling_metrics: false
# limit: 10
# query_combiner_enabled: true
# max_number_of_fields_to_profile: 8
# profile_table_level_only: false
# include_field_null_count: true
# include_field_min_value: true
# include_field_max_value: true
# include_field_mean_value: true
# include_field_median_value: true
# include_field_stddev_value: false
# include_field_quantiles: false
# include_field_distinct_value_frequencies: false
# include_field_histogram: false
# include_field_sample_values: false
#profile_pattern:
# allow:
# - "schema.table.column"
# deny:
# - "*.*.*"
host_port: '<your-redshift-host>:5439'
username: '${REDSHIFT_USERNAME}'
table_lineage_mode: stl_scan_based
include_table_lineage: true
include_tables: true
include_views: true
profiling:
enabled: true
profile_table_level_only: true
database: dev
# SECURITY: never commit real credentials. The original line leaked a live
# cluster hostname and plaintext password; rotate that credential immediately
# and reference an environment variable here instead.
password: '${REDSHIFT_PASSWORD}'

# see https://datahubproject.io/docs/metadata-ingestion/sink_docs/datahub for complete documentation
sink:
Expand Down
57 changes: 29 additions & 28 deletions metadata-ingestion/src/datahub/ingestion/source/mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,9 @@ def __init__(self, ctx: PipelineContext, config: ModeConfig):
self._get_request_json(f"{self.config.connect_uri}/api/verify")
except HTTPError as http_error:
self.report.report_failure(
title="Unable to Verify Connection",
message=f"Unable to verify connection. Error was: {str(http_error)}",
title="Failed to Connect",
message="Unable to verify connection to mode.",
context=f"Error: {str(http_error)}",
)

self.workspace_uri = f"{self.config.connect_uri}/api/{self.config.workspace}"
Expand Down Expand Up @@ -528,9 +529,9 @@ def _get_space_name_and_tokens(self) -> dict:
space_info[s.get("token", "")] = s.get("name", "")
except HTTPError as http_error:
self.report.report_failure(
title="Unable to Retrieve Spaces for Workspace",
message=f"Unable to retrieve spaces/collections for {self.workspace_uri}, "
f"Reason: {str(http_error)}",
title="Failed to Retrieve Spaces",
message="Unable to retrieve spaces / collections for workspace.",
context=f"Workspace: {self.workspace_uri}, Error: {str(http_error)}",
)

return space_info
Expand Down Expand Up @@ -676,8 +677,9 @@ def _get_data_sources(self) -> List[dict]:
data_sources = ds_json.get("_embedded", {}).get("data_sources", [])
except HTTPError as http_error:
self.report.report_failure(
title="Unable to retrieve Data Sources",
message=f"Unable to retrieve data sources. Reason: {str(http_error)}",
title="Failed to retrieve Data Sources",
message="Unable to retrieve data sources from Mode.",
context=f"Error:{str(http_error)}",
)

return data_sources
Expand All @@ -691,7 +693,7 @@ def _get_platform_and_dbname(
if not data_sources:
self.report.report_failure(
title="No Data Sources Found",
message=f"No data sources found for datasource id: "
message=f"No data sources found for datasource with id: "
f"{data_source_id}",
)
return None, None
Expand Down Expand Up @@ -762,9 +764,9 @@ def _get_definition(self, definition_name):

except HTTPError as http_error:
self.report.report_failure(
title="Unable to Retrieve Definition",
message=f"Unable to retrieve definition for {definition_name}, "
f"Reason: {str(http_error)}",
title="Failed to Retrieve Definition",
message="Unable to retrieve definition from Mode.",
context=f"Definition Name: {definition_name}, Error: {str(http_error)}",
)
return None

Expand All @@ -783,10 +785,9 @@ def _get_source_from_query(self, raw_query: str) -> set:
source_paths.add(f"{source_schema}.{source_table}")
except Exception as e:
self.report.report_failure(
title="Unable to Extract Lineage From Query",
message=f"Unable to retrieve lineage from query. "
f"Query: {raw_query} "
f"Reason: {str(e)} ",
title="Failed to Extract Lineage From Query",
message="Unable to retrieve lineage from Mode query.",
context=f"Query: {raw_query}, Error: {str(e)}",
)

return source_paths
Expand Down Expand Up @@ -1322,9 +1323,9 @@ def _get_reports(self, space_token: str) -> List[dict]:
reports = reports_json.get("_embedded", {}).get("reports", {})
except HTTPError as http_error:
self.report.report_failure(
title="Unable to Retrieve Reports for Space",
message=f"Unable to retrieve reports for space token: {space_token}, "
f"Reason: {str(http_error)}",
title="Failed to Retrieve Reports for Space",
message="Unable to retrieve reports for space token.",
context=f"Space Token: {space_token}, Error: {str(http_error)}",
)
return reports

Expand All @@ -1338,9 +1339,9 @@ def _get_queries(self, report_token: str) -> list:
queries = queries_json.get("_embedded", {}).get("queries", {})
except HTTPError as http_error:
self.report.report_failure(
title="Unable to Retrieve Queries",
message=f"Unable to retrieve queries for report token: {report_token}, "
f"Reason: {str(http_error)}",
title="Failed to Retrieve Queries",
message="Unable to retrieve queries for report token.",
context=f"Report Token: {report_token}, Error: {str(http_error)}",
)
return queries

Expand All @@ -1355,9 +1356,9 @@ def _get_last_query_run(
queries = queries_json.get("_embedded", {}).get("queries", {})
except HTTPError as http_error:
self.report.report_failure(
title="Unable to Retrieve Queries for Report",
message=f"Unable to retrieve queries for report token: {report_token}, "
f"Reason: {str(http_error)}",
title="Failed to Retrieve Queries for Report",
message="Unable to retrieve queries for report token.",
context=f"Report Token:{report_token}, Error: {str(http_error)}",
)
return {}
return queries
Expand All @@ -1373,11 +1374,11 @@ def _get_charts(self, report_token: str, query_token: str) -> list:
charts = charts_json.get("_embedded", {}).get("charts", {})
except HTTPError as http_error:
self.report.report_failure(
title="Unable to Retrieve Charts",
message=f"Unable to retrieve charts: "
f"Report token: {report_token} "
title="Failed to Retrieve Charts",
message="Unable to retrieve charts from Mode.",
context=f"Report Token: {report_token}, "
f"Query token: {query_token}, "
f"Reason: {str(http_error)}",
f"Error: {str(http_error)}",
)
return charts

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ def test_pipeline(pytestconfig, tmp_path):


@freeze_time(FROZEN_TIME)
def test_mode_ingest_success(
def test_metabase_ingest_success(
pytestconfig, tmp_path, test_pipeline, mock_datahub_graph, default_json_response_map
):
with patch(
Expand Down Expand Up @@ -260,7 +260,7 @@ def test_stateful_ingestion(


@freeze_time(FROZEN_TIME)
def test_mode_ingest_failure(pytestconfig, tmp_path, default_json_response_map):
def test_metabase_ingest_failure(pytestconfig, tmp_path, default_json_response_map):
with patch(
"datahub.ingestion.source.metabase.requests.session",
side_effect=MockResponse.build_mocked_requests_failure(
Expand Down
4 changes: 0 additions & 4 deletions metadata-ingestion/tests/integration/mode/test_mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,3 @@ def test_mode_ingest_failure(pytestconfig, tmp_path):
except PipelineExecutionError as exec_error:
assert exec_error.args[0] == "Source reported errors"
assert len(exec_error.args[1].failures) == 1
assert (
list(exec_error.args[1].failures.keys())[0]
== "mode-report-75737b70402e"
)

0 comments on commit 6547a5a

Please sign in to comment.