fix(ingest/snowflake): fix column batcher (datahub-project#10781)
hsheth2 authored Jun 26, 2024
1 parent 8567692 commit 0d677e4
Showing 4 changed files with 14 additions and 198 deletions.
@@ -327,45 +327,6 @@ def operational_data_for_time_window(
ORDER BY query_start_time DESC
;"""

@staticmethod
def table_to_table_lineage_history(
start_time_millis: int,
end_time_millis: int,
include_column_lineage: bool = True,
) -> str:
return f"""
WITH table_lineage_history AS (
SELECT
r.value:"objectName"::varchar AS upstream_table_name,
r.value:"objectDomain"::varchar AS upstream_table_domain,
r.value:"columns" AS upstream_table_columns,
w.value:"objectName"::varchar AS downstream_table_name,
w.value:"objectDomain"::varchar AS downstream_table_domain,
w.value:"columns" AS downstream_table_columns,
t.query_start_time AS query_start_time
FROM
(SELECT * from snowflake.account_usage.access_history) t,
lateral flatten(input => t.DIRECT_OBJECTS_ACCESSED) r,
lateral flatten(input => t.OBJECTS_MODIFIED) w
WHERE r.value:"objectId" IS NOT NULL
AND w.value:"objectId" IS NOT NULL
AND w.value:"objectName" NOT LIKE '%.GE_TMP_%'
AND w.value:"objectName" NOT LIKE '%.GE_TEMP_%'
AND t.query_start_time >= to_timestamp_ltz({start_time_millis}, 3)
AND t.query_start_time < to_timestamp_ltz({end_time_millis}, 3))
SELECT
upstream_table_name AS "UPSTREAM_TABLE_NAME",
downstream_table_name AS "DOWNSTREAM_TABLE_NAME",
upstream_table_columns AS "UPSTREAM_TABLE_COLUMNS",
downstream_table_columns AS "DOWNSTREAM_TABLE_COLUMNS"
FROM table_lineage_history
WHERE upstream_table_domain in ('Table', 'External table') and downstream_table_domain = 'Table'
QUALIFY ROW_NUMBER() OVER (
PARTITION BY downstream_table_name,
upstream_table_name{", downstream_table_columns" if include_column_lineage else ""}
ORDER BY query_start_time DESC
) = 1"""

@staticmethod
def view_dependencies() -> str:
return """
@@ -386,58 +347,6 @@ def view_dependencies() -> str:
referencing_object_domain in ('VIEW', 'MATERIALIZED VIEW')
"""

@staticmethod
def view_lineage_history(
start_time_millis: int,
end_time_millis: int,
include_column_lineage: bool = True,
) -> str:
return f"""
WITH view_lineage_history AS (
SELECT
vu.value : "objectName"::varchar AS view_name,
vu.value : "objectDomain"::varchar AS view_domain,
vu.value : "columns" AS view_columns,
w.value : "objectName"::varchar AS downstream_table_name,
w.value : "objectDomain"::varchar AS downstream_table_domain,
w.value : "columns" AS downstream_table_columns,
t.query_start_time AS query_start_time
FROM
(
SELECT
*
FROM
snowflake.account_usage.access_history
) t,
lateral flatten(input => t.DIRECT_OBJECTS_ACCESSED) vu,
lateral flatten(input => t.OBJECTS_MODIFIED) w
WHERE
vu.value : "objectId" IS NOT NULL
AND w.value : "objectId" IS NOT NULL
AND w.value : "objectName" NOT LIKE '%.GE_TMP_%'
AND w.value : "objectName" NOT LIKE '%.GE_TEMP_%'
AND t.query_start_time >= to_timestamp_ltz({start_time_millis}, 3)
AND t.query_start_time < to_timestamp_ltz({end_time_millis}, 3)
)
SELECT
view_name AS "VIEW_NAME",
view_domain AS "VIEW_DOMAIN",
view_columns AS "VIEW_COLUMNS",
downstream_table_name AS "DOWNSTREAM_TABLE_NAME",
downstream_table_domain AS "DOWNSTREAM_TABLE_DOMAIN",
downstream_table_columns AS "DOWNSTREAM_TABLE_COLUMNS"
FROM
view_lineage_history
WHERE
view_domain in ('View', 'Materialized view')
QUALIFY ROW_NUMBER() OVER (
PARTITION BY view_name,
downstream_table_name {", downstream_table_columns" if include_column_lineage else ""}
ORDER BY
query_start_time DESC
) = 1
"""

# Note on use of `upstreams_deny_pattern` to ignore temporary tables:
# Snowflake access history may include temporary tables in DIRECT_OBJECTS_ACCESSED and
# OBJECTS_MODIFIED->columns->directSources. We do not need these temporary tables and filter these in the query.
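As an aside on the note above: a minimal sketch of how such deny-pattern filtering can look on the Python side. The regexes mirror the SQL NOT LIKE filters in the queries above, but the pattern list and helper name here are illustrative assumptions, not the connector's actual API.

import re
from typing import List

# Illustrative deny patterns mirroring the SQL filters
# NOT LIKE '%.GE_TMP_%' and NOT LIKE '%.GE_TEMP_%' used in the queries above.
TEMP_TABLE_DENY_PATTERNS: List[str] = [
    r".*\.GE_TMP_.*",
    r".*\.GE_TEMP_.*",
]

def is_denied_upstream(table_name: str) -> bool:
    """Return True if the table name matches any temp-table deny pattern."""
    return any(
        re.match(pattern, table_name, re.IGNORECASE)
        for pattern in TEMP_TABLE_DENY_PATTERNS
    )

assert is_denied_upstream("TEST_DB.TEST_SCHEMA.GE_TMP_1A2B3C")
assert not is_denied_upstream("TEST_DB.TEST_SCHEMA.TABLE_1")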
@@ -773,7 +682,12 @@ def table_upstreams_with_column_lineage(
t.query_start_time AS query_start_time,
t.query_id AS query_id
FROM
(SELECT * from snowflake.account_usage.access_history) t,
(
SELECT * from snowflake.account_usage.access_history
WHERE
query_start_time >= to_timestamp_ltz({start_time_millis}, 3)
AND query_start_time < to_timestamp_ltz({end_time_millis}, 3)
) t,
lateral flatten(input => t.DIRECT_OBJECTS_ACCESSED) r,
lateral flatten(input => t.OBJECTS_MODIFIED) w,
lateral flatten(input => w.value : "columns", outer => true) wcols,
@@ -933,7 +847,12 @@ def table_upstreams_only(
t.query_start_time AS query_start_time,
t.query_id AS query_id
FROM
(SELECT * from snowflake.account_usage.access_history) t,
(
SELECT * from snowflake.account_usage.access_history
WHERE
query_start_time >= to_timestamp_ltz({start_time_millis}, 3)
AND query_start_time < to_timestamp_ltz({end_time_millis}, 3)
) t,
lateral flatten(input => t.DIRECT_OBJECTS_ACCESSED) r,
lateral flatten(input => t.OBJECTS_MODIFIED) w
WHERE
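Both hunks above make the same change: the time-window predicate moves from the outer query into the subquery over snowflake.account_usage.access_history, so rows outside the window are discarded before the lateral flattens fan each query row out into one row per accessed/modified object. A simplified sketch of the resulting query shape, with illustrative column selections (the real methods select many more fields):

def table_lineage_query(start_time_millis: int, end_time_millis: int) -> str:
    # The time-window filter sits inside the subquery, so Snowflake prunes
    # access_history rows before the two lateral flattens multiply them.
    return f"""\
SELECT
    r.value:"objectName"::varchar AS upstream_table_name,
    w.value:"objectName"::varchar AS downstream_table_name
FROM
    (
        SELECT * from snowflake.account_usage.access_history
        WHERE
            query_start_time >= to_timestamp_ltz({start_time_millis}, 3)
            AND query_start_time < to_timestamp_ltz({end_time_millis}, 3)
    ) t,
    lateral flatten(input => t.DIRECT_OBJECTS_ACCESSED) r,
    lateral flatten(input => t.OBJECTS_MODIFIED) w
WHERE
    r.value:"objectId" IS NOT NULL
    AND w.value:"objectId" IS NOT NULL"""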
@@ -35,7 +35,7 @@ def split_group(group: PrefixGroup) -> List[PrefixGroup]:
prefix_length = len(group.prefix) + 1
subgroups = defaultdict(list)
for name in group.names:
if len(name) <= prefix_length:
if len(name) < prefix_length:
# Handle cases where a single name is also the prefix for a large number of names.
# For example, if NAME and NAME_{1..10000} are both in the list.
result.append(PrefixGroup(prefix=name, names=[name], exact_match=True))
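This one-character change is the batcher fix named in the commit title. With prefix_length = len(group.prefix) + 1, a name of exactly prefix_length characters still yields a full-length subgroup key, so it belongs in a subgroup; only a name strictly shorter than that (i.e., the name equal to the group's current prefix) needs its own exact-match group. A self-contained sketch of the corrected logic, assuming a simplified PrefixGroup dataclass (field names follow the diff; the surrounding batching machinery is omitted):

from collections import defaultdict
from dataclasses import dataclass
from typing import DefaultDict, List

@dataclass
class PrefixGroup:
    prefix: str
    names: List[str]
    exact_match: bool = False

def split_group(group: PrefixGroup) -> List[PrefixGroup]:
    result: List[PrefixGroup] = []
    prefix_length = len(group.prefix) + 1
    subgroups: DefaultDict[str, List[str]] = defaultdict(list)
    for name in group.names:
        # The fix: `<` instead of `<=`. With `<=`, a name of exactly
        # prefix_length characters (e.g. "appl" under prefix "app") was
        # wrongly routed into its own singleton exact-match group instead
        # of sharing a subgroup with the longer names it prefixes.
        if len(name) < prefix_length:
            result.append(PrefixGroup(prefix=name, names=[name], exact_match=True))
            continue
        subgroups[name[:prefix_length]].append(name)
    for prefix, names in subgroups.items():
        result.append(PrefixGroup(prefix=prefix, names=names))
    return result

# "app" (length 3 < 4) is the only exact match; the longer names share
# a single subgroup keyed by their first four characters, "appl".
groups = split_group(PrefixGroup(prefix="app", names=["app", "apple", "applet", "application"]))
assert [(g.prefix, g.exact_match) for g in groups] == [("app", True), ("appl", False)]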
103 changes: 0 additions & 103 deletions metadata-ingestion/tests/integration/snowflake/common.py
@@ -433,69 +433,6 @@ def default_query_results( # noqa: C901
for i in range(num_usages)
]
return mock
elif query in (
snowflake_query.SnowflakeQuery.table_to_table_lineage_history(
1654473600000,
1654586220000,
),
snowflake_query.SnowflakeQuery.table_to_table_lineage_history(
1654473600000, 1654586220000, False
),
):
return [
{
"DOWNSTREAM_TABLE_NAME": f"TEST_DB.TEST_SCHEMA.TABLE_{op_idx}",
"UPSTREAM_TABLE_NAME": "TEST_DB.TEST_SCHEMA.TABLE_2",
"UPSTREAM_TABLE_COLUMNS": json.dumps(
[
{"columnId": 0, "columnName": f"COL_{col_idx}"}
for col_idx in range(1, num_cols + 1)
]
),
"DOWNSTREAM_TABLE_COLUMNS": json.dumps(
[
{
"columnId": 0,
"columnName": f"COL_{col_idx}",
"directSources": [
{
"columnName": f"COL_{col_idx}",
"objectDomain": "Table",
"objectId": 0,
"objectName": "TEST_DB.TEST_SCHEMA.TABLE_2",
}
],
}
for col_idx in range(1, num_cols + 1)
]
),
}
for op_idx in range(1, num_ops + 1)
] + [
{
"DOWNSTREAM_TABLE_NAME": "TEST_DB.TEST_SCHEMA.TABLE_1",
"UPSTREAM_TABLE_NAME": "OTHER_DB.OTHER_SCHEMA.TABLE_1",
"UPSTREAM_TABLE_COLUMNS": json.dumps(
[{"columnId": 0, "columnName": "COL_1"}]
),
"DOWNSTREAM_TABLE_COLUMNS": json.dumps(
[
{
"columnId": 0,
"columnName": "COL_1",
"directSources": [
{
"columnName": "COL_1",
"objectDomain": "Table",
"objectId": 0,
"objectName": "OTHER_DB.OTHER_SCHEMA.TABLE_1",
}
],
}
]
),
}
]
elif query in (
snowflake_query.SnowflakeQuery.table_to_table_lineage_history_v2(
start_time_millis=1654473600000,
@@ -662,46 +599,6 @@
),
}
]
elif query in [
snowflake_query.SnowflakeQuery.view_lineage_history(
1654473600000,
1654586220000,
),
snowflake_query.SnowflakeQuery.view_lineage_history(
1654473600000, 1654586220000, False
),
]:
return [
{
"DOWNSTREAM_TABLE_NAME": "TEST_DB.TEST_SCHEMA.TABLE_1",
"VIEW_NAME": "TEST_DB.TEST_SCHEMA.VIEW_1",
"VIEW_DOMAIN": "VIEW",
"VIEW_COLUMNS": json.dumps(
[
{"columnId": 0, "columnName": f"COL_{col_idx}"}
for col_idx in range(1, num_cols + 1)
]
),
"DOWNSTREAM_TABLE_DOMAIN": "TABLE",
"DOWNSTREAM_TABLE_COLUMNS": json.dumps(
[
{
"columnId": 0,
"columnName": f"COL_{col_idx}",
"directSources": [
{
"columnName": f"COL_{col_idx}",
"objectDomain": "Table",
"objectId": 0,
"objectName": "TEST_DB.TEST_SCHEMA.TABLE_2",
}
],
}
for col_idx in range(1, num_cols + 1)
]
),
}
]
elif query in [
snowflake_query.SnowflakeQuery.view_dependencies_v2(),
snowflake_query.SnowflakeQuery.view_dependencies(),
@@ -35,7 +35,7 @@ def test_build_prefix_batches_exceeds_max_batch_size():
]
expected = [
[PrefixGroup(prefix="app", names=["app"], exact_match=True)],
[PrefixGroup(prefix="app", names=["apple", "applet", "application"])],
[PrefixGroup(prefix="appl", names=["apple", "applet", "application"])],
[PrefixGroup(prefix="b", names=["banana", "band", "bandana"])],
[
PrefixGroup(prefix="c", names=["candle", "candy"]),
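The updated expectation follows from the corrected comparison: splitting the group with prefix "app" uses prefix_length = 4, so "apple", "applet", and "application" are keyed by their first four characters and share a subgroup with prefix "appl", while the bare name "app" (length 3 < 4) remains an exact-match group, as in the unchanged line above. A one-line sanity check in the same spirit:

# Subgroup keys are the first len("app") + 1 = 4 characters of each member.
assert {n[:4] for n in ["apple", "applet", "application"]} == {"appl"}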
