From 0d677e49922df94b1c4254455541ac988952c84a Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Tue, 25 Jun 2024 22:21:54 -0700 Subject: [PATCH] fix(ingest/snowflake): fix column batcher (#10781) --- .../source/snowflake/snowflake_query.py | 105 ++---------------- .../datahub/utilities/prefix_batch_builder.py | 2 +- .../tests/integration/snowflake/common.py | 103 ----------------- .../utilities/test_prefix_patch_builder.py | 2 +- 4 files changed, 14 insertions(+), 198 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py index c4b6f597bbb7e5..3c4e9c1ce62413 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py @@ -327,45 +327,6 @@ def operational_data_for_time_window( ORDER BY query_start_time DESC ;""" - @staticmethod - def table_to_table_lineage_history( - start_time_millis: int, - end_time_millis: int, - include_column_lineage: bool = True, - ) -> str: - return f""" - WITH table_lineage_history AS ( - SELECT - r.value:"objectName"::varchar AS upstream_table_name, - r.value:"objectDomain"::varchar AS upstream_table_domain, - r.value:"columns" AS upstream_table_columns, - w.value:"objectName"::varchar AS downstream_table_name, - w.value:"objectDomain"::varchar AS downstream_table_domain, - w.value:"columns" AS downstream_table_columns, - t.query_start_time AS query_start_time - FROM - (SELECT * from snowflake.account_usage.access_history) t, - lateral flatten(input => t.DIRECT_OBJECTS_ACCESSED) r, - lateral flatten(input => t.OBJECTS_MODIFIED) w - WHERE r.value:"objectId" IS NOT NULL - AND w.value:"objectId" IS NOT NULL - AND w.value:"objectName" NOT LIKE '%.GE_TMP_%' - AND w.value:"objectName" NOT LIKE '%.GE_TEMP_%' - AND t.query_start_time >= to_timestamp_ltz({start_time_millis}, 3) - AND t.query_start_time < to_timestamp_ltz({end_time_millis}, 3)) - SELECT - upstream_table_name AS "UPSTREAM_TABLE_NAME", - downstream_table_name AS "DOWNSTREAM_TABLE_NAME", - upstream_table_columns AS "UPSTREAM_TABLE_COLUMNS", - downstream_table_columns AS "DOWNSTREAM_TABLE_COLUMNS" - FROM table_lineage_history - WHERE upstream_table_domain in ('Table', 'External table') and downstream_table_domain = 'Table' - QUALIFY ROW_NUMBER() OVER ( - PARTITION BY downstream_table_name, - upstream_table_name{", downstream_table_columns" if include_column_lineage else ""} - ORDER BY query_start_time DESC - ) = 1""" - @staticmethod def view_dependencies() -> str: return """ @@ -386,58 +347,6 @@ def view_dependencies() -> str: referencing_object_domain in ('VIEW', 'MATERIALIZED VIEW') """ - @staticmethod - def view_lineage_history( - start_time_millis: int, - end_time_millis: int, - include_column_lineage: bool = True, - ) -> str: - return f""" - WITH view_lineage_history AS ( - SELECT - vu.value : "objectName"::varchar AS view_name, - vu.value : "objectDomain"::varchar AS view_domain, - vu.value : "columns" AS view_columns, - w.value : "objectName"::varchar AS downstream_table_name, - w.value : "objectDomain"::varchar AS downstream_table_domain, - w.value : "columns" AS downstream_table_columns, - t.query_start_time AS query_start_time - FROM - ( - SELECT - * - FROM - snowflake.account_usage.access_history - ) t, - lateral flatten(input => t.DIRECT_OBJECTS_ACCESSED) vu, - lateral flatten(input => t.OBJECTS_MODIFIED) w - WHERE - vu.value : "objectId" IS NOT NULL - AND w.value : "objectId" IS NOT NULL - AND w.value : "objectName" NOT LIKE '%.GE_TMP_%' - AND w.value : "objectName" NOT LIKE '%.GE_TEMP_%' - AND t.query_start_time >= to_timestamp_ltz({start_time_millis}, 3) - AND t.query_start_time < to_timestamp_ltz({end_time_millis}, 3) - ) - SELECT - view_name AS "VIEW_NAME", - view_domain AS "VIEW_DOMAIN", - view_columns AS "VIEW_COLUMNS", - downstream_table_name AS "DOWNSTREAM_TABLE_NAME", - downstream_table_domain AS "DOWNSTREAM_TABLE_DOMAIN", - downstream_table_columns AS "DOWNSTREAM_TABLE_COLUMNS" - FROM - view_lineage_history - WHERE - view_domain in ('View', 'Materialized view') - QUALIFY ROW_NUMBER() OVER ( - PARTITION BY view_name, - downstream_table_name {", downstream_table_columns" if include_column_lineage else ""} - ORDER BY - query_start_time DESC - ) = 1 - """ - # Note on use of `upstreams_deny_pattern` to ignore temporary tables: # Snowflake access history may include temporary tables in DIRECT_OBJECTS_ACCESSED and # OBJECTS_MODIFIED->columns->directSources. We do not need these temporary tables and filter these in the query. @@ -773,7 +682,12 @@ def table_upstreams_with_column_lineage( t.query_start_time AS query_start_time, t.query_id AS query_id FROM - (SELECT * from snowflake.account_usage.access_history) t, + ( + SELECT * from snowflake.account_usage.access_history + WHERE + query_start_time >= to_timestamp_ltz({start_time_millis}, 3) + AND query_start_time < to_timestamp_ltz({end_time_millis}, 3) + ) t, lateral flatten(input => t.DIRECT_OBJECTS_ACCESSED) r, lateral flatten(input => t.OBJECTS_MODIFIED) w, lateral flatten(input => w.value : "columns", outer => true) wcols, @@ -933,7 +847,12 @@ def table_upstreams_only( t.query_start_time AS query_start_time, t.query_id AS query_id FROM - (SELECT * from snowflake.account_usage.access_history) t, + ( + SELECT * from snowflake.account_usage.access_history + WHERE + query_start_time >= to_timestamp_ltz({start_time_millis}, 3) + AND query_start_time < to_timestamp_ltz({end_time_millis}, 3) + ) t, lateral flatten(input => t.DIRECT_OBJECTS_ACCESSED) r, lateral flatten(input => t.OBJECTS_MODIFIED) w WHERE diff --git a/metadata-ingestion/src/datahub/utilities/prefix_batch_builder.py b/metadata-ingestion/src/datahub/utilities/prefix_batch_builder.py index b6da7a1fbd1521..271c2517e87713 100644 --- a/metadata-ingestion/src/datahub/utilities/prefix_batch_builder.py +++ b/metadata-ingestion/src/datahub/utilities/prefix_batch_builder.py @@ -35,7 +35,7 @@ def split_group(group: PrefixGroup) -> List[PrefixGroup]: prefix_length = len(group.prefix) + 1 subgroups = defaultdict(list) for name in group.names: - if len(name) <= prefix_length: + if len(name) < prefix_length: # Handle cases where a single name is also the prefix for a large number of names. # For example, if NAME and NAME_{1..10000} are both in the list. result.append(PrefixGroup(prefix=name, names=[name], exact_match=True)) diff --git a/metadata-ingestion/tests/integration/snowflake/common.py b/metadata-ingestion/tests/integration/snowflake/common.py index 881fac96f82e8d..1d3e2c8b95af3a 100644 --- a/metadata-ingestion/tests/integration/snowflake/common.py +++ b/metadata-ingestion/tests/integration/snowflake/common.py @@ -433,69 +433,6 @@ def default_query_results( # noqa: C901 for i in range(num_usages) ] return mock - elif query in ( - snowflake_query.SnowflakeQuery.table_to_table_lineage_history( - 1654473600000, - 1654586220000, - ), - snowflake_query.SnowflakeQuery.table_to_table_lineage_history( - 1654473600000, 1654586220000, False - ), - ): - return [ - { - "DOWNSTREAM_TABLE_NAME": f"TEST_DB.TEST_SCHEMA.TABLE_{op_idx}", - "UPSTREAM_TABLE_NAME": "TEST_DB.TEST_SCHEMA.TABLE_2", - "UPSTREAM_TABLE_COLUMNS": json.dumps( - [ - {"columnId": 0, "columnName": f"COL_{col_idx}"} - for col_idx in range(1, num_cols + 1) - ] - ), - "DOWNSTREAM_TABLE_COLUMNS": json.dumps( - [ - { - "columnId": 0, - "columnName": f"COL_{col_idx}", - "directSources": [ - { - "columnName": f"COL_{col_idx}", - "objectDomain": "Table", - "objectId": 0, - "objectName": "TEST_DB.TEST_SCHEMA.TABLE_2", - } - ], - } - for col_idx in range(1, num_cols + 1) - ] - ), - } - for op_idx in range(1, num_ops + 1) - ] + [ - { - "DOWNSTREAM_TABLE_NAME": "TEST_DB.TEST_SCHEMA.TABLE_1", - "UPSTREAM_TABLE_NAME": "OTHER_DB.OTHER_SCHEMA.TABLE_1", - "UPSTREAM_TABLE_COLUMNS": json.dumps( - [{"columnId": 0, "columnName": "COL_1"}] - ), - "DOWNSTREAM_TABLE_COLUMNS": json.dumps( - [ - { - "columnId": 0, - "columnName": "COL_1", - "directSources": [ - { - "columnName": "COL_1", - "objectDomain": "Table", - "objectId": 0, - "objectName": "OTHER_DB.OTHER_SCHEMA.TABLE_1", - } - ], - } - ] - ), - } - ] elif query in ( snowflake_query.SnowflakeQuery.table_to_table_lineage_history_v2( start_time_millis=1654473600000, @@ -662,46 +599,6 @@ def default_query_results( # noqa: C901 ), } ] - elif query in [ - snowflake_query.SnowflakeQuery.view_lineage_history( - 1654473600000, - 1654586220000, - ), - snowflake_query.SnowflakeQuery.view_lineage_history( - 1654473600000, 1654586220000, False - ), - ]: - return [ - { - "DOWNSTREAM_TABLE_NAME": "TEST_DB.TEST_SCHEMA.TABLE_1", - "VIEW_NAME": "TEST_DB.TEST_SCHEMA.VIEW_1", - "VIEW_DOMAIN": "VIEW", - "VIEW_COLUMNS": json.dumps( - [ - {"columnId": 0, "columnName": f"COL_{col_idx}"} - for col_idx in range(1, num_cols + 1) - ] - ), - "DOWNSTREAM_TABLE_DOMAIN": "TABLE", - "DOWNSTREAM_TABLE_COLUMNS": json.dumps( - [ - { - "columnId": 0, - "columnName": f"COL_{col_idx}", - "directSources": [ - { - "columnName": f"COL_{col_idx}", - "objectDomain": "Table", - "objectId": 0, - "objectName": "TEST_DB.TEST_SCHEMA.TABLE_2", - } - ], - } - for col_idx in range(1, num_cols + 1) - ] - ), - } - ] elif query in [ snowflake_query.SnowflakeQuery.view_dependencies_v2(), snowflake_query.SnowflakeQuery.view_dependencies(), diff --git a/metadata-ingestion/tests/unit/utilities/test_prefix_patch_builder.py b/metadata-ingestion/tests/unit/utilities/test_prefix_patch_builder.py index 19af7e9f66c1ab..c62bac24a0a42b 100644 --- a/metadata-ingestion/tests/unit/utilities/test_prefix_patch_builder.py +++ b/metadata-ingestion/tests/unit/utilities/test_prefix_patch_builder.py @@ -35,7 +35,7 @@ def test_build_prefix_batches_exceeds_max_batch_size(): ] expected = [ [PrefixGroup(prefix="app", names=["app"], exact_match=True)], - [PrefixGroup(prefix="app", names=["apple", "applet", "application"])], + [PrefixGroup(prefix="appl", names=["apple", "applet", "application"])], [PrefixGroup(prefix="b", names=["banana", "band", "bandana"])], [ PrefixGroup(prefix="c", names=["candle", "candy"]),