deprecate(ingest): bigquery - Removing bigquery-legacy source (datahub-project#6851)
Co-authored-by: John Joyce <[email protected]>
2 people authored and cccs-Dustin committed Feb 1, 2023
1 parent 26a463f commit 8938c98
Showing 22 changed files with 176 additions and 20,965 deletions.
1 change: 1 addition & 0 deletions docs/how/updating-datahub.md
@@ -11,6 +11,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
### Potential Downtime

### Deprecations
#6851 - Sources bigquery-legacy and bigquery-usage-legacy have been removed.

### Other notable Changes

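For recipes that still reference the removed plugins, the change is a swap of the source type to the consolidated bigquery source. Below is a minimal, hypothetical sketch of running such a recipe programmatically; the config values and the local datahub-rest sink address are illustrative assumptions, and the Pipeline helper reflects the general metadata-ingestion API rather than anything changed in this commit.

from datahub.ingestion.run.pipeline import Pipeline

# Hypothetical recipe: the relevant change for this deprecation is the source
# type -- "bigquery-legacy" / "bigquery-usage-legacy" both become "bigquery".
recipe = {
    "source": {
        "type": "bigquery",  # formerly "bigquery-legacy"
        "config": {
            # Illustrative option; this field exists on the v2 config touched below.
            "include_table_lineage": True,
        },
    },
    "sink": {
        "type": "datahub-rest",
        "config": {"server": "http://localhost:8080"},  # assumed local DataHub instance
    },
}

pipeline = Pipeline.create(recipe)
pipeline.run()
pipeline.raise_from_status()
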
117 changes: 0 additions & 117 deletions metadata-ingestion/docs/sources/bigquery/bigquery-legacy_pre.md

This file was deleted.

8 changes: 0 additions & 8 deletions metadata-ingestion/setup.py
@@ -254,10 +254,6 @@ def get_long_description():
# PyAthena is pinned with exact version because we use private method in PyAthena
"athena": sql_common | {"PyAthena[SQLAlchemy]==2.4.1"},
"azure-ad": set(),
"bigquery-legacy": sql_common
| bigquery_common
| {"sqlalchemy-bigquery>=1.4.1", "sqllineage==1.3.6", "sqlparse"},
"bigquery-usage-legacy": bigquery_common | usage_common | {"cachetools"},
"bigquery": sql_common
| bigquery_common
| {"sqllineage==1.3.6", "sql_metadata", "sqlalchemy-bigquery>=1.4.1"},
@@ -411,8 +407,6 @@ def get_long_description():
dependency
for plugin in [
"bigquery",
"bigquery-legacy",
"bigquery-usage-legacy",
"clickhouse",
"clickhouse-usage",
"delta-lake",
@@ -493,9 +487,7 @@ def get_long_description():
"sqlalchemy = datahub.ingestion.source.sql.sql_generic:SQLAlchemyGenericSource",
"athena = datahub.ingestion.source.sql.athena:AthenaSource",
"azure-ad = datahub.ingestion.source.identity.azure_ad:AzureADSource",
"bigquery-legacy = datahub.ingestion.source.sql.bigquery:BigQuerySource",
"bigquery = datahub.ingestion.source.bigquery_v2.bigquery:BigqueryV2Source",
"bigquery-usage-legacy = datahub.ingestion.source.usage.bigquery_usage:BigQueryUsageSource",
"clickhouse = datahub.ingestion.source.sql.clickhouse:ClickHouseSource",
"clickhouse-usage = datahub.ingestion.source.usage.clickhouse_usage:ClickHouseUsageSource",
"delta-lake = datahub.ingestion.source.delta_lake:DeltaLakeSource",
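The extras deleted above follow setup.py's pattern of building each plugin's dependency set as a union of shared groups. A standalone sketch of that pattern, using illustrative stand-ins for the shared sets (the real contents live earlier in setup.py and are not shown in this diff):

from typing import Dict, Set

# Illustrative stand-ins for the shared dependency groups defined in setup.py.
sql_common: Set[str] = {"sqlalchemy>=1.3.24"}
bigquery_common: Set[str] = {"google-cloud-bigquery"}

plugins: Dict[str, Set[str]] = {
    # Only the consolidated "bigquery" extra remains; the "bigquery-legacy"
    # and "bigquery-usage-legacy" keys removed by this commit are gone.
    "bigquery": sql_common
    | bigquery_common
    | {"sqllineage==1.3.6", "sql_metadata", "sqlalchemy-bigquery>=1.4.1"},
}

print(sorted(plugins["bigquery"]))

Each key doubles as the pip extra, e.g. pip install 'acryl-datahub[bigquery]'.
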
@@ -1,17 +1,20 @@
import logging
import os
from datetime import timedelta
from typing import Dict, List, Optional
from typing import Any, Dict, List, Optional

from pydantic import Field, PositiveInt, root_validator
from pydantic import Field, PositiveInt, PrivateAttr, root_validator, validator

from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.common import AllowDenyPattern, ConfigurationError
from datahub.ingestion.source.sql.sql_common import SQLAlchemyConfig
from datahub.ingestion.source.state.stateful_ingestion_base import (
LineageStatefulIngestionConfig,
ProfilingStatefulIngestionConfig,
UsageStatefulIngestionConfig,
)
from datahub.ingestion.source.usage.usage_common import BaseUsageConfig
from datahub.ingestion.source_config.sql.bigquery import BigQueryConfig
from datahub.ingestion.source_config.bigquery import BigQueryBaseConfig
from datahub.ingestion.source_config.usage.bigquery_usage import BigQueryCredential

logger = logging.getLogger(__name__)

@@ -29,7 +32,8 @@ class BigQueryUsageConfig(BaseUsageConfig):


class BigQueryV2Config(
BigQueryConfig,
BigQueryBaseConfig,
SQLAlchemyConfig,
LineageStatefulIngestionConfig,
UsageStatefulIngestionConfig,
ProfilingStatefulIngestionConfig,
@@ -118,6 +122,58 @@ class BigQueryV2Config(
description="Use the legacy sharded table urn suffix added.",
)

scheme: str = "bigquery"

log_page_size: PositiveInt = Field(
default=1000,
description="The number of log item will be queried per page for lineage collection",
)
credential: Optional[BigQueryCredential] = Field(
description="BigQuery credential informations"
)
# extra_client_options, include_table_lineage and max_query_duration are relevant only when computing the lineage.
extra_client_options: Dict[str, Any] = Field(
default={},
description="Additional options to pass to google.cloud.logging_v2.client.Client.",
)
include_table_lineage: Optional[bool] = Field(
default=True,
description="Option to enable/disable lineage generation. Is enabled by default.",
)
max_query_duration: timedelta = Field(
default=timedelta(minutes=15),
description="Correction to pad start_time and end_time with. For handling the case where the read happens within our time range but the query completion event is delayed and happens after the configured end time.",
)

bigquery_audit_metadata_datasets: Optional[List[str]] = Field(
default=None,
description="A list of datasets that contain a table named cloudaudit_googleapis_com_data_access which contain BigQuery audit logs, specifically, those containing BigQueryAuditMetadata. It is recommended that the project of the dataset is also specified, for example, projectA.datasetB.",
)
use_exported_bigquery_audit_metadata: bool = Field(
default=False,
description="When configured, use BigQueryAuditMetadata in bigquery_audit_metadata_datasets to compute lineage information.",
)
use_date_sharded_audit_log_tables: bool = Field(
default=False,
description="Whether to read date sharded tables or time partitioned tables when extracting usage from exported audit logs.",
)
_credentials_path: Optional[str] = PrivateAttr(None)

upstream_lineage_in_report: bool = Field(
default=False,
description="Useful for debugging lineage information. Set to True to see the raw lineage created internally.",
)

def __init__(self, **data: Any):
super().__init__(**data)

if self.credential:
self._credentials_path = self.credential.create_credential_temp_file()
logger.debug(
f"Creating temporary credential file at {self._credentials_path}"
)
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self._credentials_path

@root_validator(pre=False)
def profile_default_settings(cls, values: Dict) -> Dict:
# Extra default SQLAlchemy option for better connection pooling and threading.
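
The __init__ added above wires an optional credential into the standard Google auth flow by writing it to a temporary file and exporting GOOGLE_APPLICATION_CREDENTIALS. A generic sketch of that pattern, independent of the DataHub classes (the service-account payload here is a placeholder):

import json
import os
import tempfile

# Placeholder service-account payload; a real credential would carry the
# private key, client email, project id, and so on.
credential = {"type": "service_account", "project_id": "example-project"}

# Write the credential to a temp file and point the Google SDKs at it,
# mirroring what the config's create_credential_temp_file() call achieves.
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as fp:
    json.dump(credential, fp)
    credentials_path = fp.name

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = credentials_path
print(os.environ["GOOGLE_APPLICATION_CREDENTIALS"])
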
@@ -185,3 +241,14 @@ def get_sql_alchemy_url(self, run_on_compute: bool = False) -> str:
# based on the credentials or environment variables.
# See https://github.com/mxmzdlv/pybigquery#authentication.
return "bigquery://"

@validator("platform")
def platform_is_always_bigquery(cls, v):
return "bigquery"

@validator("platform_instance")
def bigquery_doesnt_need_platform_instance(cls, v):
if v is not None:
raise ConfigurationError(
"BigQuery project ids are globally unique. You do not need to specify a platform instance."
)
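
The two validators above pin the platform and reject a platform_instance. A standalone pydantic sketch of that behaviour follows; this toy model is not the real BigQueryV2Config, it only mirrors the two validators and uses ValueError in place of ConfigurationError.

from typing import Optional

from pydantic import BaseModel, validator


class PlatformSketch(BaseModel):
    # Toy model mirroring the validators added above (pydantic v1 style).
    platform: Optional[str] = None
    platform_instance: Optional[str] = None

    @validator("platform")
    def platform_is_always_bigquery(cls, v: Optional[str]) -> str:
        # Whatever the caller passes, the platform is normalised to "bigquery".
        return "bigquery"

    @validator("platform_instance")
    def bigquery_doesnt_need_platform_instance(cls, v: Optional[str]) -> Optional[str]:
        if v is not None:
            raise ValueError(
                "BigQuery project ids are globally unique. "
                "You do not need to specify a platform instance."
            )
        return v


print(PlatformSketch(platform="something-else").platform)  # -> bigquery
try:
    PlatformSketch(platform_instance="prod")
except Exception as exc:  # pydantic ValidationError
    print(f"rejected as expected: {exc}")
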
@@ -1,6 +1,6 @@
import dataclasses
import datetime
import logging
from datetime import datetime
from typing import Dict, Iterable, List, Optional, Tuple, cast

from dateutil.relativedelta import relativedelta
@@ -47,8 +47,8 @@ def __init__(

@staticmethod
def get_partition_range_from_partition_id(
partition_id: str, partition_datetime: Optional[datetime.datetime]
) -> Tuple[datetime.datetime, datetime.datetime]:
partition_id: str, partition_datetime: Optional[datetime]
) -> Tuple[datetime, datetime]:
partition_range_map: Dict[int, Tuple[relativedelta, str]] = {
4: (relativedelta(years=1), "%Y"),
6: (relativedelta(months=1), "%Y%m"),
@@ -61,7 +61,12 @@ def get_partition_range_from_partition_id(
(delta, format) = partition_range_map[len(partition_id)]
duration = delta
if not partition_datetime:
partition_datetime = datetime.datetime.strptime(partition_id, format)
partition_datetime = datetime.strptime(partition_id, format)
else:
partition_datetime = datetime.strptime(
partition_datetime.strftime(format), format
)

else:
raise ValueError(
f"check your partition_id {partition_id}. It must be yearly/monthly/daily/hourly."
@@ -74,7 +79,7 @@ def generate_partition_profiler_query(
project: str,
schema: str,
table: BigqueryTable,
partition_datetime: Optional[datetime.datetime],
partition_datetime: Optional[datetime],
) -> Tuple[Optional[str], Optional[str]]:
"""
Method returns partition id if table is partitioned or sharded and generate custom partition query for
@@ -227,7 +232,7 @@ def generate_wu_from_profile_requests(
# We don't add to the profiler state if we only do table level profiling as it always happens
if self.state_handler and not request.profile_table_level_only:
self.state_handler.add_to_state(
dataset_urn, int(datetime.datetime.utcnow().timestamp() * 1000)
dataset_urn, int(datetime.utcnow().timestamp() * 1000)
)

wu = wrap_aspect_as_workunit(
1 change: 0 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/kafka.py
@@ -330,7 +330,6 @@ def build_custom_properties(
topic_detail: Optional[TopicMetadata],
extra_topic_config: Optional[Dict[str, ConfigEntry]],
) -> Dict[str, str]:

custom_props: Dict[str, str] = {}
self.update_custom_props_with_topic_details(topic, topic_detail, custom_props)
self.update_custom_props_with_topic_config(
@@ -190,7 +190,6 @@ def __init__(self, config: SnowflakeV2Config, report: SnowflakeV2Report) -> None
def get_workunits(
self, discovered_tables: List[str], discovered_views: List[str]
) -> Iterable[MetadataWorkUnit]:

self.connection = self.create_connection()
if self.connection is None:
return