Skip to content

Commit

Permalink
feat(ingest): add lineage_client_project_id field to the BigQuery con…
Browse files Browse the repository at this point in the history
…fig (#4138)

* feat(ingest): add lineage_client_project_id field to the bigquery config

* fix linting issues

* add type annotation for arguments
  • Loading branch information
vcs9 authored Feb 28, 2022
1 parent 88d1c96 commit 93ff095
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ source:
# - "schema.table.column"
# deny:
# - "*.*.*"
#lineage_client_project_id: project-id-1234567

## see https://datahubproject.io/docs/metadata-ingestion/sink_docs/datahub for complete documentation
sink:
Expand Down
1 change: 1 addition & 0 deletions metadata-ingestion/source_docs/bigquery.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ As a SQL-based service, the Athena integration is also supported by our SQL prof
| `domain.domain_key.allow` | | | List of regex patterns for tables/schemas to assign the domain_key domain to (domain_key can be any string, like `sales`). Multiple domain keys can be specified. |
| `domain.domain_key.deny` | | | List of regex patterns for tables/schemas that should not be assigned the domain_key domain. Multiple domain keys can be specified. |
| `domain.domain_key.ignoreCase` | | `True` | Whether to ignore case during pattern matching. Multiple domain keys can be specified. |
| `lineage_client_project_id` | | `None` | The project to use when creating the BigQuery client for lineage. If left empty, `project_id` is used instead. |


The following parameters are only relevant if include_table_lineage is set to true:
Expand Down
43 changes: 33 additions & 10 deletions metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ def create_credential_temp_file(credential: BigQueryCredential) -> str:
class BigQueryConfig(BaseTimeWindowConfig, SQLAlchemyConfig):
scheme: str = "bigquery"
project_id: Optional[str] = None
lineage_client_project_id: Optional[str] = None

log_page_size: Optional[pydantic.PositiveInt] = 1000
credential: Optional[BigQueryCredential]
Expand Down Expand Up @@ -304,20 +305,29 @@ def get_db_name(self, inspector: Inspector = None) -> str:

def _compute_big_query_lineage(self) -> None:
if self.config.include_table_lineage:
lineage_client_project_id = self._get_lineage_client_project_id()
if self.config.use_exported_bigquery_audit_metadata:
self._compute_bigquery_lineage_via_exported_bigquery_audit_metadata()
self._compute_bigquery_lineage_via_exported_bigquery_audit_metadata(
lineage_client_project_id
)
else:
self._compute_bigquery_lineage_via_gcp_logging()
self._compute_bigquery_lineage_via_gcp_logging(
lineage_client_project_id
)

if self.lineage_metadata is not None:
logger.info(
f"Built lineage map containing {len(self.lineage_metadata)} entries."
)

def _compute_bigquery_lineage_via_gcp_logging(self) -> None:
def _compute_bigquery_lineage_via_gcp_logging(
self, lineage_client_project_id: Optional[str]
) -> None:
logger.info("Populating lineage info via GCP audit logs")
try:
_clients: List[GCPLoggingClient] = self._make_bigquery_client()
_clients: List[GCPLoggingClient] = self._make_bigquery_client(
lineage_client_project_id
)
log_entries: Iterable[AuditLogEntry] = self._get_bigquery_log_entries(
_clients
)
Expand All @@ -331,10 +341,12 @@ def _compute_bigquery_lineage_via_gcp_logging(self) -> None:
e,
)

def _compute_bigquery_lineage_via_exported_bigquery_audit_metadata(self) -> None:
def _compute_bigquery_lineage_via_exported_bigquery_audit_metadata(
self, lineage_client_project_id: Optional[str]
) -> None:
logger.info("Populating lineage info via exported GCP audit logs")
try:
_client: BigQueryClient = BigQueryClient(project=self.config.project_id)
_client: BigQueryClient = BigQueryClient(project=lineage_client_project_id)
exported_bigquery_audit_metadata: Iterable[
BigQueryAuditMetadata
] = self._get_exported_bigquery_audit_metadata(_client)
Expand All @@ -350,17 +362,28 @@ def _compute_bigquery_lineage_via_exported_bigquery_audit_metadata(self) -> None
e,
)

def _make_bigquery_client(self) -> List[GCPLoggingClient]:
def _make_bigquery_client(
self, lineage_client_project_id: Optional[str]
) -> List[GCPLoggingClient]:
# See https://github.com/googleapis/google-cloud-python/issues/2674 for
# why we disable gRPC here.
client_options = self.config.extra_client_options.copy()
client_options["_use_grpc"] = False
project_id = self.config.project_id
if project_id is not None:
return [GCPLoggingClient(**client_options, project=project_id)]
if lineage_client_project_id is not None:
return [
GCPLoggingClient(**client_options, project=lineage_client_project_id)
]
else:
return [GCPLoggingClient(**client_options)]

def _get_lineage_client_project_id(self) -> Optional[str]:
    """Return the project ID the lineage clients should be created with.

    ``config.lineage_client_project_id`` takes precedence. When it is unset
    or empty, ``config.project_id`` is returned instead, which may itself
    be ``None``.
    """
    return self.config.lineage_client_project_id or self.config.project_id

def _get_bigquery_log_entries(
self, clients: List[GCPLoggingClient]
) -> Iterable[AuditLogEntry]:
Expand Down

0 comments on commit 93ff095

Please sign in to comment.