diff --git a/docs/how/updating-datahub.md b/docs/how/updating-datahub.md index efc40bf6afeab4..01301f838a6882 100644 --- a/docs/how/updating-datahub.md +++ b/docs/how/updating-datahub.md @@ -18,7 +18,13 @@ This file documents any backwards-incompatible changes in DataHub and assists pe - The `should_overwrite` flag in `csv-enricher` has been replaced with `write_semantics` to match the format used for other sources. See the [documentation](https://datahubproject.io/docs/generated/ingestion/sources/csv/) for more details - Closing an authorization hole in creating tags adding a Platform Privilege called `Create Tags` for creating tags. This is assigned to `datahub` root user, along with default All Users policy. Notice: You may need to add this privilege (or `Manage Tags`) to existing users that need the ability to create tags on the platform. +- #5329 Below profiling config parameters are now supported in `BigQuery`: + - profiling.profile_if_updated_since_days (default=1) + - profiling.profile_table_size_limit (default=1GB) + - profiling.profile_table_row_limit (default=50000) + Set above parameters to `null` if you want older behaviour. + ### Potential Downtime ### Deprecations diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py index 3687160ee0ebec..bedd5525181fce 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py @@ -166,6 +166,7 @@ where REGEXP_CONTAINS(table_name, r'^\\d{{{date_length}}}$') """.strip() + # The existing implementation of this method can be found here: # https://github.com/googleapis/python-bigquery-sqlalchemy/blob/main/sqlalchemy_bigquery/base.py#L1018-L1025. # The existing implementation does not use the schema parameter and hence @@ -442,12 +443,21 @@ def _make_gcp_logging_client( else: return [GCPLoggingClient(**client_options)] - def generate_profile_candidates( - self, - inspector: Inspector, - threshold_time: Optional[datetime.datetime], - schema: str, - ) -> Optional[List[str]]: + @staticmethod + def get_all_schema_tables_query(schema: str) -> str: + base_query = ( + f"SELECT " + f"table_id, " + f"size_bytes, " + f"last_modified_time, " + f"row_count, " + f"FROM {schema}.__TABLES__" + ) + return base_query + + def generate_profile_candidate_query( + self, threshold_time: Optional[datetime.datetime], schema: str + ) -> str: row_condition = ( f"row_count<{self.config.profiling.profile_table_row_limit} and " if self.config.profiling.profile_table_row_limit @@ -466,22 +476,26 @@ def generate_profile_candidates( c = f"{row_condition}{size_condition}{time_condition}" profile_clause = c if c == "" else f" WHERE {c}"[:-4] if profile_clause == "": - return None + return "" + query = f"{self.get_all_schema_tables_query(schema)}{profile_clause}" + logger.debug(f"Profiling via {query}") + return query + + def generate_profile_candidates( + self, + inspector: Inspector, + threshold_time: Optional[datetime.datetime], + schema: str, + ) -> Optional[List[str]]: storage_project_id = self.get_multiproject_project_id(inspector) exec_project_id = self.get_multiproject_project_id( inspector, run_on_compute=True ) _client: BigQueryClient = BigQueryClient(project=exec_project_id) + + full_schema_name = f"{storage_project_id}.{schema}" # Reading all tables' metadata to report - base_query = ( - f"SELECT " - f"table_id, " - f"size_bytes, " - f"last_modified_time, " - f"row_count, " - f"FROM {storage_project_id}.{schema}.__TABLES__" - ) - all_tables = _client.query(base_query) + all_tables = _client.query(self.get_all_schema_tables_query(full_schema_name)) report_tables: List[str] = [ "table_id, size_bytes, last_modified_time, row_count" ] @@ -489,29 +503,22 @@ def generate_profile_candidates( report_tables.append( f"{table_row.table_id}, {table_row.size_bytes}, {table_row.last_modified_time}, {table_row.row_count}" ) - report_key = f"{self._get_project_id(inspector)}.{schema}" + report_key = f"{self._get_project_id(inspector)}.{full_schema_name}" self.report.table_metadata[report_key] = report_tables + + query = self.generate_profile_candidate_query(threshold_time, full_schema_name) self.report.profile_table_selection_criteria[report_key] = ( - "no constraint" if profile_clause == "" else profile_clause.lstrip(" WHERE") + "no constraint" if query == "" else query.split(" WHERE")[1] ) + if query == "": + return None - # reading filtered tables. TODO: remove this call and apply local filtering on above query results. - query = ( - f"SELECT " - f"table_id, " - f"size_bytes, " - f"last_modified_time, " - f"row_count, " - f"FROM {storage_project_id}.{schema}.__TABLES__" - f"{profile_clause}" - ) - logger.debug(f"Profiling via {query}") query_job = _client.query(query) _profile_candidates = [] for row in query_job: _profile_candidates.append( self.get_identifier( - schema=schema, + schema=full_schema_name, entity=row.table_id, inspector=inspector, ) diff --git a/metadata-ingestion/tests/unit/test_bigquery_source.py b/metadata-ingestion/tests/unit/test_bigquery_source.py index 9e98254b84086b..85626373e20bf9 100644 --- a/metadata-ingestion/tests/unit/test_bigquery_source.py +++ b/metadata-ingestion/tests/unit/test_bigquery_source.py @@ -1,5 +1,6 @@ import json import os +from datetime import datetime import pytest @@ -10,7 +11,6 @@ def test_bigquery_uri(): - config = BigQueryConfig.parse_obj( { "project_id": "test-project", @@ -20,7 +20,6 @@ def test_bigquery_uri(): def test_bigquery_uri_with_credential(): - expected_credential_json = { "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", "auth_uri": "https://accounts.google.com/o/oauth2/auth", @@ -67,7 +66,6 @@ def test_bigquery_uri_with_credential(): def test_simple_upstream_table_generation(): - a: BigQueryTableRef = BigQueryTableRef( project="test-project", dataset="test-dataset", table="a" ) @@ -97,7 +95,6 @@ def test_error_on_missing_config(): def test_upstream_table_generation_with_temporary_table_without_temp_upstream(): - a: BigQueryTableRef = BigQueryTableRef( project="test-project", dataset="test-dataset", table="a" ) @@ -143,7 +140,6 @@ def test_upstream_table_generation_with_temporary_table_with_temp_upstream(): def test_upstream_table_generation_with_temporary_table_with_multiple_temp_upstream(): - a: BigQueryTableRef = BigQueryTableRef( project="test-project", dataset="test-dataset", table="a" ) @@ -173,3 +169,158 @@ def test_upstream_table_generation_with_temporary_table_with_multiple_temp_upstr } upstreams = source.get_upstream_tables(str(a), []) assert list(upstreams).sort() == [c, e].sort() + + +def test_bq_get_profile_candidate_query_all_params(): + config = BigQueryConfig.parse_obj( + { + "profiling": { + "profile_if_updated_since_days": 1, + "profile_table_size_limit": 5, + "profile_table_row_limit": 50000, + } + } + ) + source = BigQuerySource(config=config, ctx=PipelineContext(run_id="test")) + threshold_time = datetime.fromtimestamp(1648876349) + expected_query = ( + "SELECT table_id, size_bytes, last_modified_time, row_count, FROM dataset_foo.__TABLES__ WHERE " + "row_count<50000 and ROUND(size_bytes/POW(10,9),2)<5 and last_modified_time>=1648876349000 " + ) + query = source.generate_profile_candidate_query(threshold_time, "dataset_foo") + assert query == expected_query + + +def test_bq_get_profile_candidate_query_no_day_limit(): + config = BigQueryConfig.parse_obj( + { + "profiling": { + "profile_if_updated_since_days": None, + "profile_table_size_limit": 5, + "profile_table_row_limit": 50000, + } + } + ) + source = BigQuerySource(config=config, ctx=PipelineContext(run_id="test")) + expected_query = ( + "SELECT table_id, size_bytes, last_modified_time, row_count, FROM dataset_foo.__TABLES__ WHERE " + "row_count<50000 and ROUND(size_bytes/POW(10,9),2)<5 " + ) + query = source.generate_profile_candidate_query(None, "dataset_foo") + assert query == expected_query + + +def test_bq_get_profile_candidate_query_no_size_limit(): + config = BigQueryConfig.parse_obj( + { + "profiling": { + "profile_if_updated_since_days": 1, + "profile_table_size_limit": None, + "profile_table_row_limit": 50000, + } + } + ) + source = BigQuerySource(config=config, ctx=PipelineContext(run_id="test")) + threshold_time = datetime.fromtimestamp(1648876349) + expected_query = ( + "SELECT table_id, size_bytes, last_modified_time, row_count, FROM dataset_foo.__TABLES__ WHERE " + "row_count<50000 and last_modified_time>=1648876349000 " + ) + query = source.generate_profile_candidate_query(threshold_time, "dataset_foo") + assert query == expected_query + + +def test_bq_get_profile_candidate_query_no_row_limit(): + config = BigQueryConfig.parse_obj( + { + "profiling": { + "profile_if_updated_since_days": 1, + "profile_table_size_limit": 5, + "profile_table_row_limit": None, + } + } + ) + source = BigQuerySource(config=config, ctx=PipelineContext(run_id="test")) + threshold_time = datetime.fromtimestamp(1648876349) + expected_query = ( + "SELECT table_id, size_bytes, last_modified_time, row_count, FROM dataset_foo.__TABLES__ WHERE " + "ROUND(size_bytes/POW(10,9),2)<5 and last_modified_time>=1648876349000 " + ) + query = source.generate_profile_candidate_query(threshold_time, "dataset_foo") + assert query == expected_query + + +def test_bq_get_profile_candidate_query_all_null(): + + config = BigQueryConfig.parse_obj( + { + "profiling": { + "profile_if_updated_since_days": None, + "profile_table_size_limit": None, + "profile_table_row_limit": None, + } + } + ) + source = BigQuerySource(config=config, ctx=PipelineContext(run_id="test")) + expected_query = "" + query = source.generate_profile_candidate_query(None, "dataset_foo") + assert query == expected_query + + +def test_bq_get_profile_candidate_query_only_row(): + config = BigQueryConfig.parse_obj( + { + "profiling": { + "profile_if_updated_since_days": None, + "profile_table_size_limit": None, + "profile_table_row_limit": 50000, + } + } + ) + source = BigQuerySource(config=config, ctx=PipelineContext(run_id="test")) + expected_query = ( + "SELECT table_id, size_bytes, last_modified_time, row_count, FROM dataset_foo.__TABLES__ WHERE " + "row_count<50000 " + ) + query = source.generate_profile_candidate_query(None, "dataset_foo") + assert query == expected_query + + +def test_bq_get_profile_candidate_query_only_days(): + config = BigQueryConfig.parse_obj( + { + "profiling": { + "profile_if_updated_since_days": 1, + "profile_table_size_limit": None, + "profile_table_row_limit": None, + } + } + ) + source = BigQuerySource(config=config, ctx=PipelineContext(run_id="test")) + threshold_time = datetime.fromtimestamp(1648876349) + expected_query = ( + "SELECT table_id, size_bytes, last_modified_time, row_count, FROM dataset_foo.__TABLES__ WHERE " + "last_modified_time>=1648876349000 " + ) + query = source.generate_profile_candidate_query(threshold_time, "dataset_foo") + assert query == expected_query + + +def test_bq_get_profile_candidate_query_only_size(): + + config = BigQueryConfig.parse_obj( + { + "profiling": { + "profile_if_updated_since_days": None, + "profile_table_size_limit": 5, + "profile_table_row_limit": None, + } + } + ) + source = BigQuerySource(config=config, ctx=PipelineContext(run_id="test")) + expected_query = ( + "SELECT table_id, size_bytes, last_modified_time, row_count, FROM dataset_foo.__TABLES__ WHERE " + "ROUND(size_bytes/POW(10,9),2)<5 " + ) + query = source.generate_profile_candidate_query(None, "dataset_foo") + assert query == expected_query