Skip to content

Commit

Permalink
feat(ingest): hive-on-presto - Add option to properly filter hive database (#6247)
Browse files Browse the repository at this point in the history
  • Loading branch information
treff7es authored Oct 20, 2022
1 parent 7a6ca47 commit 6757076
Showing 1 changed file with 21 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from sqlalchemy import create_engine, text
from sqlalchemy.engine.reflection import Inspector

from datahub.configuration.common import AllowDenyPattern
from datahub.emitter.mce_builder import (
make_container_urn,
make_dataset_urn_with_platform_instance,
Expand Down Expand Up @@ -104,6 +105,12 @@ class PrestoOnHiveConfig(BasicSQLAlchemyConfig):
scheme: str = Field(
default="mysql+pymysql", description="", hidden_from_schema=True
)

database_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description="Regex patterns for hive/presto database to filter in ingestion. Specify regex to only match the database name. e.g. to match all tables in database analytics, use the regex 'analytics'",
)

metastore_db_name: Optional[str] = Field(
default=None,
description="Name of the Hive metastore's database (usually: metastore). For backward compatibility, if this field is not provided, the database field will be used. If both the 'database' and 'metastore_db_name' fields are set then the 'database' field will be used to filter the hive/presto/trino database",
Expand Down Expand Up @@ -350,6 +357,9 @@ def gen_schema_containers(
iter_res = self._alchemy_client.execute_query(statement)
for row in iter_res:
schema = row["schema"]
if not self.config.database_pattern.allowed(schema):
continue

schema_container_key: PlatformKey = self.gen_schema_key(db_name, schema)
logger.debug("schema_container_key = {} ".format(schema_container_key))
database_container_key = self.gen_database_key(database=db_name)
Expand Down Expand Up @@ -425,6 +435,10 @@ def loop_tables(

self.report.report_entity_scanned(dataset_name, ent_type="table")

if not self.config.database_pattern.allowed(key.schema):
self.report.report_dropped(f"{dataset_name}")
continue

if not sql_config.table_pattern.allowed(dataset_name):
self.report.report_dropped(dataset_name)
continue
Expand Down Expand Up @@ -535,14 +549,21 @@ def get_hive_view_columns(self, inspector: Inspector) -> Iterable[ViewDataset]:
iter_res = self._alchemy_client.execute_query(statement)
for key, group in groupby(iter_res, self._get_table_key):
db_name = self.get_db_name(inspector)

schema_name = (
f"{db_name}.{key.schema}"
if self.config.include_catalog_name_in_ids
else key.schema
)

dataset_name = self.get_identifier(
schema=schema_name, entity=key.table, inspector=inspector
)

if not self.config.database_pattern.allowed(key.schema):
self.report.report_dropped(f"{dataset_name}")
continue

columns = list(group)

if len(columns) == 0:
Expand Down

0 comments on commit 6757076

Please sign in to comment.