feat(ingest): databricks - Unity catalog source (#6292)
Co-authored-by: MohdSiddique Bagwan <[email protected]>
Co-authored-by: Shirshanka Das <[email protected]>
3 people authored Oct 30, 2022
1 parent 3eae087 commit 0dc2d6a
Showing 17 changed files with 2,947 additions and 2 deletions.
Binary file added datahub-web-react/src/images/databrickslogo.png
22 changes: 22 additions & 0 deletions metadata-ingestion/docs/sources/databricks/README.md
@@ -0,0 +1,22 @@
DataHub supports integration with the Databricks ecosystem through a number of connectors, depending on your exact setup.

## Databricks Hive

The simplest way to integrate is usually via the Hive connector. The [Hive starter recipe](http://datahubproject.io/docs/generated/ingestion/sources/hive#starter-recipe) has a section describing how to connect to your Databricks workspace.

## Databricks Unity Catalog (new)

The recently introduced [Unity Catalog](https://www.databricks.com/product/unity-catalog) provides a new way to govern your assets within the Databricks lakehouse. If you have enabled Unity Catalog, you can use the `unity-catalog` source (see below) to integrate your metadata into DataHub as an alternative to the Hive pathway.

## Databricks Spark

To complete the picture, we recommend adding push-based ingestion from your Spark jobs to see real-time activity and lineage between your Databricks tables and Spark jobs. Use the Spark agent to push metadata to DataHub using the instructions [here](../../../../metadata-integration/java/spark-lineage/README.md#configuration-instructions-databricks).
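
To make that concrete, here is a minimal PySpark sketch (not part of this commit) of what enabling the agent can look like. It assumes the `datahub.spark.DatahubSparkListener` class and `spark.datahub.rest.server` setting described in the Spark agent README; on Databricks these properties are typically set in the cluster's Spark config or an init script rather than in code, and the package version and server URL below are placeholders.

```python
# Hedged sketch: enabling the DataHub Spark lineage agent from PySpark.
# On Databricks, these settings usually go into the cluster's Spark config;
# the package version and server URL are placeholders.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("datahub-lineage-demo")
    .config("spark.jars.packages", "io.acryl:datahub-spark-lineage:<version>")
    .config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
    .config("spark.datahub.rest.server", "http://localhost:8080")
    .getOrCreate()
)
```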

## Watch the DataHub Talk at the Data and AI Summit 2022

For a deeper look at how to think about DataHub within and across your Databricks ecosystem, watch the recording of our talk at the Data and AI Summit 2022.

[![IMAGE_ALT](../../images/databricks/data_and_ai_summit_2022.png)](https://www.youtube.com/watch?v=SCP0PR3t7dc)



11 changes: 11 additions & 0 deletions metadata-ingestion/docs/sources/databricks/unity-catalog_post.md
@@ -0,0 +1,11 @@
#### Troubleshooting

##### No data lineage captured or missing lineage

Check that you meet the [Unity Catalog lineage requirements](https://docs.databricks.com/data-governance/unity-catalog/data-lineage.html#requirements).

Also check the [Unity Catalog limitations](https://docs.databricks.com/data-governance/unity-catalog/data-lineage.html#limitations) to make sure that lineage would be expected to exist in this case.

##### Lineage extraction is too slow

Currently, there is no way to get table or column lineage in bulk from the Databricks Unity Catalog REST API. Table lineage requires one API call per table, and column lineage requires one API call per column. If you find metadata extraction taking too long, you can turn off column-level lineage extraction via the `include_column_lineage` config flag.
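
As an illustration only (the flag normally goes in your recipe YAML), the sketch below assumes the source config class can be built directly from a dict via pydantic's `parse_obj`; the token and workspace URL are placeholders.

```python
# Hedged sketch: disabling column-level lineage to speed up extraction.
# Token and workspace URL are placeholders.
from datahub.ingestion.source.unity.config import UnityCatalogSourceConfig

config = UnityCatalogSourceConfig.parse_obj(
    {
        "token": "<personal-access-token>",
        "workspace_url": "https://my-workspace.cloud.databricks.com",
        "include_column_lineage": False,  # skip the per-column lineage API calls
    }
)
```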
@@ -0,0 +1,4 @@
### Prerequisites
- Generate a Databricks personal access token by following the guide here: https://docs.databricks.com/dev-tools/api/latest/authentication.html#generate-a-personal-access-token
- Get the ID of the workspace where Unity Catalog is enabled by following: https://docs.databricks.com/workspace/workspace-details.html#workspace-instance-names-urls-and-ids
- Check the starter recipe below and replace the token and workspace ID with the ones you obtained above.
@@ -0,0 +1,30 @@
source:
  type: unity-catalog
  config:
    workspace_url: https://my-workspace.cloud.databricks.com
    token: "mygenerated_databricks_token"
    #metastore_id_pattern:
    #  deny:
    #    - 11111-2222-33333-44-555555
    #catalog_pattern:
    #  allow:
    #    - my-catalog
    #schema_pattern:
    #  deny:
    #    - information_schema
    #table_pattern:
    #  allow:
    #    - test.lineagedemo.dinner
    # First you have to create domains on Datahub by following this guide -> https://datahubproject.io/docs/domains/#domains-setup-prerequisites-and-permissions
    #domain:
    #  urn:li:domain:1111-222-333-444-555:
    #    allow:
    #      - main.*

    stateful_ingestion:
      enabled: true

pipeline_name: acme-corp-unity


# sink configs if needed
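
For reference (not part of this commit), a recipe like the one above can also be run programmatically with DataHub's `Pipeline` API; the sketch below assumes a local `datahub-rest` sink at `http://localhost:8080` and uses placeholder credentials.

```python
# Hedged sketch: running the unity-catalog recipe from Python instead of the CLI.
# The workspace URL, token, and sink address are placeholders.
from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "pipeline_name": "acme-corp-unity",
        "source": {
            "type": "unity-catalog",
            "config": {
                "workspace_url": "https://my-workspace.cloud.databricks.com",
                "token": "mygenerated_databricks_token",
                "stateful_ingestion": {"enabled": True},
            },
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)
pipeline.run()
pipeline.raise_from_status()
```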
8 changes: 7 additions & 1 deletion metadata-ingestion/setup.py
@@ -205,6 +205,9 @@ def get_long_description():
    "sqlparse",
}

databricks_cli = {
    "databricks-cli==0.17.3",
}

# Note: for all of these, framework_common will be added.
plugins: Dict[str, Set[str]] = {
@@ -322,6 +325,7 @@ def get_long_description():
    "nifi": {"requests", "packaging"},
    "powerbi": microsoft_common,
    "vertica": sql_common | {"sqlalchemy-vertica[vertica-python]==0.0.5"},
    "unity-catalog": databricks_cli | {"requests"},
}

all_exclude_plugins: Set[str] = {
@@ -411,7 +415,8 @@ def get_long_description():
            "starburst-trino-usage",
            "powerbi",
            "vertica",
            "salesforce"
            "salesforce",
            "unity-catalog"
            # airflow is added below
        ]
        for dependency in plugins[plugin]
@@ -534,6 +539,7 @@ def get_long_description():
        "presto-on-hive = datahub.ingestion.source.sql.presto_on_hive:PrestoOnHiveSource",
        "pulsar = datahub.ingestion.source.pulsar:PulsarSource",
        "salesforce = datahub.ingestion.source.salesforce:SalesforceSource",
        "unity-catalog = datahub.ingestion.source.unity.source:UnityCatalogSource",
    ],
    "datahub.ingestion.sink.plugins": [
        "file = datahub.ingestion.sink.file:FileSink",
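
The new `unity-catalog` extra pins `databricks-cli==0.17.3`. The sketch below is only an illustration of how a source could reach the Unity Catalog endpoints through that package, assuming its `ApiClient` and `UnityCatalogApi` helpers; the actual client code lives in `datahub.ingestion.source.unity` and is not shown in this diff.

```python
# Hedged sketch: listing Unity Catalog catalogs via the databricks-cli package.
# Host and token are placeholders; the exact wiring inside the DataHub source
# may differ from this illustration.
from databricks_cli.sdk.api_client import ApiClient
from databricks_cli.unity_catalog.api import UnityCatalogApi

client = ApiClient(host="https://my-workspace.cloud.databricks.com", token="<token>")
catalogs = UnityCatalogApi(client).list_catalogs()  # assumed to return a dict with a "catalogs" list
for catalog in catalogs.get("catalogs", []):
    print(catalog["name"])
```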
12 changes: 12 additions & 0 deletions metadata-ingestion/src/datahub/emitter/mcp_builder.py
@@ -90,6 +90,18 @@ class ProjectIdKey(PlatformKey):
    project_id: str


class MetastoreKey(PlatformKey):
    metastore: str


class CatalogKey(MetastoreKey):
    catalog: str


class UnitySchemaKey(CatalogKey):
    unity_schema: str


class BigQueryDatasetKey(ProjectIdKey):
    dataset_id: str

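
To show the intent of these new keys (illustration only, not part of the diff), the sketch below builds a nested Unity Catalog schema key; it assumes `PlatformKey` exposes the `guid()` helper used for other container keys, and all values are placeholders.

```python
# Hedged sketch: building a stable identifier for a Unity Catalog schema container.
# metastore/catalog/unity_schema come from the classes above; `platform` is
# inherited from PlatformKey, and guid() is assumed to behave as for other keys.
from datahub.emitter.mcp_builder import UnitySchemaKey

schema_key = UnitySchemaKey(
    platform="databricks",
    metastore="my-metastore",
    catalog="main",
    unity_schema="analytics",
)
print(schema_key.guid())  # deterministic guid derived from the key fields
```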
Empty file.
69 changes: 69 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/unity/config.py
@@ -0,0 +1,69 @@
from typing import Dict, List, Optional

import pydantic
from pydantic import Field

from datahub.configuration.common import AllowDenyPattern
from datahub.ingestion.source.state.stale_entity_removal_handler import (
    StatefulStaleMetadataRemovalConfig,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
    StatefulIngestionConfigBase,
)


class UnityCatalogStatefulIngestionConfig(StatefulStaleMetadataRemovalConfig):
    """
    Specialization of StatefulStaleMetadataRemovalConfig to add custom config.
    This will be used to override the stateful_ingestion config param of StatefulIngestionConfigBase
    in the UnityCatalogConfig.
    """

    _entity_types: List[str] = Field(default=["dataset", "container"])


class UnityCatalogSourceConfig(StatefulIngestionConfigBase):
    token: str = pydantic.Field(description="Databricks personal access token")
    workspace_url: str = pydantic.Field(description="Databricks workspace url")
    workspace_name: str = pydantic.Field(
        default=None,
        description="Name of the workspace. Defaults to the deployment name present in the workspace_url",
    )

    metastore_id_pattern: AllowDenyPattern = Field(
        default=AllowDenyPattern.allow_all(),
        description="Regex patterns for metastore ids to filter in ingestion.",
    )

    catalog_pattern: AllowDenyPattern = Field(
        default=AllowDenyPattern.allow_all(),
        description="Regex patterns for catalogs to filter in ingestion. Specify regex to match the catalog name",
    )

    schema_pattern: AllowDenyPattern = Field(
        default=AllowDenyPattern.allow_all(),
        description="Regex patterns for schemas to filter in ingestion. Specify regex to only match the schema name. e.g. to match all tables in schema analytics, use the regex 'analytics'",
    )

    table_pattern: AllowDenyPattern = Field(
        default=AllowDenyPattern.allow_all(),
        description="Regex patterns for tables to filter in ingestion. Specify regex to match the entire table name in catalog.schema.table format. e.g. to match all tables starting with customer in Customer catalog and public schema, use the regex 'Customer.public.customer.*'",
    )
    domain: Dict[str, AllowDenyPattern] = Field(
        default=dict(),
        description='Attach domains to catalogs, schemas or tables during ingestion using regex patterns. The domain key can be a guid like *urn:li:domain:ec428203-ce86-4db3-985d-5a8ee6df32ba* or a string like "Marketing". If you provide strings, then DataHub will attempt to resolve the name to a guid, and will error out if this fails. There can be multiple domain keys specified.',
    )

    include_table_lineage: Optional[bool] = pydantic.Field(
        default=True,
        description="Option to enable/disable table lineage generation.",
    )

    include_column_lineage: Optional[bool] = pydantic.Field(
        default=True,
        description="Option to enable/disable column-level lineage generation. Currently we have to issue one REST call per column to get column-level lineage due to the Databricks API, which can slow down ingestion.",
    )

    stateful_ingestion: Optional[UnityCatalogStatefulIngestionConfig] = pydantic.Field(
        default=None, description="Unity Catalog Stateful Ingestion Config."
    )
