Merge branch 'master' into feature/tableau-stateful-ingestion
amanda-her committed Oct 11, 2022
2 parents 3c1c2f7 + 0427122 commit 417be5a
Showing 40 changed files with 843 additions and 4,899 deletions.
2 changes: 2 additions & 0 deletions docs/lineage/airflow.md
@@ -44,6 +44,7 @@ We recommend you use the lineage plugin if you are on Airflow version >= 2.0.2 o

|Name | Default value | Description |
|---|---|---|
| datahub.enabled | true | If the plugin should be enabled. |
| datahub.conn_id | datahub_rest_default | The name of the datahub connection you set in step 1. |
| datahub.cluster | prod | name of the airflow cluster |
| datahub.capture_ownership_info | true | If true, the owners field of the DAG will be captured as a DataHub corpuser. |
@@ -99,6 +100,7 @@ If you are looking to run Airflow and DataHub using docker locally, follow the g
[lineage]
backend = datahub_provider.lineage.datahub.DatahubLineageBackend
datahub_kwargs = {
"enabled": true,
"datahub_conn_id": "datahub_rest_default",
"cluster": "prod",
"capture_ownership_info": true,
@@ -18,6 +18,7 @@
def get_lineage_config() -> DatahubLineageConfig:
"""Load the lineage config from airflow.cfg."""

enabled = conf.get("datahub", "enabled", fallback=True)
datahub_conn_id = conf.get("datahub", "conn_id", fallback="datahub_rest_default")
cluster = conf.get("datahub", "cluster", fallback="prod")
graceful_exceptions = conf.get("datahub", "graceful_exceptions", fallback=True)
@@ -27,6 +28,7 @@ def get_lineage_config() -> DatahubLineageConfig:
)
capture_executions = conf.get("datahub", "capture_executions", fallback=True)
return DatahubLineageConfig(
enabled=enabled,
datahub_conn_id=datahub_conn_id,
cluster=cluster,
graceful_exceptions=graceful_exceptions,
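For context, `conf.get` hands back the raw string from `airflow.cfg`, so the sketch below (using Airflow's standard `conf.getboolean` accessor; the early-exit wiring is hypothetical and not the plugin's actual code path) shows how the new `enabled` flag can be read as a proper boolean:

```python
from airflow.configuration import conf

def lineage_backend_enabled() -> bool:
    # getboolean coerces "true"/"False"/"1" and friends to a bool and
    # falls back to True when [datahub] has no "enabled" key at all.
    return conf.getboolean("datahub", "enabled", fallback=True)

if not lineage_backend_enabled():
    # Hypothetical early exit: skip emitting lineage when the backend is disabled.
    print("DataHub lineage backend disabled via [datahub] enabled = false")
```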
10 changes: 5 additions & 5 deletions metadata-ingestion/docs/sources/bigquery/bigquery-beta_pre.md
@@ -83,11 +83,11 @@ Use `profiling.bigquery_temp_table_schema` to restrict to one specific dataset t

```yml
credential:
project_id: project-id-1234567
private_key_id: "d0121d0000882411234e11166c6aaa23ed5d74e0"
private_key: "-----BEGIN PRIVATE KEY-----\nMIIyourkey\n-----END PRIVATE KEY-----\n"
client_email: "[email protected]"
client_id: "123456678890"
project_id: project-id-1234567
private_key_id: "d0121d0000882411234e11166c6aaa23ed5d74e0"
private_key: "-----BEGIN PRIVATE KEY-----\nMIIyourkey\n-----END PRIVATE KEY-----\n"
client_email: "[email protected]"
client_id: "123456678890"
```
### Lineage Computation Details
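Because the change above is purely an indentation fix, a quick sanity check is to parse the recipe fragment and confirm the service-account keys really nest under `credential`. A minimal sketch using PyYAML (same placeholder values as the snippet above, with `private_key` omitted for brevity):

```python
import yaml

snippet = """
credential:
  project_id: project-id-1234567
  private_key_id: "d0121d0000882411234e11166c6aaa23ed5d74e0"
  client_email: "[email protected]"
  client_id: "123456678890"
"""

parsed = yaml.safe_load(snippet)
# If the keys were not indented under "credential", safe_load would leave
# parsed["credential"] as None and hoist the keys to the top level instead.
assert set(parsed["credential"]) == {
    "project_id", "private_key_id", "client_email", "client_id"
}
print(parsed["credential"]["project_id"])  # project-id-1234567
```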
1 change: 1 addition & 0 deletions metadata-ingestion/junit.quick.xml

Large diffs are not rendered by default.

14 changes: 13 additions & 1 deletion metadata-ingestion/src/datahub/configuration/common.py
@@ -1,7 +1,7 @@
import re
from abc import ABC, abstractmethod
from enum import auto
from typing import IO, Any, ClassVar, Dict, List, Optional, Pattern, cast
from typing import IO, Any, ClassVar, Dict, List, Optional, Pattern, Type, cast

from cached_property import cached_property
from pydantic import BaseModel, Extra
@@ -18,6 +18,7 @@ class Config:
cached_property,
) # needed to allow cached_property to work. See https://github.com/samuelcolvin/pydantic/issues/1241 for more info.

@staticmethod
def schema_extra(schema: Dict[str, Any], model: Type["ConfigModel"]) -> None:
# We use the custom "hidden_from_schema" attribute to hide fields from the
# autogenerated docs.
remove_fields = []
for key, prop in schema.get("properties", {}).items():
if prop.get("hidden_from_schema"):
remove_fields.append(key)

for key in remove_fields:
del schema["properties"][key]


class PermissiveConfigModel(ConfigModel):
# A permissive config model that allows extra fields.
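To make the new hook concrete, here is a hedged, standalone sketch (pydantic v1 style, with a throwaway `ExampleConfig` rather than the real `ConfigModel`) showing how a field tagged with the custom `hidden_from_schema` attribute drops out of the generated JSON schema:

```python
from typing import Any, Dict, Optional, Type

from pydantic import BaseModel, Field


class ExampleConfig(BaseModel):
    """Hypothetical config model reusing the schema_extra idea above."""

    project_id: Optional[str] = Field(default=None, description="Shown in docs.")
    storage_project_id: Optional[str] = Field(default=None, hidden_from_schema=True)

    class Config:
        @staticmethod
        def schema_extra(schema: Dict[str, Any], model: Type["ExampleConfig"]) -> None:
            # Same idea as ConfigModel.schema_extra: drop any property carrying
            # the custom "hidden_from_schema" marker from the generated schema.
            hidden = [
                key
                for key, prop in schema.get("properties", {}).items()
                if prop.get("hidden_from_schema")
            ]
            for key in hidden:
                del schema["properties"][key]


props = ExampleConfig.schema()["properties"]
assert "project_id" in props and "storage_project_id" not in props
```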
@@ -540,6 +540,9 @@ def _process_project(
)
return

self.report.num_project_datasets_to_scan[project_id] = len(
bigquery_project.datasets
)
for bigquery_dataset in bigquery_project.datasets:

if not self.config.dataset_pattern.allowed(bigquery_dataset.name):
@@ -619,7 +622,9 @@ def _process_table(
self.report.report_dropped(table_identifier.raw_table_name())
return

table.columns = self.get_columns_for_table(conn, table_identifier)
table.columns = self.get_columns_for_table(
conn, table_identifier, self.config.column_limit
)
if not table.columns:
logger.warning(f"Unable to get columns for table: {table_identifier}")

@@ -653,7 +658,9 @@ def _process_view(
self.report.report_dropped(table_identifier.raw_table_name())
return

view.columns = self.get_columns_for_table(conn, table_identifier)
view.columns = self.get_columns_for_table(
conn, table_identifier, column_limit=self.config.column_limit
)

lineage_info: Optional[Tuple[UpstreamLineage, Dict[str, str]]] = None
if self.config.include_table_lineage:
@@ -877,8 +884,8 @@ def gen_schema_fields(self, columns: List[BigqueryColumn]) -> List[SchemaField]:
_COMPLEX_TYPE = re.compile("^(struct|array)")
last_id = -1
for col in columns:

if _COMPLEX_TYPE.match(col.data_type.lower()):
# if col.data_type is empty that means this column is part of a complex type
if col.data_type is None or _COMPLEX_TYPE.match(col.data_type.lower()):
# If we have already seen this ordinal position, it most probably means we already processed this complex type
if last_id != col.ordinal_position:
schema_fields.extend(
@@ -1099,7 +1106,10 @@ def get_views_for_dataset(
return views.get(dataset_name, [])

def get_columns_for_table(
self, conn: bigquery.Client, table_identifier: BigqueryTableIdentifier
self,
conn: bigquery.Client,
table_identifier: BigqueryTableIdentifier,
column_limit: Optional[int] = None,
) -> List[BigqueryColumn]:

if (
@@ -1110,6 +1120,7 @@ def get_columns_for_table(
conn,
project_id=table_identifier.project_id,
dataset_name=table_identifier.dataset,
column_limit=column_limit,
)
self.schema_columns[
(table_identifier.project_id, table_identifier.dataset)
@@ -1125,7 +1136,9 @@ def get_columns_for_table(
logger.warning(
f"Couldn't get columns on the dataset level for {table_identifier}. Trying to get on table level..."
)
return BigQueryDataDictionary.get_columns_for_table(conn, table_identifier)
return BigQueryDataDictionary.get_columns_for_table(
conn, table_identifier, self.config.column_limit
)

# Access to table but none of its columns - is this possible ?
return columns.get(table_identifier.table, [])
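The new `column_limit` parameter threads through the dataset-level cache shown in `get_columns_for_table` above. A self-contained sketch of that pattern (toy data and function names; the real source queries INFORMATION_SCHEMA through a BigQuery client and retries with a table-level query on a cache miss):

```python
from typing import Dict, List, Optional, Tuple

def _query_dataset_columns(
    project: str, dataset: str, limit: Optional[int]
) -> Dict[str, List[str]]:
    # Stand-in for the dataset-wide INFORMATION_SCHEMA.COLUMNS query.
    data = {"orders": ["id", "amount", "created_at"], "users": ["id", "email"]}
    return {table: cols[:limit] if limit else cols for table, cols in data.items()}

_cache: Dict[Tuple[str, str], Dict[str, List[str]]] = {}

def columns_for_table(
    project: str, dataset: str, table: str, column_limit: Optional[int] = None
) -> List[str]:
    key = (project, dataset)
    if key not in _cache:
        # One dataset-level query, cached per (project, dataset);
        # column_limit caps the columns kept for very wide tables.
        _cache[key] = _query_dataset_columns(project, dataset, column_limit)
    # The real code falls back to a table-level query when the cached
    # dataset-level result has no entry for this table.
    return _cache[key].get(table, [])

print(columns_for_table("proj", "sales", "orders", column_limit=2))  # ['id', 'amount']
```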
@@ -121,7 +121,7 @@ def get_table_display_name(self) -> str:
]
if invalid_chars_in_table_name:
raise ValueError(
f"Cannot handle {self} - poorly formatted table name, contains {invalid_chars_in_table_name}"
f"Cannot handle {self.raw_table_name()} - poorly formatted table name, contains {invalid_chars_in_table_name}"
)
return table_name

@@ -207,6 +207,7 @@ class QueryEvent:
actor_email: str
query: str
statementType: str
project_id: str

job_name: Optional[str] = None
destinationTable: Optional[BigQueryTableRef] = None
@@ -238,6 +239,15 @@ def _job_name_ref(project: str, jobId: str) -> Optional[str]:
return f"projects/{project}/jobs/{jobId}"
return None

@staticmethod
def _get_project_id_from_job_name(job_name: str) -> str:
project_id_pattern = r"projects\/(.*)\/jobs\/.*"
matches = re.match(project_id_pattern, job_name, re.MULTILINE)
if matches:
return matches.group(1)
else:
raise ValueError(f"Unable to get project_id from jobname: {job_name}")

@classmethod
def from_entry(
cls, entry: AuditLogEntry, debug_include_full_payloads: bool = False
@@ -253,6 +263,7 @@ def from_entry(
job.get("jobName", {}).get("projectId"),
job.get("jobName", {}).get("jobId"),
),
project_id=job.get("jobName", {}).get("projectId"),
default_dataset=job_query_conf["defaultDataset"]
if job_query_conf["defaultDataset"]
else None,
@@ -331,6 +342,7 @@ def from_exported_bigquery_audit_metadata(
actor_email=payload["authenticationInfo"]["principalEmail"],
query=query_config["query"],
job_name=job["jobName"],
project_id=QueryEvent._get_project_id_from_job_name(job["jobName"]),
default_dataset=query_config["defaultDataset"]
if query_config.get("defaultDataset")
else None,
@@ -392,6 +404,7 @@ def from_entry_v2(
# basic query_event
query_event = QueryEvent(
job_name=job["jobName"],
project_id=QueryEvent._get_project_id_from_job_name(job["jobName"]),
timestamp=row.timestamp,
actor_email=payload["authenticationInfo"]["principalEmail"],
query=query_config["query"],
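For reference, the new `project_id` field on `QueryEvent` is derived from the audit-log job name with the regex added in `_get_project_id_from_job_name`. A standalone sketch of the same extraction (the job name below is made up):

```python
import re

def get_project_id_from_job_name(job_name: str) -> str:
    # Audit-log job names look like "projects/<project>/jobs/<job-id>".
    matches = re.match(r"projects\/(.*)\/jobs\/.*", job_name)
    if matches:
        return matches.group(1)
    raise ValueError(f"Unable to get project_id from jobname: {job_name}")

assert get_project_id_from_job_name("projects/my-project/jobs/bquxjob_1234") == "my-project"
```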
@@ -32,6 +32,7 @@ class BigQueryV2Config(BigQueryConfig):
usage: BigQueryUsageConfig = Field(
default=BigQueryUsageConfig(), description="Usage related configs"
)

include_usage_statistics: bool = Field(
default=True,
description="Generate usage statistics",
@@ -56,13 +57,21 @@ class BigQueryV2Config(BigQueryConfig):
default=50,
description="Number of tables queried in a batch when getting metadata. This is a low-level config property which should be touched with care. This restriction is needed because we query the partitions system view, which throws an error if we try to touch too many tables.",
)

column_limit: int = Field(
default=1000,
description="Maximum number of columns to process in a table",
)
# The inheritance hierarchy is wonky here, but these options need modifications.
project_id: Optional[str] = Field(
default=None,
description="[deprecated] Use project_id_pattern instead.",
)
storage_project_id: None = Field(default=None, exclude=True)
storage_project_id: None = Field(default=None, hidden_from_schema=True)

lineage_use_sql_parser: bool = Field(
default=False,
description="Experimental. Use the SQL parser to resolve view/table lineage. If a view is referenced, BigQuery sends both the view and the underlying table in the references, with no distinction between direct and base objects accessed, so we do SQL parsing to ensure we only use directly accessed objects for lineage.",
)

@root_validator(pre=False)
def profile_default_settings(cls, values: Dict) -> Dict:
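Taken together, the new knobs above would surface in a source recipe roughly as follows; this is a hypothetical fragment expressed as a Python dict purely for illustration, with made-up values (the deprecated `project_id` is left out in favour of `project_id_pattern`):

```python
# Hypothetical BigQuery source config fragment; every value is illustrative.
bigquery_source_config = {
    "include_usage_statistics": True,   # generate usage statistics
    "column_limit": 500,                # cap columns processed per table (default 1000)
    "lineage_use_sql_parser": False,    # experimental view/table lineage resolution
}
```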
@@ -8,27 +8,40 @@

from datahub.ingestion.source.sql.sql_common import SQLSourceReport
from datahub.utilities.lossy_collections import LossyDict, LossyList
from datahub.utilities.stats_collections import TopKDict


@dataclass
class BigQueryV2Report(SQLSourceReport):
num_total_lineage_entries: Optional[int] = None
num_skipped_lineage_entries_missing_data: Optional[int] = None
num_skipped_lineage_entries_not_allowed: Optional[int] = None
num_lineage_entries_sql_parser_failure: Optional[int] = None
num_skipped_lineage_entries_other: Optional[int] = None
num_total_log_entries: Optional[int] = None
num_parsed_log_entires: Optional[int] = None
num_total_audit_entries: Optional[int] = None
num_parsed_audit_entires: Optional[int] = None
num_total_lineage_entries: TopKDict[str, int] = field(default_factory=TopKDict)
num_skipped_lineage_entries_missing_data: TopKDict[str, int] = field(
default_factory=TopKDict
)
num_skipped_lineage_entries_not_allowed: TopKDict[str, int] = field(
default_factory=TopKDict
)
num_lineage_entries_sql_parser_failure: TopKDict[str, int] = field(
default_factory=TopKDict
)
num_lineage_entries_sql_parser_success: TopKDict[str, int] = field(
default_factory=TopKDict
)
num_skipped_lineage_entries_other: TopKDict[str, int] = field(
default_factory=TopKDict
)
num_total_log_entries: TopKDict[str, int] = field(default_factory=TopKDict)
num_parsed_log_entries: TopKDict[str, int] = field(default_factory=TopKDict)
num_total_audit_entries: TopKDict[str, int] = field(default_factory=TopKDict)
num_parsed_audit_entries: TopKDict[str, int] = field(default_factory=TopKDict)
bigquery_audit_metadata_datasets_missing: Optional[bool] = None
lineage_failed_extraction: LossyList[str] = field(default_factory=LossyList)
lineage_metadata_entries: Optional[int] = None
lineage_mem_size: Optional[str] = None
lineage_extraction_sec: Dict[str, float] = field(default_factory=dict)
usage_extraction_sec: Dict[str, float] = field(default_factory=dict)
lineage_metadata_entries: TopKDict[str, int] = field(default_factory=TopKDict)
lineage_mem_size: Dict[str, str] = field(default_factory=TopKDict)
lineage_extraction_sec: Dict[str, float] = field(default_factory=TopKDict)
usage_extraction_sec: Dict[str, float] = field(default_factory=TopKDict)
usage_failed_extraction: LossyList[str] = field(default_factory=LossyList)
metadata_extraction_sec: Dict[str, float] = field(default_factory=dict)
num_project_datasets_to_scan: Dict[str, int] = field(default_factory=TopKDict)
metadata_extraction_sec: Dict[str, float] = field(default_factory=TopKDict)
include_table_lineage: Optional[bool] = None
use_date_sharded_audit_log_tables: Optional[bool] = None
log_page_size: Optional[pydantic.PositiveInt] = None
@@ -40,10 +53,10 @@ class BigQueryV2Report(SQLSourceReport):
audit_start_time: Optional[str] = None
audit_end_time: Optional[str] = None
upstream_lineage: LossyDict = field(default_factory=LossyDict)
partition_info: Dict[str, str] = field(default_factory=dict)
profile_table_selection_criteria: Dict[str, str] = field(default_factory=dict)
selected_profile_tables: Dict[str, List[str]] = field(default_factory=dict)
invalid_partition_ids: Dict[str, str] = field(default_factory=dict)
partition_info: Dict[str, str] = field(default_factory=TopKDict)
profile_table_selection_criteria: Dict[str, str] = field(default_factory=TopKDict)
selected_profile_tables: Dict[str, List[str]] = field(default_factory=TopKDict)
invalid_partition_ids: Dict[str, str] = field(default_factory=TopKDict)
allow_pattern: Optional[str] = None
deny_pattern: Optional[str] = None
num_usage_workunits_emitted: Optional[int] = None
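The report fields above move from single `Optional[int]` counters to per-project maps backed by `TopKDict`. As a rough illustration of the idea (a toy stand-in, not the library's actual `TopKDict` implementation), a counter keyed by project that only reports its largest entries keeps multi-project ingestion reports readable:

```python
from typing import Dict

class TopKCounter(Dict[str, int]):
    """Toy stand-in for TopKDict: a per-key counter that reports only its top k entries."""

    def top(self, k: int = 3) -> Dict[str, int]:
        return dict(sorted(self.items(), key=lambda kv: kv[1], reverse=True)[:k])

num_total_log_entries = TopKCounter()
for project, count in [("proj-a", 120_000), ("proj-b", 64), ("proj-c", 9_800)]:
    num_total_log_entries[project] = count

print(num_total_log_entries.top(2))  # {'proj-a': 120000, 'proj-c': 9800}
```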
