-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(ingest): bigquery - external url support and a small profiling filter fix #6714
Changes from all commits
56ed27d
a4a9eea
5accc11
a4bc985
082db4c
0bc2b25
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,6 +10,7 @@ | |
from google.cloud import bigquery | ||
from google.cloud.bigquery.table import TableListItem | ||
|
||
from datahub.configuration.pattern_utils import is_schema_allowed | ||
from datahub.emitter.mce_builder import ( | ||
make_container_urn, | ||
make_data_platform_urn, | ||
|
@@ -54,7 +55,11 @@ | |
BigqueryTable, | ||
BigqueryView, | ||
) | ||
from datahub.ingestion.source.bigquery_v2.common import get_bigquery_client | ||
from datahub.ingestion.source.bigquery_v2.common import ( | ||
BQ_EXTERNAL_DATASET_URL_TEMPLATE, | ||
BQ_EXTERNAL_TABLE_URL_TEMPLATE, | ||
get_bigquery_client, | ||
) | ||
from datahub.ingestion.source.bigquery_v2.lineage import BigqueryLineageExtractor | ||
from datahub.ingestion.source.bigquery_v2.profiler import BigqueryProfiler | ||
from datahub.ingestion.source.bigquery_v2.usage import BigQueryUsageExtractor | ||
|
@@ -459,6 +464,11 @@ def gen_dataset_containers( | |
dataset, | ||
["Dataset"], | ||
database_container_key, | ||
external_url=BQ_EXTERNAL_DATASET_URL_TEMPLATE.format( | ||
project=project_id, dataset=dataset | ||
) | ||
if self.config.include_external_url | ||
else None, | ||
) | ||
|
||
self.stale_entity_removal_handler.add_entity_to_state( | ||
|
@@ -570,8 +580,12 @@ def _process_project( | |
bigquery_project.datasets | ||
) | ||
for bigquery_dataset in bigquery_project.datasets: | ||
|
||
if not self.config.dataset_pattern.allowed(bigquery_dataset.name): | ||
if not is_schema_allowed( | ||
self.config.dataset_pattern, | ||
bigquery_dataset.name, | ||
project_id, | ||
self.config.match_fully_qualified_names, | ||
): | ||
self.report.report_dropped(f"{bigquery_dataset.name}.*") | ||
continue | ||
try: | ||
|
@@ -854,6 +868,13 @@ def gen_dataset_workunits( | |
else None, | ||
lastModified=TimeStamp(time=int(table.last_altered.timestamp() * 1000)) | ||
if table.last_altered is not None | ||
else TimeStamp(time=int(table.created.timestamp() * 1000)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nice! |
||
if table.created is not None | ||
else None, | ||
externalUrl=BQ_EXTERNAL_TABLE_URL_TEMPLATE.format( | ||
jjoyce0510 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
project=project_id, dataset=dataset_name, table=table.name | ||
) | ||
if self.config.include_external_url | ||
else None, | ||
) | ||
if custom_properties: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -48,6 +48,16 @@ class BigQueryV2Config(BigQueryConfig, LineageConfig): | |
description="Regex patterns for dataset to filter in ingestion. Specify regex to only match the schema name. e.g. to match all tables in schema analytics, use the regex 'analytics'", | ||
) | ||
|
||
match_fully_qualified_names: bool = Field( | ||
default=False, | ||
description="Whether `dataset_pattern` is matched against fully qualified dataset name `<project_id>.<dataset_name>`.", | ||
) | ||
|
||
include_external_url: bool = Field( | ||
default=True, | ||
description="Whether to populate BigQuery Console url to Datasets/Tables", | ||
) | ||
|
||
debug_include_full_payloads: bool = Field( | ||
default=False, | ||
description="Include full payload into events. It is only for debugging and internal use.", | ||
|
@@ -128,6 +138,20 @@ def backward_compatibility_configs_set(cls, values: Dict) -> Dict: | |
logging.warning( | ||
"schema_pattern will be ignored in favour of dataset_pattern. schema_pattern will be deprecated, please use dataset_pattern only." | ||
) | ||
|
||
match_fully_qualified_names = values.get("match_fully_qualified_names") | ||
|
||
if ( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. amazing! |
||
dataset_pattern is not None | ||
and dataset_pattern != AllowDenyPattern.allow_all() | ||
and match_fully_qualified_names is not None | ||
and not match_fully_qualified_names | ||
): | ||
logger.warning( | ||
"Please update `dataset_pattern` to match against fully qualified schema name `<project_id>.<dataset_name>` and set config `match_fully_qualified_names : True`." | ||
"Current default `match_fully_qualified_names: False` is only to maintain backward compatibility. " | ||
"The config option `match_fully_qualified_names` will be deprecated in future and the default behavior will assume `match_fully_qualified_names: True`." | ||
) | ||
return values | ||
|
||
def get_table_pattern(self, pattern: List[str]) -> str: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,6 +8,9 @@ | |
BQ_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ" | ||
BQ_DATE_SHARD_FORMAT = "%Y%m%d" | ||
|
||
BQ_EXTERNAL_TABLE_URL_TEMPLATE = "https://console.cloud.google.com/bigquery?project={project}&ws=!1m5!1m4!4m3!1s{project}!2s{dataset}!3s{table}" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is great! thanks for extracting into a nice place. |
||
BQ_EXTERNAL_DATASET_URL_TEMPLATE = "https://console.cloud.google.com/bigquery?project={project}&ws=!1m4!1m3!3m2!1s{project}!2s{dataset}" | ||
|
||
|
||
def _make_gcp_logging_client( | ||
project_id: Optional[str] = None, extra_client_options: Dict[str, Any] = {} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice!