Skip to content

Commit

Permalink
fix(ingest): fix TypeError when columns are returned as None in snowf…
Browse files Browse the repository at this point in the history
…lake query
  • Loading branch information
mayurinehate committed Nov 10, 2022
1 parent ae2ea52 commit bb4c67d
Showing 1 changed file with 15 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,17 +95,24 @@ class SnowflakeUpstreamTable:
downstreamColumns: List[SnowflakeColumnWithLineage]

@classmethod
def from_dict(cls, dataset, upstreams_columns_dict, downstream_columns_dict):
def from_dict(cls, dataset, upstreams_columns_json, downstream_columns_json):
try:
upstreams_columns_list = []
downstream_columns_list = []
if upstreams_columns_json is not None:
upstreams_columns_list = json.loads(upstreams_columns_json)
if downstream_columns_json is not None:
downstream_columns_list = json.loads(downstream_columns_json)

table_with_upstreams = cls(
dataset,
[
SnowflakeColumnReference.parse_obj(col)
for col in upstreams_columns_dict
for col in upstreams_columns_list
],
[
SnowflakeColumnWithLineage.parse_obj(col)
for col in downstream_columns_dict
for col in downstream_columns_list
],
)
except ValidationError:
Expand Down Expand Up @@ -390,8 +397,8 @@ def _populate_lineage(self, conn: SnowflakeConnection) -> None:
# (<upstream_table_name>, <json_list_of_upstream_columns>, <json_list_of_downstream_columns>)
SnowflakeUpstreamTable.from_dict(
upstream_table_name,
json.loads(db_row["UPSTREAM_TABLE_COLUMNS"]),
json.loads(db_row["DOWNSTREAM_TABLE_COLUMNS"]),
db_row["UPSTREAM_TABLE_COLUMNS"],
db_row["DOWNSTREAM_TABLE_COLUMNS"],
),
)
num_edges += 1
Expand Down Expand Up @@ -441,7 +448,7 @@ def _populate_view_upstream_lineage(self, conn: SnowflakeConnection) -> None:
# key is the downstream view name
self._lineage_map[view_name].update_lineage(
# (<upstream_table_name>, <empty_json_list_of_upstream_table_columns>, <empty_json_list_of_downstream_view_columns>)
SnowflakeUpstreamTable.from_dict(view_upstream, [], [])
SnowflakeUpstreamTable.from_dict(view_upstream, None, None)
)
num_edges += 1
logger.debug(
Expand Down Expand Up @@ -499,8 +506,8 @@ def _populate_view_downstream_lineage(self, conn: SnowflakeConnection) -> None:
# (<upstream_view_name>, <json_list_of_upstream_view_columns>, <json_list_of_downstream_columns>)
SnowflakeUpstreamTable.from_dict(
view_name,
json.loads(db_row["VIEW_COLUMNS"]),
json.loads(db_row["DOWNSTREAM_TABLE_COLUMNS"]),
db_row["VIEW_COLUMNS"],
db_row["DOWNSTREAM_TABLE_COLUMNS"],
)
)
self.report.num_view_to_table_edges_scanned += 1
Expand Down

0 comments on commit bb4c67d

Please sign in to comment.