Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[sc-29773] Handle tables without columns #1040

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions metaphor/unity_catalog/models.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from datetime import datetime
from typing import Dict, List, Literal, Optional

from databricks.sql.types import Row
from pydantic import BaseModel


Expand All @@ -21,6 +22,10 @@ class Tag(BaseModel):
key: str
value: str

@classmethod
def from_row_tags(cls, tags: Optional[List[Dict]]) -> List["Tag"]:
return [Tag(key=tag["tag_name"], value=tag["tag_value"]) for tag in tags or []]


class CatalogInfo(BaseModel):
catalog_name: str
Expand All @@ -45,6 +50,20 @@ class ColumnInfo(BaseModel):
comment: Optional[str] = None
tags: List[Tag]

@classmethod
def from_row(cls, row: Row) -> List["ColumnInfo"]:
return [
ColumnInfo(
column_name=column["column_name"],
data_type=column["data_type"],
data_precision=column["data_precision"],
is_nullable=column["is_nullable"] == "YES",
comment=column["comment"],
tags=Tag.from_row_tags(column["tags"]),
)
for column in (row["columns"] if row["columns"] is not None else [])
]


class TableInfo(BaseModel):
catalog_name: str
Expand All @@ -63,6 +82,25 @@ class TableInfo(BaseModel):
tags: List[Tag] = []
columns: List[ColumnInfo] = []

@classmethod
def from_row(cls, row: Row) -> "TableInfo":
return TableInfo(
catalog_name=row["catalog_name"],
schema_name=row["schema_name"],
table_name=row["table_name"],
type=row["table_type"],
owner=row["owner"],
comment=row["table_comment"],
created_at=row["created_at"],
created_by=row["created_by"],
updated_at=row["updated_at"],
updated_by=row["updated_by"],
data_source_format=row["data_source_format"],
view_definition=row["view_definition"],
storage_location=row["storage_path"],
columns=ColumnInfo.from_row(row),
)


class VolumeInfo(BaseModel):
catalog_name: str
Expand Down
42 changes: 4 additions & 38 deletions metaphor/unity_catalog/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from metaphor.unity_catalog.models import (
CatalogInfo,
Column,
ColumnInfo,
ColumnLineage,
SchemaInfo,
TableInfo,
Expand Down Expand Up @@ -37,10 +36,6 @@
"""These are the operations that do not modify actual data."""


def to_tags(tags: Optional[List[Dict]]) -> List[Tag]:
return [Tag(key=tag["tag_name"], value=tag["tag_value"]) for tag in tags or []]


def list_catalogs(connection: Connection) -> List[CatalogInfo]:
"""
Fetch catalogs from system.access.information_schema
Expand Down Expand Up @@ -93,7 +88,7 @@ def list_catalogs(connection: Connection) -> List[CatalogInfo]:
catalog_name=row["catalog_name"],
owner=row["catalog_owner"],
comment=row["comment"],
tags=to_tags(row["tags"]),
tags=Tag.from_row_tags(row["tags"]),
)
)

Expand Down Expand Up @@ -158,7 +153,7 @@ def list_schemas(connection: Connection, catalog: str) -> List[SchemaInfo]:
schema_name=row["schema_name"],
owner=row["schema_owner"],
comment=row["comment"],
tags=to_tags(row["tags"]),
tags=Tag.from_row_tags(row["tags"]),
)
)

Expand Down Expand Up @@ -345,36 +340,7 @@ def list_tables(connection: Connection, catalog: str, schema: str) -> List[Table
return []

for row in cursor.fetchall():
columns = [
ColumnInfo(
column_name=column["column_name"],
data_type=column["data_type"],
data_precision=column["data_precision"],
is_nullable=column["is_nullable"] == "YES",
comment=column["comment"],
tags=to_tags(column["tags"]),
)
for column in row["columns"]
]

tables.append(
TableInfo(
catalog_name=row["catalog_name"],
schema_name=row["schema_name"],
table_name=row["table_name"],
type=row["table_type"],
owner=row["owner"],
comment=row["table_comment"],
created_at=row["created_at"],
created_by=row["created_by"],
updated_at=row["updated_at"],
updated_by=row["updated_by"],
data_source_format=row["data_source_format"],
view_definition=row["view_definition"],
storage_location=row["storage_path"],
columns=columns,
),
)
tables.append(TableInfo.from_row(row))

logger.info(f"Found {len(tables)} tables from {catalog}")
json_dump_to_debug_file(tables, f"list_tables_{catalog}_{schema}.json")
Expand Down Expand Up @@ -463,7 +429,7 @@ def list_volumes(connection: Connection, catalog: str, schema: str) -> List[Volu
updated_at=row["last_altered"],
updated_by=row["last_altered_by"],
storage_location=row["storage_location"],
tags=to_tags(row["tags"]),
tags=Tag.from_row_tags(row["tags"]),
)
)

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "metaphor-connectors"
version = "0.14.152"
version = "0.14.153"
license = "Apache-2.0"
description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app."
authors = ["Metaphor <[email protected]>"]
Expand Down
119 changes: 119 additions & 0 deletions tests/unity_catalog/test_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
from datetime import datetime

from databricks.sql.types import Row

from metaphor.unity_catalog.models import TableInfo


class TestModel:
row = Row(
catalog_name="catalog",
schema_name="schema",
table_name="table",
table_type="TABLE",
owner="[email protected]",
table_comment="this is a comment",
created_at=datetime(2000, 1, 1, 0, 0, 0, 0),
created_by="[email protected]",
updated_at=datetime(2000, 1, 1, 0, 0, 0, 0),
updated_by="[email protected]",
data_source_format="sql",
view_definition="SELECT * FROM catalog.schema.source",
storage_path="s3://bucket/foo.csv",
tags=None,
columns=None,
)

def test_table_info_convert_none(self) -> None:
table_info = TableInfo.from_row(self.row)
assert table_info.model_dump() == {
"catalog_name": "catalog",
"schema_name": "schema",
"table_name": "table",
"type": "TABLE",
"owner": "[email protected]",
"comment": "this is a comment",
"created_at": datetime.fromisoformat("2000-01-01T00:00:00"),
"created_by": "[email protected]",
"updated_at": datetime.fromisoformat("2000-01-01T00:00:00"),
"updated_by": "[email protected]",
"view_definition": "SELECT * FROM catalog.schema.source",
"storage_location": "s3://bucket/foo.csv",
"data_source_format": "sql",
"tags": [],
"columns": [],
}

def test_table_info_with_column(self) -> None:
columns = [
{
"column_name": "col1",
"data_type": "TEXT",
"data_precision": None,
"is_nullable": "NO",
"comment": None,
"tags": [
{
"tag_name": "tag1",
"tag_value": "value1",
},
{
"tag_name": "tag2",
"tag_value": "value2",
},
],
},
{
"column_name": "col2",
"data_type": "INT",
"data_precision": 2,
"is_nullable": "YES",
"comment": "This is a nullable int field",
"tags": [
{
"tag_name": "tag1",
"tag_value": "value1",
},
],
},
]
row = Row(**{**self.row.asDict(), "columns": columns})
table_info = TableInfo.from_row(row)

assert table_info.model_dump() == {
"catalog_name": "catalog",
"schema_name": "schema",
"table_name": "table",
"type": "TABLE",
"owner": "[email protected]",
"comment": "this is a comment",
"created_at": datetime.fromisoformat("2000-01-01T00:00:00"),
"created_by": "[email protected]",
"updated_at": datetime.fromisoformat("2000-01-01T00:00:00"),
"updated_by": "[email protected]",
"view_definition": "SELECT * FROM catalog.schema.source",
"storage_location": "s3://bucket/foo.csv",
"data_source_format": "sql",
"tags": [],
"columns": [
{
"column_name": "col1",
"data_type": "TEXT",
"data_precision": None,
"is_nullable": False,
"comment": None,
"tags": [
{"key": "tag1", "value": "value1"},
{"key": "tag2", "value": "value2"},
],
},
{
"column_name": "col2",
"data_type": "INT",
"data_precision": 2,
"is_nullable": True,
"comment": "This is a nullable int field",
"tags": [{"key": "tag1", "value": "value1"}],
},
],
}
Loading