From 5937b354471bcd2f3425f673dfa295a9a4abfb8a Mon Sep 17 00:00:00 2001 From: Shirshanka Das Date: Thu, 1 Dec 2022 00:25:43 -0800 Subject: [PATCH 01/11] fix(actions): add missing datahub-gms-protocol env var (#6593) --- docker/datahub-actions/env/docker.env | 3 ++- .../quickstart/docker-compose-without-neo4j-m1.quickstart.yml | 1 + docker/quickstart/docker-compose-without-neo4j.quickstart.yml | 1 + docker/quickstart/docker-compose.quickstart.yml | 1 + 4 files changed, 5 insertions(+), 1 deletion(-) diff --git a/docker/datahub-actions/env/docker.env b/docker/datahub-actions/env/docker.env index f055455c37d852..48fb0b080d2436 100644 --- a/docker/datahub-actions/env/docker.env +++ b/docker/datahub-actions/env/docker.env @@ -1,3 +1,4 @@ +DATAHUB_GMS_PROTOCOL=http DATAHUB_GMS_HOST=datahub-gms DATAHUB_GMS_PORT=8080 @@ -18,4 +19,4 @@ KAFKA_PROPERTIES_SECURITY_PROTOCOL=PLAINTEXT # KAFKA_PROPERTIES_SSL_TRUSTSTORE_LOCATION=/mnt/certs/truststore # KAFKA_PROPERTIES_SSL_KEYSTORE_PASSWORD=keystore_password # KAFKA_PROPERTIES_SSL_KEY_PASSWORD=keystore_password -# KAFKA_PROPERTIES_SSL_TRUSTSTORE_PASSWORD=truststore_password \ No newline at end of file +# KAFKA_PROPERTIES_SSL_TRUSTSTORE_PASSWORD=truststore_password diff --git a/docker/quickstart/docker-compose-without-neo4j-m1.quickstart.yml b/docker/quickstart/docker-compose-without-neo4j-m1.quickstart.yml index 9e76b6a34419f5..e87f55c0086c81 100644 --- a/docker/quickstart/docker-compose-without-neo4j-m1.quickstart.yml +++ b/docker/quickstart/docker-compose-without-neo4j-m1.quickstart.yml @@ -23,6 +23,7 @@ services: depends_on: - datahub-gms environment: + - DATAHUB_GMS_PROTOCOL=http - DATAHUB_GMS_HOST=datahub-gms - DATAHUB_GMS_PORT=8080 - KAFKA_BOOTSTRAP_SERVER=broker:29092 diff --git a/docker/quickstart/docker-compose-without-neo4j.quickstart.yml b/docker/quickstart/docker-compose-without-neo4j.quickstart.yml index 6889f96f625f21..e6f6b73396de39 100644 --- a/docker/quickstart/docker-compose-without-neo4j.quickstart.yml +++ b/docker/quickstart/docker-compose-without-neo4j.quickstart.yml @@ -23,6 +23,7 @@ services: depends_on: - datahub-gms environment: + - DATAHUB_GMS_PROTOCOL=http - DATAHUB_GMS_HOST=datahub-gms - DATAHUB_GMS_PORT=8080 - KAFKA_BOOTSTRAP_SERVER=broker:29092 diff --git a/docker/quickstart/docker-compose.quickstart.yml b/docker/quickstart/docker-compose.quickstart.yml index d6a05abb9c68ff..486740bcf418ad 100644 --- a/docker/quickstart/docker-compose.quickstart.yml +++ b/docker/quickstart/docker-compose.quickstart.yml @@ -25,6 +25,7 @@ services: depends_on: - datahub-gms environment: + - DATAHUB_GMS_PROTOCOL=http - DATAHUB_GMS_HOST=datahub-gms - DATAHUB_GMS_PORT=8080 - KAFKA_BOOTSTRAP_SERVER=broker:29092 From f63c3e5222af7a7bc8ddaf454c16763cc1a8a734 Mon Sep 17 00:00:00 2001 From: Mayuri Nehate <33225191+mayurinehate@users.noreply.github.com> Date: Thu, 1 Dec 2022 15:03:03 +0530 Subject: [PATCH 02/11] fix(ingest): restrict snowflake-connector-python dependency (#6594) --- metadata-ingestion/setup.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 991b8792b3ecd0..1a744a6fe328a6 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -178,6 +178,8 @@ def get_long_description(): # Eventually we should just require snowflake-sqlalchemy>=1.4.3, but I won't do that immediately # because it may break Airflow users that need SQLAlchemy 1.3.x. "SQLAlchemy<1.4.42", + # See https://github.com/snowflakedb/snowflake-connector-python/pull/1348 for why 2.8.2 is blocked + "snowflake-connector-python!=2.8.2", "pandas", "cryptography", "msal", From 6fe9ad4fbb51a584d3ea54de43190aaa72cb5296 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Thu, 1 Dec 2022 14:05:29 -0500 Subject: [PATCH 03/11] feat(ingest/bigquery): avoid creating/deleting tables for profiling (#6578) --- .../sources/bigquery/bigquery-legacy_pre.md | 20 +-- .../docs/sources/bigquery/bigquery_pre.md | 27 +-- .../recipes/bigquery_to_datahub.dhub.yaml | 1 - .../ingestion/source/ge_data_profiler.py | 160 +++++++++++------- .../ingestion/source/ge_profiling_config.py | 18 +- .../ingestion/source/profiling/common.py | 2 +- .../datahub/ingestion/source/s3/profiling.py | 4 +- .../datahub/ingestion/source/sql/bigquery.py | 2 +- .../ingestion/source_config/sql/bigquery.py | 11 -- 9 files changed, 117 insertions(+), 128 deletions(-) diff --git a/metadata-ingestion/docs/sources/bigquery/bigquery-legacy_pre.md b/metadata-ingestion/docs/sources/bigquery/bigquery-legacy_pre.md index 39e8db0ee8c554..2d433b82e1a8bd 100644 --- a/metadata-ingestion/docs/sources/bigquery/bigquery-legacy_pre.md +++ b/metadata-ingestion/docs/sources/bigquery/bigquery-legacy_pre.md @@ -78,24 +78,8 @@ Note: Since bigquery source also supports dataset level lineage, the auth client ### Profiling Details -Profiling can profile normal/partitioned and sharded tables as well but due to performance reasons, we only profile the latest partition for Partitioned tables and the latest shard for sharded tables. - -If limit/offset parameter is set or partitioning partitioned or sharded table Great Expectation (the profiling framework we use) needs to create temporary -views. By default these views are created in the schema where the profiled table is but you can control to create all these -tables into a predefined schema by setting `profiling.bigquery_temp_table_schema` property. -Temporary tables are removed after profiling. - -```yaml - profiling: - enabled: true - bigquery_temp_table_schema: my-project-id.my-schema-where-views-can-be-created -``` - -:::note - -Due to performance reasons, we only profile the latest partition for Partitioned tables and the latest shard for sharded tables. -You can set partition explicitly with `partition.partition_datetime` property if you want. (partition will be applied to all partitioned tables) -::: +For performance reasons, we only profile the latest partition for partitioned tables and the latest shard for sharded tables. +You can set partition explicitly with `partition.partition_datetime` property if you want, though note that partition config will be applied to all partitioned tables. ### Working with multi-project GCP setups diff --git a/metadata-ingestion/docs/sources/bigquery/bigquery_pre.md b/metadata-ingestion/docs/sources/bigquery/bigquery_pre.md index e60016dade51f4..c204fcb6449d59 100644 --- a/metadata-ingestion/docs/sources/bigquery/bigquery_pre.md +++ b/metadata-ingestion/docs/sources/bigquery/bigquery_pre.md @@ -21,8 +21,6 @@ There are two important concepts to understand and identify: | `bigquery.jobs.list`             | Manage the queries that the service account has sent. *This only needs for the extractor project where the service account belongs* |                                                                                                               | | `bigquery.readsessions.create`   | Create a session for streaming large results. *This only needs for the extractor project where the service account belongs*         |                                                                                                               | | `bigquery.readsessions.getData` | Get data from the read session. *This only needs for the extractor project where the service account belongs*                       | -| `bigquery.tables.create`         | Create temporary tables when profiling tables. Tip: Use the `profiling.bigquery_temp_table_schema` to ensure that all temp tables (across multiple projects) are created in this project under a specific dataset.                 | Profiling                           |                                                                                                                 | -| `bigquery.tables.delete`         | Delete temporary tables when profiling tables. Tip: Use the `profiling.bigquery_temp_table_schema` to ensure that all temp tables (across multiple projects) are created in this project under a specific dataset.                   | Profiling                           |                                                                                                                 | 2. Grant the following permissions to the Service Account on every project where you would like to extract metadata from :::info @@ -41,11 +39,6 @@ If you have multiple projects in your BigQuery setup, the role should be granted | `logging.logEntries.list`       | Fetch log entries for lineage/usage data. Not required if `use_exported_bigquery_audit_metadata` is enabled. | Lineage Extraction/Usage extraction | [roles/logging.privateLogViewer](https://cloud.google.com/logging/docs/access-control#logging.privateLogViewer) | | `logging.privateLogEntries.list` | Fetch log entries for lineage/usage data. Not required if `use_exported_bigquery_audit_metadata` is enabled. | Lineage Extraction/Usage extraction | [roles/logging.privateLogViewer](https://cloud.google.com/logging/docs/access-control#logging.privateLogViewer) | | `bigquery.tables.getData`       | Access table data to extract storage size, last updated at, data profiles etc. | Profiling                           |                                                                                                                 | -| `bigquery.tables.create`         | [Optional] Only needed if not using the `profiling.bigquery_temp_table_schema` config option. | Profiling                           |                                                                                                                 | -| `bigquery.tables.delete`         | [Optional] Only needed if not using the `profiling.bigquery_temp_table_schema` config option. | Profiling                           |                                                                                                                 | - -The profiler creates temporary tables to profile partitioned/sharded tables and that is why it needs table create/delete privilege. -Use `profiling.bigquery_temp_table_schema` to restrict to one specific dataset the create/delete permission #### Create a service account in the Extractor Project @@ -100,24 +93,8 @@ Note: Since bigquery source also supports dataset level lineage, the auth client ### Profiling Details -Profiling can profile normal/partitioned and sharded tables as well but due to performance reasons, we only profile the latest partition for Partitioned tables and the latest shard for sharded tables. - -If limit/offset parameter is set or partitioning partitioned or sharded table Great Expectation (the profiling framework we use) needs to create temporary -views. By default, these views are created in the schema where the profiled table is but you can control to create all these -tables into a predefined schema by setting `profiling.bigquery_temp_table_schema` property. -Temporary tables are removed after profiling. - -```yaml -profiling: - enabled: true - bigquery_temp_table_schema: my-project-id.my-schema-where-views-can-be-created -``` - -:::note - -Due to performance reasons, we only profile the latest partition for Partitioned tables and the latest shard for sharded tables. -You can set partition explicitly with `partition.partition_datetime` property if you want. (partition will be applied to all partitioned tables) -::: +For performance reasons, we only profile the latest partition for partitioned tables and the latest shard for sharded tables. +You can set partition explicitly with `partition.partition_datetime` property if you want, though note that partition config will be applied to all partitioned tables. ### Caveats diff --git a/metadata-ingestion/examples/recipes/bigquery_to_datahub.dhub.yaml b/metadata-ingestion/examples/recipes/bigquery_to_datahub.dhub.yaml index 377af98598d981..84f098fa06c5c6 100644 --- a/metadata-ingestion/examples/recipes/bigquery_to_datahub.dhub.yaml +++ b/metadata-ingestion/examples/recipes/bigquery_to_datahub.dhub.yaml @@ -20,7 +20,6 @@ source: #end_time: 2023-12-15T20:08:23.091Z #profiling: # enabled: true - # bigquery_temp_table_schema: my-project-id.my-schema-where-views-can-be-created-for-profiling # turn_off_expensive_profiling_metrics: false # query_combiner_enabled: true # max_number_of_fields_to_profile: 8 diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py index 394b39765c1624..e6782a92e3f452 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py @@ -10,10 +10,20 @@ import traceback import unittest.mock import uuid -from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, Tuple, Union +from typing import ( + Any, + Callable, + Dict, + Iterable, + Iterator, + List, + Optional, + Tuple, + Union, + cast, +) import sqlalchemy as sa -from great_expectations import __version__ as ge_version from great_expectations.core.util import convert_to_json_serializable from great_expectations.data_context import BaseDataContext from great_expectations.data_context.types.base import ( @@ -30,12 +40,11 @@ from sqlalchemy.exc import ProgrammingError from typing_extensions import Concatenate, ParamSpec -from datahub.configuration.common import ConfigurationError from datahub.emitter.mce_builder import get_sys_time from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig from datahub.ingestion.source.profiling.common import ( Cardinality, - _convert_to_cardinality, + convert_to_cardinality, ) from datahub.ingestion.source.sql.sql_common import SQLSourceReport from datahub.metadata.schema_classes import ( @@ -315,7 +324,7 @@ def _get_column_cardinality( column_spec.unique_count = unique_count - column_spec.cardinality = _convert_to_cardinality(unique_count, pct_unique) + column_spec.cardinality = convert_to_cardinality(unique_count, pct_unique) @_run_with_query_combiner def _get_dataset_rows(self, dataset_profile: DatasetProfileClass) -> None: @@ -795,16 +804,6 @@ def generate_profiles( self.report.report_from_query_combiner(query_combiner.report) - def _is_legacy_ge_temp_table_creation(self) -> bool: - legacy_ge_bq_temp_table_creation: bool = False - (major, minor, patch) = ge_version.split(".") - if int(major) == 0 and ( - int(minor) < 15 or (int(minor) == 15 and int(patch) < 3) - ): - legacy_ge_bq_temp_table_creation = True - - return legacy_ge_bq_temp_table_creation - def _generate_profile_from_request( self, query_combiner: SQLAlchemyQueryCombiner, @@ -820,16 +819,6 @@ def _generate_profile_from_request( **request.batch_kwargs, ) - def _drop_bigquery_temp_table(self, bigquery_temp_table: str) -> None: - try: - with self.base_engine.connect() as connection: - connection.execute(f"drop view if exists `{bigquery_temp_table}`") - logger.debug(f"Temp table {bigquery_temp_table} was dropped.") - except Exception: - logger.warning( - f"Unable to delete bigquery temporary table: {bigquery_temp_table}" - ) - def _drop_trino_temp_table(self, temp_dataset: Dataset) -> None: schema = temp_dataset._table.schema table = temp_dataset._table.name @@ -855,7 +844,6 @@ def _generate_single_profile( logger.debug( f"Received single profile request for {pretty_name} for {schema}, {table}, {custom_sql}" ) - bigquery_temp_table: Optional[str] = None ge_config = { "schema": schema, @@ -865,46 +853,90 @@ def _generate_single_profile( **kwargs, } - # We have to create temporary tables if offset or limit or custom sql is set on Bigquery - if custom_sql or self.config.limit or self.config.offset: - if profiler_args is not None: - temp_table_db = profiler_args.get("temp_table_db", schema) - if platform is not None and platform == "bigquery": - ge_config["schema"] = None - - if self.config.bigquery_temp_table_schema: - num_parts = self.config.bigquery_temp_table_schema.split(".") - # If we only have 1 part that means the project_id is missing from the table name and we add it - if len(num_parts) == 1: - bigquery_temp_table = f"{temp_table_db}.{self.config.bigquery_temp_table_schema}.ge-temp-{uuid.uuid4()}" - elif len(num_parts) == 2: - bigquery_temp_table = f"{self.config.bigquery_temp_table_schema}.ge-temp-{uuid.uuid4()}" + bigquery_temp_table: Optional[str] = None + if platform == "bigquery" and ( + custom_sql or self.config.limit or self.config.offset + ): + # On BigQuery, we need to bypass GE's mechanism for creating temporary tables because + # it requires create/delete table permissions. + import google.cloud.bigquery.job.query + from google.cloud.bigquery.dbapi.cursor import Cursor as BigQueryCursor + + raw_connection = self.base_engine.raw_connection() + try: + cursor: "BigQueryCursor" = cast( + "BigQueryCursor", raw_connection.cursor() + ) + if custom_sql is not None: + # Note that limit and offset are not supported for custom SQL. + bq_sql = custom_sql else: - raise ConfigurationError( - f"bigquery_temp_table_schema should be either project.dataset or dataset format but it was: {self.config.bigquery_temp_table_schema}" - ) - else: - assert table - table_parts = table.split(".") - if len(table_parts) == 2: - bigquery_temp_table = ( - f"{temp_table_db}.{table_parts[0]}.ge-temp-{uuid.uuid4()}" - ) + bq_sql = f"SELECT * FROM `{table}`" + if self.config.limit: + bq_sql += f" LIMIT {self.config.limit}" + if self.config.offset: + bq_sql += f" OFFSET {self.config.offset}" + + cursor.execute(bq_sql) + + # Great Expectations batch v2 API, which is the one we're using, requires + # a concrete table name against which profiling is executed. Normally, GE + # creates a table with an expiry time of 24 hours. However, we don't want the + # temporary tables to stick around that long, so we'd also have to delete them + # ourselves. As such, the profiler required create and delete table permissions + # on BigQuery. + # + # It turns out that we can (ab)use the BigQuery cached results feature + # to avoid creating temporary tables ourselves. For almost all queries, BigQuery + # will store the results in a temporary, cached results table when an explicit + # destination table is not provided. These tables are pretty easy to identify + # because they live in "anonymous datasets" and have a name that looks like + # "project-id._d60e97aec7f471046a960419adb6d44e98300db7.anon10774d0ea85fd20fe9671456c5c53d5f1b85e1b17bedb232dfce91661a219ee3" + # These tables are per-user and per-project, so there's no risk of permissions escalation. + # As per the docs, the cached results tables typically have a lifetime of 24 hours, + # which should be plenty for our purposes. + # See https://cloud.google.com/bigquery/docs/cached-results for more details. + # + # The code below extracts the name of the cached results table from the query job + # and points GE to that table for profiling. + # + # Risks: + # 1. If the query results are larger than the maximum response size, BigQuery will + # not cache the results. According to the docs https://cloud.google.com/bigquery/quotas, + # the maximum response size is 10 GB compressed. + # 2. The cache lifetime of 24 hours is "best-effort" and hence not guaranteed. + # 3. Tables with column-level security may not be cached, and tables with row-level + # security will not be cached. + # 4. BigQuery "discourages" using cached results directly, but notes that + # the current semantics do allow it. + # + # The better long-term solution would be to use a subquery avoid this whole + # temporary table dance. However, that would require either a) upgrading to + # use GE's batch v3 API or b) bypassing GE altogether. + + query_job: Optional[ + "google.cloud.bigquery.job.query.QueryJob" + ] = cursor._query_job + assert query_job + temp_destination_table = query_job.destination + bigquery_temp_table = f"{temp_destination_table.project}.{temp_destination_table.dataset_id}.{temp_destination_table.table_id}" + finally: + raw_connection.close() - # With this pr there is no option anymore to set the bigquery temp table: - # https://github.com/great-expectations/great_expectations/pull/4925 - # This dirty hack to make it possible to control the temp table to use in Bigquery - # otherwise it will expect dataset_id in the connection url which is not option in our case - # as we batch these queries. - # Currently only with this option is possible to control the temp table which is created: - # https://github.com/great-expectations/great_expectations/blob/7e53b615c36a53f78418ce46d6bc91a7011163c0/great_expectations/datasource/sqlalchemy_datasource.py#L397 - if self._is_legacy_ge_temp_table_creation(): - ge_config["bigquery_temp_table"] = bigquery_temp_table - else: - ge_config["snowflake_transient_table"] = bigquery_temp_table + if platform == "bigquery": + if bigquery_temp_table: + ge_config["table"] = bigquery_temp_table + ge_config["schema"] = None + ge_config["limit"] = None + ge_config["offset"] = None + + bigquery_temp_table = None - if custom_sql is not None: - ge_config["query"] = custom_sql + assert not ge_config["limit"] + assert not ge_config["offset"] + else: + if custom_sql is not None: + ge_config["query"] = custom_sql with self._ge_context() as ge_context, PerfTimer() as timer: try: @@ -944,8 +976,6 @@ def _generate_single_profile( finally: if self.base_engine.engine.name == "trino": self._drop_trino_temp_table(batch) - elif bigquery_temp_table: - self._drop_bigquery_temp_table(bigquery_temp_table) def _get_ge_dataset( self, diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py b/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py index d06bb9d5d21ef1..576d4de8c2568c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py @@ -1,4 +1,5 @@ import datetime +import logging import os from typing import Any, Dict, List, Optional @@ -14,6 +15,8 @@ # all include_field_ flags are reported. } +logger = logging.getLogger(__name__) + class GEProfilingConfig(ConfigModel): enabled: bool = Field( @@ -128,15 +131,22 @@ class GEProfilingConfig(ConfigModel): catch_exceptions: bool = Field(default=True, description="") partition_profiling_enabled: bool = Field(default=True, description="") - bigquery_temp_table_schema: Optional[str] = Field( - default=None, - description="On bigquery for profiling partitioned tables needs to create temporary views. You have to define a dataset where these will be created. Views will be cleaned up after profiler runs. (Great expectation tech details about this (https://legacy.docs.greatexpectations.io/en/0.9.0/reference/integrations/bigquery.html#custom-queries-with-sql-datasource).", - ) partition_datetime: Optional[datetime.datetime] = Field( default=None, description="For partitioned datasets profile only the partition which matches the datetime or profile the latest one if not set. Only Bigquery supports this.", ) + @pydantic.root_validator(pre=True) + def deprecate_bigquery_temp_table_schema(cls, values): + # TODO: Update docs to remove mention of this field. + if "bigquery_temp_table_schema" in values: + logger.warning( + "The bigquery_temp_table_schema config is no longer required. Please remove it from your config.", + DeprecationWarning, + ) + del values["bigquery_temp_table_schema"] + return values + @pydantic.root_validator() def ensure_field_level_settings_are_normalized( cls: "GEProfilingConfig", values: Dict[str, Any] diff --git a/metadata-ingestion/src/datahub/ingestion/source/profiling/common.py b/metadata-ingestion/src/datahub/ingestion/source/profiling/common.py index 463bfed0a7f7e6..b54f0e02fc1c87 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/profiling/common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/profiling/common.py @@ -13,7 +13,7 @@ class Cardinality(Enum): UNIQUE = 7 -def _convert_to_cardinality( +def convert_to_cardinality( unique_count: Optional[int], pct_unique: Optional[float] ) -> Optional[Cardinality]: """ diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/profiling.py b/metadata-ingestion/src/datahub/ingestion/source/s3/profiling.py index 9f534d77c1b3b6..a8822fca58cce9 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/s3/profiling.py +++ b/metadata-ingestion/src/datahub/ingestion/source/s3/profiling.py @@ -37,7 +37,7 @@ from datahub.emitter.mce_builder import get_sys_time from datahub.ingestion.source.profiling.common import ( Cardinality, - _convert_to_cardinality, + convert_to_cardinality, ) from datahub.ingestion.source.s3.report import DataLakeSourceReport from datahub.metadata.schema_classes import ( @@ -303,7 +303,7 @@ def __init__( ) column_spec.type_ = column_types[column] - column_spec.cardinality = _convert_to_cardinality( + column_spec.cardinality = convert_to_cardinality( column_distinct_counts[column], column_null_fractions[column], ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py index 0613fda4595b24..e9a82b41822e2e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py @@ -961,7 +961,7 @@ def get_profiler_instance(self, inspector: Inspector) -> "DatahubGEProfiler": ) def get_profile_args(self) -> Dict: - return {"temp_table_db": self.config.project_id} + return {} def is_dataset_eligible_for_profiling( self, diff --git a/metadata-ingestion/src/datahub/ingestion/source_config/sql/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source_config/sql/bigquery.py index 36e7f48b1271e4..3782d26ea748a2 100644 --- a/metadata-ingestion/src/datahub/ingestion/source_config/sql/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source_config/sql/bigquery.py @@ -97,16 +97,6 @@ def bigquery_doesnt_need_platform_instance(cls, v): def validate_that_bigquery_audit_metadata_datasets_is_correctly_configured( cls, values: Dict[str, Any] ) -> Dict[str, Any]: - profiling = values.get("profiling") - if ( - values.get("storage_project_id") - and profiling is not None - and profiling.enabled - and not profiling.bigquery_temp_table_schema - ): - raise ConfigurationError( - "If storage project is being used with profiling then bigquery_temp_table_schema needs to be set to a dataset in the compute project" - ) if ( values.get("use_exported_bigquery_audit_metadata") and not values.get("use_v2_audit_metadata") @@ -115,7 +105,6 @@ def validate_that_bigquery_audit_metadata_datasets_is_correctly_configured( raise ConfigurationError( "bigquery_audit_metadata_datasets must be specified if using exported audit metadata. Otherwise set use_v2_audit_metadata to True." ) - pass return values @pydantic.validator("platform") From d6dd8ccc51cf48090acc11aedbd9b827f320be3b Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Thu, 1 Dec 2022 17:02:50 -0500 Subject: [PATCH 04/11] fix(ingest): unify emit interface (#6592) --- .../api/entities/corpgroup/corpgroup.py | 10 ++----- .../datahub/api/entities/corpuser/corpuser.py | 10 ++----- .../datahub/api/entities/datajob/dataflow.py | 6 +--- .../datahub/api/entities/datajob/datajob.py | 6 +--- .../dataprocess/dataprocess_instance.py | 10 ++----- .../src/datahub/emitter/kafka_emitter.py | 9 +++--- .../src/datahub/emitter/rest_emitter.py | 28 ++++++++++++++----- 7 files changed, 34 insertions(+), 45 deletions(-) diff --git a/metadata-ingestion/src/datahub/api/entities/corpgroup/corpgroup.py b/metadata-ingestion/src/datahub/api/entities/corpgroup/corpgroup.py index 66bf5524c2dd11..5fc9ed8ec5c70a 100644 --- a/metadata-ingestion/src/datahub/api/entities/corpgroup/corpgroup.py +++ b/metadata-ingestion/src/datahub/api/entities/corpgroup/corpgroup.py @@ -1,7 +1,7 @@ from __future__ import annotations from dataclasses import dataclass, field -from typing import Callable, Iterable, Optional, Union, cast +from typing import Callable, Iterable, Optional, Union import datahub.emitter.mce_builder as builder from datahub.emitter.kafka_emitter import DatahubKafkaEmitter @@ -81,10 +81,4 @@ def emit( :param callback: The callback method for KafkaEmitter if it is used """ for mcp in self.generate_mcp(): - if type(emitter).__name__ == "DatahubKafkaEmitter": - assert callback is not None - kafka_emitter = cast("DatahubKafkaEmitter", emitter) - kafka_emitter.emit(mcp, callback) - else: - rest_emitter = cast("DatahubRestEmitter", emitter) - rest_emitter.emit(mcp) + emitter.emit(mcp, callback) diff --git a/metadata-ingestion/src/datahub/api/entities/corpuser/corpuser.py b/metadata-ingestion/src/datahub/api/entities/corpuser/corpuser.py index 00fe35ded5ee25..1f4c1d6fdca171 100644 --- a/metadata-ingestion/src/datahub/api/entities/corpuser/corpuser.py +++ b/metadata-ingestion/src/datahub/api/entities/corpuser/corpuser.py @@ -1,7 +1,7 @@ from __future__ import annotations from dataclasses import dataclass, field -from typing import Callable, Iterable, List, Optional, Union, cast +from typing import Callable, Iterable, List, Optional, Union import datahub.emitter.mce_builder as builder from datahub.emitter.kafka_emitter import DatahubKafkaEmitter @@ -100,10 +100,4 @@ def emit( :param callback: The callback method for KafkaEmitter if it is used """ for mcp in self.generate_mcp(): - if type(emitter).__name__ == "DatahubKafkaEmitter": - assert callback is not None - kafka_emitter = cast("DatahubKafkaEmitter", emitter) - kafka_emitter.emit(mcp, callback) - else: - rest_emitter = cast("DatahubRestEmitter", emitter) - rest_emitter.emit(mcp) + emitter.emit(mcp, callback) diff --git a/metadata-ingestion/src/datahub/api/entities/datajob/dataflow.py b/metadata-ingestion/src/datahub/api/entities/datajob/dataflow.py index 0a1354aa20aa8b..8c4e2da92fe583 100644 --- a/metadata-ingestion/src/datahub/api/entities/datajob/dataflow.py +++ b/metadata-ingestion/src/datahub/api/entities/datajob/dataflow.py @@ -135,10 +135,6 @@ def emit( :param emitter: Datahub Emitter to emit the process event :param callback: (Optional[Callable[[Exception, str], None]]) the callback method for KafkaEmitter if it is used """ - from datahub.emitter.kafka_emitter import DatahubKafkaEmitter for mcp in self.generate_mcp(): - if isinstance(emitter, DatahubKafkaEmitter): - emitter.emit(mcp, callback) - else: - emitter.emit(mcp) + emitter.emit(mcp, callback) diff --git a/metadata-ingestion/src/datahub/api/entities/datajob/datajob.py b/metadata-ingestion/src/datahub/api/entities/datajob/datajob.py index 68a876138c277a..8335b0f66292e3 100644 --- a/metadata-ingestion/src/datahub/api/entities/datajob/datajob.py +++ b/metadata-ingestion/src/datahub/api/entities/datajob/datajob.py @@ -178,13 +178,9 @@ def emit( :param emitter: Datahub Emitter to emit the process event :param callback: (Optional[Callable[[Exception, str], None]]) the callback method for KafkaEmitter if it is used """ - from datahub.emitter.kafka_emitter import DatahubKafkaEmitter for mcp in self.generate_mcp(): - if isinstance(emitter, DatahubKafkaEmitter): - emitter.emit(mcp, callback) - else: - emitter.emit(mcp) + emitter.emit(mcp, callback) def generate_data_input_output_mcp(self) -> Iterable[MetadataChangeProposalWrapper]: mcp = MetadataChangeProposalWrapper( diff --git a/metadata-ingestion/src/datahub/api/entities/dataprocess/dataprocess_instance.py b/metadata-ingestion/src/datahub/api/entities/dataprocess/dataprocess_instance.py index 09484d246161f9..5dde5fd1ff06e7 100644 --- a/metadata-ingestion/src/datahub/api/entities/dataprocess/dataprocess_instance.py +++ b/metadata-ingestion/src/datahub/api/entities/dataprocess/dataprocess_instance.py @@ -1,7 +1,7 @@ import time from dataclasses import dataclass, field from enum import Enum -from typing import TYPE_CHECKING, Callable, Dict, Iterable, List, Optional, Union, cast +from typing import TYPE_CHECKING, Callable, Dict, Iterable, List, Optional, Union from datahub.api.entities.datajob import DataFlow, DataJob from datahub.emitter.mcp import MetadataChangeProposalWrapper @@ -282,13 +282,7 @@ def _emit_mcp( :param emitter: (Union[DatahubRestEmitter, DatahubKafkaEmitter]) the datahub emitter to emit generated mcps :param callback: (Optional[Callable[[Exception, str], None]]) the callback method for KafkaEmitter if it is used """ - if type(emitter).__name__ == "DatahubKafkaEmitter": - assert callback is not None - kafka_emitter = cast("DatahubKafkaEmitter", emitter) - kafka_emitter.emit(mcp, callback) - else: - rest_emitter = cast("DatahubRestEmitter", emitter) - rest_emitter.emit(mcp) + emitter.emit(mcp, callback) def emit( self, diff --git a/metadata-ingestion/src/datahub/emitter/kafka_emitter.py b/metadata-ingestion/src/datahub/emitter/kafka_emitter.py index 75d07e16f6e3a0..dff659b186172b 100644 --- a/metadata-ingestion/src/datahub/emitter/kafka_emitter.py +++ b/metadata-ingestion/src/datahub/emitter/kafka_emitter.py @@ -116,9 +116,9 @@ def emit( callback: Optional[Callable[[Exception, str], None]] = None, ) -> None: if isinstance(item, (MetadataChangeProposal, MetadataChangeProposalWrapper)): - return self.emit_mcp_async(item, callback or _noop_callback) + return self.emit_mcp_async(item, callback or _error_reporting_callback) else: - return self.emit_mce_async(item, callback or _noop_callback) + return self.emit_mce_async(item, callback or _error_reporting_callback) def emit_mce_async( self, @@ -155,5 +155,6 @@ def flush(self) -> None: producer.flush() -def _noop_callback(err: Exception, msg: str) -> None: - pass +def _error_reporting_callback(err: Exception, msg: str) -> None: + if err: + logger.error(f"Failed to emit to kafka: {err} {msg}") diff --git a/metadata-ingestion/src/datahub/emitter/rest_emitter.py b/metadata-ingestion/src/datahub/emitter/rest_emitter.py index 00eee29e212499..0ad5eb3d76e1e0 100644 --- a/metadata-ingestion/src/datahub/emitter/rest_emitter.py +++ b/metadata-ingestion/src/datahub/emitter/rest_emitter.py @@ -4,7 +4,7 @@ import logging import os from json.decoder import JSONDecodeError -from typing import Any, Dict, List, Optional, Tuple, Union +from typing import Any, Callable, Dict, List, Optional, Tuple, Union import requests from requests.adapters import HTTPAdapter, Retry @@ -175,15 +175,29 @@ def emit( MetadataChangeProposalWrapper, UsageAggregation, ], + # NOTE: This signature should have the exception be optional rather than + # required. However, this would be a breaking change that may need + # more careful consideration. + callback: Optional[Callable[[Exception, str], None]] = None, ) -> Tuple[datetime.datetime, datetime.datetime]: start_time = datetime.datetime.now() - if isinstance(item, UsageAggregation): - self.emit_usage(item) - elif isinstance(item, (MetadataChangeProposal, MetadataChangeProposalWrapper)): - self.emit_mcp(item) + try: + if isinstance(item, UsageAggregation): + self.emit_usage(item) + elif isinstance( + item, (MetadataChangeProposal, MetadataChangeProposalWrapper) + ): + self.emit_mcp(item) + else: + self.emit_mce(item) + except Exception as e: + if callback: + callback(e, str(e)) + raise else: - self.emit_mce(item) - return start_time, datetime.datetime.now() + if callback: + callback(None, "success") # type: ignore + return start_time, datetime.datetime.now() def emit_mce(self, mce: MetadataChangeEvent) -> None: url = f"{self._gms_server}/entities?action=ingest" From 83b21b021c818b2dffb9821db061c2ae6bdfefc5 Mon Sep 17 00:00:00 2001 From: david-leifker <114954101+david-leifker@users.noreply.github.com> Date: Thu, 1 Dec 2022 16:43:15 -0600 Subject: [PATCH 05/11] fix(security): security version updates (#6602) --- build.gradle | 28 ++++++++++++------- buildSrc/build.gradle | 4 +-- datahub-frontend/play.gradle | 2 +- datahub-ranger-plugin/build.gradle | 12 ++++++++ entity-registry/build.gradle | 5 ++++ metadata-io/build.gradle | 3 ++ .../auth-ranger-impl/build.gradle | 12 ++++++++ 7 files changed, 53 insertions(+), 13 deletions(-) diff --git a/build.gradle b/build.gradle index 74146515850a04..b57419e285a38c 100644 --- a/build.gradle +++ b/build.gradle @@ -8,6 +8,8 @@ buildscript { ext.neo4jVersion = '4.4.9' ext.graphQLJavaVersion = '19.0' ext.testContainersVersion = '1.17.4' + ext.jacksonVersion = '2.13.4' + ext.jettyVersion = '9.4.46.v20220331' apply from: './repositories.gradle' buildscript.repositories.addAll(project.repositories) dependencies { @@ -57,6 +59,7 @@ project.ext.externalDependency = [ 'commonsCli': 'commons-cli:commons-cli:1.5.0', 'commonsIo': 'commons-io:commons-io:2.4', 'commonsLang': 'commons-lang:commons-lang:2.6', + 'commonsText': 'org.apache.commons:commons-text:1.10.0', 'commonsCollections': 'commons-collections:commons-collections:3.2.2', 'data' : 'com.linkedin.pegasus:data:' + pegasusVersion, 'datastaxOssNativeProtocol': 'com.datastax.oss:native-protocol:1.5.1', @@ -75,7 +78,7 @@ project.ext.externalDependency = [ 'gson': 'com.google.code.gson:gson:2.8.9', 'guice': 'com.google.inject:guice:4.2.2', 'guava': 'com.google.guava:guava:27.0.1-jre', - 'h2': 'com.h2database:h2:2.1.210', + 'h2': 'com.h2database:h2:2.1.214', 'hadoopClient': 'org.apache.hadoop:hadoop-client:3.2.1', 'hadoopCommon':'org.apache.hadoop:hadoop-common:2.7.2', 'hadoopMapreduceClient':'org.apache.hadoop:hadoop-mapreduce-client-core:2.7.2', @@ -84,15 +87,18 @@ project.ext.externalDependency = [ 'httpClient': 'org.apache.httpcomponents:httpclient:4.5.9', 'httpAsyncClient': 'org.apache.httpcomponents:httpasyncclient:4.1.5', 'iStackCommons': 'com.sun.istack:istack-commons-runtime:4.0.1', - 'jacksonCore': 'com.fasterxml.jackson.core:jackson-core:2.13.2', - 'jacksonDataBind': 'com.fasterxml.jackson.core:jackson-databind:2.13.2.2', - 'jacksonDataFormatYaml': 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.13.2', + 'jacksonCore': "com.fasterxml.jackson.core:jackson-core:$jacksonVersion", + 'jacksonDataBind': "com.fasterxml.jackson.core:jackson-databind:$jacksonVersion.2", + 'jacksonDataFormatYaml': "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:$jacksonVersion", + 'woodstoxCore': 'com.fasterxml.woodstox:woodstox-core:6.4.0', 'javatuples': 'org.javatuples:javatuples:1.2', 'javaxInject' : 'javax.inject:javax.inject:1', 'javaxValidation' : 'javax.validation:validation-api:2.0.1.Final', 'jerseyCore': 'org.glassfish.jersey.core:jersey-client:2.25.1', 'jerseyGuava': 'org.glassfish.jersey.bundles.repackaged:jersey-guava:2.25.1', - 'jettyJaas': 'org.eclipse.jetty:jetty-jaas:9.4.46.v20220331', + 'jettyJaas': "org.eclipse.jetty:jetty-jaas:$jettyVersion", + 'jettyClient': "org.eclipse.jetty:jetty-client:$jettyVersion", + 'jettison': 'org.codehaus.jettison:jettison:1.5.2', 'jgrapht': 'org.jgrapht:jgrapht-core:1.5.1', 'jna': 'net.java.dev.jna:jna:5.12.1', 'jsonPatch': 'com.github.java-json-tools:json-patch:1.13', @@ -136,14 +142,15 @@ project.ext.externalDependency = [ 'playTest': 'com.typesafe.play:play-test_2.12:2.7.6', 'pac4j': 'org.pac4j:pac4j-oidc:3.6.0', 'playPac4j': 'org.pac4j:play-pac4j_2.12:8.0.2', - 'postgresql': 'org.postgresql:postgresql:42.3.3', - 'protobuf': 'com.google.protobuf:protobuf-java:3.19.3', + 'postgresql': 'org.postgresql:postgresql:42.3.8', + 'protobuf': 'com.google.protobuf:protobuf-java:3.19.6', 'rangerCommons': 'org.apache.ranger:ranger-plugins-common:2.3.0', 'reflections': 'org.reflections:reflections:0.9.9', 'resilience4j': 'io.github.resilience4j:resilience4j-retry:1.7.1', 'rythmEngine': 'org.rythmengine:rythm-engine:1.3.0', 'servletApi': 'javax.servlet:javax.servlet-api:3.1.0', - 'shiroCore': 'org.apache.shiro:shiro-core:1.8.0', + 'shiroCore': 'org.apache.shiro:shiro-core:1.10.0', + 'snakeYaml': 'org.yaml:snakeyaml:1.33', 'sparkSql' : 'org.apache.spark:spark-sql_2.11:2.4.8', 'sparkHive' : 'org.apache.spark:spark-hive_2.11:2.4.8', 'springBeans': "org.springframework:spring-beans:$springVersion", @@ -184,6 +191,7 @@ configure(subprojects.findAll {! it.name.startsWith('spark-lineage') }) { configurations.all { exclude group: "io.netty", module: "netty" + exclude group: "log4j", module: "log4j" exclude group: "org.springframework.boot", module: "spring-boot-starter-logging" exclude group: "ch.qos.logback", module: "logback-classic" exclude group: "org.apache.logging.log4j", module: "log4j-to-slf4j" @@ -219,8 +227,8 @@ subprojects { implementation('org.apache.commons:commons-compress:1.21') implementation('org.apache.velocity:velocity-engine-core:2.3') implementation('org.hibernate:hibernate-validator:6.0.20.Final') - implementation('com.fasterxml.jackson.core:jackson-databind:2.13.2.2') - implementation('com.fasterxml.jackson.core:jackson-dataformat-cbor:2.13.2') + implementation("com.fasterxml.jackson.core:jackson-databind:$jacksonVersion.2") + implementation("com.fasterxml.jackson.core:jackson-dataformat-cbor:$jacksonVersion") } } diff --git a/buildSrc/build.gradle b/buildSrc/build.gradle index 2d94fd876c31f2..b240501b49b073 100644 --- a/buildSrc/build.gradle +++ b/buildSrc/build.gradle @@ -10,7 +10,7 @@ dependencies { exclude group: 'com.google.guava', module: 'guava' } compile 'com.google.guava:guava:27.0.1-jre' - compile 'com.fasterxml.jackson.core:jackson-databind:2.9.10.7' - compile 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.8.11' + compile 'com.fasterxml.jackson.core:jackson-databind:2.13.4.2' + compile 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.13.4' compile 'commons-io:commons-io:2.11.0' } \ No newline at end of file diff --git a/datahub-frontend/play.gradle b/datahub-frontend/play.gradle index fb08cbddc1b071..579449e9e39b16 100644 --- a/datahub-frontend/play.gradle +++ b/datahub-frontend/play.gradle @@ -17,7 +17,7 @@ dependencies { constraints { play('org.springframework:spring-core:5.2.3.RELEASE') - play('com.fasterxml.jackson.core:jackson-databind:2.9.10.4') + play(externalDependency.jacksonDataBind) play('com.nimbusds:nimbus-jose-jwt:7.9') play('com.typesafe.akka:akka-actor_2.12:2.5.16') play('net.minidev:json-smart:2.4.1') diff --git a/datahub-ranger-plugin/build.gradle b/datahub-ranger-plugin/build.gradle index 810b1a1991c9fe..b3277a664af22f 100644 --- a/datahub-ranger-plugin/build.gradle +++ b/datahub-ranger-plugin/build.gradle @@ -30,6 +30,18 @@ dependencies { implementation externalDependency.hadoopCommon3 implementation externalDependency.log4jApi + constraints { + implementation(externalDependency.woodstoxCore) { + because("previous versions are vulnerable to CVE-2022-40151 CVE-2022-40152") + } + implementation(externalDependency.jettyClient) { + because("previous versions are vulnerable to CVE-2021-28165") + } + implementation(externalDependency.jettison) { + because("previous versions are vulnerable to CVE-2022-40149 CVE-2022-40150") + } + } + testCompile externalDependency.testng } diff --git a/entity-registry/build.gradle b/entity-registry/build.gradle index 9a77f76cd7bad9..3594e0440f63d4 100644 --- a/entity-registry/build.gradle +++ b/entity-registry/build.gradle @@ -10,6 +10,11 @@ dependencies { compile externalDependency.jacksonDataFormatYaml compile externalDependency.reflections compile externalDependency.jsonPatch + constraints { + implementation(externalDependency.snakeYaml) { + because("previous versions are vulnerable to CVE-2022-25857") + } + } dataModel project(':li-utils') annotationProcessor externalDependency.lombok diff --git a/metadata-io/build.gradle b/metadata-io/build.gradle index 0a7924f0020916..0ebee676567521 100644 --- a/metadata-io/build.gradle +++ b/metadata-io/build.gradle @@ -66,6 +66,9 @@ dependencies { implementation(externalDependency.log4jApi) { because("previous versions are vulnerable to CVE-2021-45105") } + implementation(externalDependency.commonsText) { + because("previous versions are vulnerable to CVE-2022-42889") + } } } diff --git a/metadata-service/auth-ranger-impl/build.gradle b/metadata-service/auth-ranger-impl/build.gradle index 7abb9e78ac055e..8d13106bc6657d 100644 --- a/metadata-service/auth-ranger-impl/build.gradle +++ b/metadata-service/auth-ranger-impl/build.gradle @@ -13,6 +13,18 @@ dependencies { } implementation externalDependency.hadoopCommon3 + constraints { + implementation(externalDependency.woodstoxCore) { + because("previous versions are vulnerable to CVE-2022-40151 CVE-2022-40152") + } + implementation(externalDependency.jettyClient) { + because("previous versions are vulnerable to CVE-2021-28165") + } + implementation(externalDependency.jettison) { + because("previous versions are vulnerable to CVE-2022-40149 CVE-2022-40150") + } + } + implementation 'org.apache.logging.log4j:log4j-1.2-api:2.17.1' implementation 'rome:rome:1.0' runtimeOnly externalDependency.jna From 10deee73335e224bae838dc147640680e4403352 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Bry=C5=84ski?= Date: Fri, 2 Dec 2022 00:24:25 +0100 Subject: [PATCH 06/11] docs(): remove Kafka Streams from documentation (#6596) --- docs/architecture/metadata-ingestion.md | 2 +- docs/architecture/metadata-serving.md | 6 +++--- metadata-jobs/README.md | 4 ++-- metadata-jobs/mae-consumer-job/README.md | 8 ++++---- metadata-jobs/mce-consumer-job/README.md | 6 +++--- 5 files changed, 13 insertions(+), 13 deletions(-) diff --git a/docs/architecture/metadata-ingestion.md b/docs/architecture/metadata-ingestion.md index 6ab8ab964c6b6c..7c32c18b273117 100644 --- a/docs/architecture/metadata-ingestion.md +++ b/docs/architecture/metadata-ingestion.md @@ -25,7 +25,7 @@ As long as you can emit a [Metadata Change Proposal (MCP)] event to Kafka or mak ### Applying Metadata Change Proposals to DataHub Metadata Service (mce-consumer-job) -DataHub comes with a Kafka Streams based job, [mce-consumer-job], which consumes the Metadata Change Proposals and writes them into the DataHub Metadata Service (datahub-gms) using the `/ingest` endpoint. +DataHub comes with a Spring job, [mce-consumer-job], which consumes the Metadata Change Proposals and writes them into the DataHub Metadata Service (datahub-gms) using the `/ingest` endpoint. [Metadata Change Proposal (MCP)]: ../what/mxe.md#metadata-change-proposal-mcp [Metadata Change Log (MCL)]: ../what/mxe.md#metadata-change-log-mcl diff --git a/docs/architecture/metadata-serving.md b/docs/architecture/metadata-serving.md index 3032848461b851..43b0558f310dea 100644 --- a/docs/architecture/metadata-serving.md +++ b/docs/architecture/metadata-serving.md @@ -25,11 +25,11 @@ Note that not all MCP-s will result in an MCL, because the DataHub serving tier ### Metadata Index Applier (mae-consumer-job) -[Metadata Change Logs]s are consumed by another Kafka Streams job, [mae-consumer-job], which applies the changes to the [graph] and [search index] accordingly. +[Metadata Change Logs]s are consumed by another Spring job, [mae-consumer-job], which applies the changes to the [graph] and [search index] accordingly. The job is entity-agnostic and will execute corresponding graph & search index builders, which will be invoked by the job when a specific metadata aspect is changed. -The builder should instruct the job how to update the graph and search index based on the metadata change. +The builder should instruct the job how to update the graph and search index based on the metadata change. -To ensure that metadata changes are processed in the correct chronological order, MCLs are keyed by the entity [URN] — meaning all MAEs for a particular entity will be processed sequentially by a single Kafka streams thread. +To ensure that metadata changes are processed in the correct chronological order, MCLs are keyed by the entity [URN] — meaning all MAEs for a particular entity will be processed sequentially by a single thread. ### Metadata Query Serving diff --git a/metadata-jobs/README.md b/metadata-jobs/README.md index 691d5f52095e00..b718e0ca8485e3 100644 --- a/metadata-jobs/README.md +++ b/metadata-jobs/README.md @@ -1,10 +1,10 @@ # MXE Processing Jobs -DataHub uses Kafka as the pub-sub message queue in the backend. There are 2 Kafka topics used by DataHub which are +DataHub uses Kafka as the pub-sub message queue in the backend. There are 2 Kafka topics used by DataHub which are `MetadataChangeEvent` and `MetadataAuditEvent`. * `MetadataChangeEvent:` This message is emitted by any data platform or crawler in which there is a change in the metadata. * `MetadataAuditEvent:` This message is emitted by [DataHub GMS](../gms) to notify that metadata change is registered. -To be able to consume from these two topics, there are two [Kafka Streams](https://kafka.apache.org/documentation/streams/) +To be able to consume from these two topics, there are two Spring jobs DataHub uses: * [MCE Consumer Job](mce-consumer-job): Writes to [DataHub GMS](../gms) * [MAE Consumer Job](mae-consumer-job): Writes to [Elasticsearch](../docker/elasticsearch) & [Neo4j](../docker/neo4j) diff --git a/metadata-jobs/mae-consumer-job/README.md b/metadata-jobs/mae-consumer-job/README.md index 3d75bd14379e4b..e5aa4486426cc3 100644 --- a/metadata-jobs/mae-consumer-job/README.md +++ b/metadata-jobs/mae-consumer-job/README.md @@ -4,7 +4,7 @@ title: "metadata-jobs:mae-consumer-job" # Metadata Audit Event Consumer Job -The Metadata Audit Event Consumer is a [Kafka Streams](https://kafka.apache.org/documentation/streams/) job which can be deployed by itself, or as part of the Metadata Service. +The Metadata Audit Event Consumer is a Spring job which can be deployed by itself, or as part of the Metadata Service. Its main function is to listen to change log events emitted as a result of changes made to the Metadata Graph, converting changes in the metadata model into updates against secondary search & graph indexes (among other things) @@ -15,10 +15,10 @@ Today the job consumes from two important Kafka topics: 2. `MetadataChangeLog_Timeseries_v1` > Where does the name **Metadata Audit Event** come from? Well, history. Previously, this job consumed -> a single `MetadataAuditEvent` topic which has been deprecated and removed from the critical path. Hence, the name! +> a single `MetadataAuditEvent` topic which has been deprecated and removed from the critical path. Hence, the name! ## Pre-requisites -* You need to have [JDK8](https://www.oracle.com/java/technologies/jdk8-downloads.html) +* You need to have [JDK8](https://www.oracle.com/java/technologies/jdk8-downloads.html) installed on your machine to be able to build `DataHub Metadata Service`. ## Build @@ -46,7 +46,7 @@ the application directly from command line after a successful [build](#build): ``` ## Endpoints -Spring boot actuator has been enabled for MAE Application. +Spring boot actuator has been enabled for MAE Application. `healthcheck`, `metrics` and `info` web endpoints are enabled by default. `healthcheck` - http://localhost:9091/actuator/health diff --git a/metadata-jobs/mce-consumer-job/README.md b/metadata-jobs/mce-consumer-job/README.md index 5d911feef76c02..c242899f142565 100644 --- a/metadata-jobs/mce-consumer-job/README.md +++ b/metadata-jobs/mce-consumer-job/README.md @@ -4,10 +4,10 @@ title: "metadata-jobs:mce-consumer-job" # Metadata Change Event Consumer Job -The Metadata Change Event Consumer is a [Kafka Streams](https://kafka.apache.org/documentation/streams/) job which can be deployed by itself, or as part of the Metadata Service. +The Metadata Change Event Consumer is a Spring job which can be deployed by itself, or as part of the Metadata Service. Its main function is to listen to change proposal events emitted by clients of DataHub which request changes to the Metadata Graph. It then applies -these requests against DataHub's storage layer: the Metadata Service. +these requests against DataHub's storage layer: the Metadata Service. Today the job consumes from two topics: @@ -62,7 +62,7 @@ listen on port 5005 for a remote debugger. ``` ## Endpoints -Spring boot actuator has been enabled for MCE Application. +Spring boot actuator has been enabled for MCE Application. `healthcheck`, `metrics` and `info` web endpoints are enabled by default. `healthcheck` - http://localhost:9090/actuator/health From df96e89557bea8f11a6697263cf83b297716998c Mon Sep 17 00:00:00 2001 From: John Joyce Date: Thu, 1 Dec 2022 15:25:52 -0800 Subject: [PATCH 07/11] refactor(ui): Improving Kafka UI Ingestion Form, Create Domain, Create Secret Modals (#6588) --- .../src/app/domain/CreateDomainModal.tsx | 28 +++---- .../app/ingest/secret/SecretBuilderModal.tsx | 76 +++++++------------ .../source/builder/RecipeForm/DictField.tsx | 1 + .../source/builder/RecipeForm/FormField.tsx | 21 +++-- .../source/builder/RecipeForm/RecipeForm.tsx | 5 ++ .../SecretField/CreateSecretButton.tsx | 8 +- .../RecipeForm/SecretField/SecretField.tsx | 34 ++++++--- .../source/builder/RecipeForm/common.tsx | 21 +++-- .../ingest/source/builder/RecipeForm/kafka.ts | 39 +++++++--- .../app/ingest/source/builder/sources.json | 2 +- .../src/app/ingest/source/conf/kafka/kafka.ts | 1 + 11 files changed, 133 insertions(+), 103 deletions(-) diff --git a/datahub-web-react/src/app/domain/CreateDomainModal.tsx b/datahub-web-react/src/app/domain/CreateDomainModal.tsx index 75436a463fbe81..629298e47958ba 100644 --- a/datahub-web-react/src/app/domain/CreateDomainModal.tsx +++ b/datahub-web-react/src/app/domain/CreateDomainModal.tsx @@ -29,15 +29,9 @@ const DESCRIPTION_FIELD_NAME = 'description'; export default function CreateDomainModal({ onClose, onCreate }: Props) { const [createDomainMutation] = useCreateDomainMutation(); - const [createButtonEnabled, setCreateButtonEnabled] = useState(true); + const [createButtonEnabled, setCreateButtonEnabled] = useState(false); const [form] = Form.useForm(); - const setStagedName = (name) => { - form.setFieldsValue({ - name, - }); - }; - const onCreateDomain = () => { createDomainMutation({ variables: { @@ -88,7 +82,7 @@ export default function CreateDomainModal({ onClose, onCreate }: Props) { - @@ -98,9 +92,9 @@ export default function CreateDomainModal({ onClose, onCreate }: Props) { form={form} initialValues={{}} layout="vertical" - onFieldsChange={() => - setCreateButtonEnabled(form.getFieldsError().some((field) => field.errors.length > 0)) - } + onFieldsChange={() => { + setCreateButtonEnabled(!form.getFieldsError().some((field) => field.errors.length > 0)); + }} > Name}> Give your new Domain a name. @@ -121,7 +115,15 @@ export default function CreateDomainModal({ onClose, onCreate }: Props) { {SUGGESTED_DOMAIN_NAMES.map((name) => { return ( - setStagedName(name)}> + { + form.setFieldsValue({ + name, + }); + setCreateButtonEnabled(true); + }} + > {name} ); @@ -137,7 +139,7 @@ export default function CreateDomainModal({ onClose, onCreate }: Props) { rules={[{ whitespace: true }, { min: 1, max: 500 }]} hasFeedback > - + diff --git a/datahub-web-react/src/app/ingest/secret/SecretBuilderModal.tsx b/datahub-web-react/src/app/ingest/secret/SecretBuilderModal.tsx index 92bdce6d52ac66..539eef972608c6 100644 --- a/datahub-web-react/src/app/ingest/secret/SecretBuilderModal.tsx +++ b/datahub-web-react/src/app/ingest/secret/SecretBuilderModal.tsx @@ -3,6 +3,10 @@ import React, { useState } from 'react'; import { useEnterKeyListener } from '../../shared/useEnterKeyListener'; import { SecretBuilderState } from './types'; +const NAME_FIELD_NAME = 'name'; +const DESCRIPTION_FIELD_NAME = 'description'; +const VALUE_FIELD_NAME = 'value'; + type Props = { initialState?: SecretBuilderState; visible: boolean; @@ -11,38 +15,15 @@ type Props = { }; export const SecretBuilderModal = ({ initialState, visible, onSubmit, onCancel }: Props) => { - const [secretBuilderState, setSecretBuilderState] = useState(initialState || {}); - const [createButtonEnabled, setCreateButtonEnabled] = useState(true); + const [createButtonEnabled, setCreateButtonEnabled] = useState(false); const [form] = Form.useForm(); - const setName = (name: string) => { - setSecretBuilderState({ - ...secretBuilderState, - name, - }); - }; - - const setValue = (value: string) => { - setSecretBuilderState({ - ...secretBuilderState, - value, - }); - }; - - const setDescription = (description: string) => { - setSecretBuilderState({ - ...secretBuilderState, - description, - }); - }; - // Handle the Enter press useEnterKeyListener({ querySelectorToExecuteClick: '#createSecretButton', }); function resetValues() { - setSecretBuilderState({}); form.resetFields(); } @@ -60,8 +41,17 @@ export const SecretBuilderModal = ({ initialState, visible, onSubmit, onCancel } @@ -73,7 +63,7 @@ export const SecretBuilderModal = ({ initialState, visible, onSubmit, onCancel } initialValues={initialState} layout="vertical" onFieldsChange={() => - setCreateButtonEnabled(form.getFieldsError().some((field) => field.errors.length > 0)) + setCreateButtonEnabled(!form.getFieldsError().some((field) => field.errors.length > 0)) } > Name}> @@ -81,22 +71,19 @@ export const SecretBuilderModal = ({ initialState, visible, onSubmit, onCancel } Give your secret a name. This is what you'll use to reference the secret from your recipes. - setName(event.target.value)} - /> + Value}> @@ -104,7 +91,7 @@ export const SecretBuilderModal = ({ initialState, visible, onSubmit, onCancel } The value of your secret, which will be encrypted and stored securely within DataHub. - setValue(event.target.value)} - autoComplete="false" - /> + Description}> An optional description to help keep track of your secret. - - setDescription(event.target.value)} - /> + + diff --git a/datahub-web-react/src/app/ingest/source/builder/RecipeForm/DictField.tsx b/datahub-web-react/src/app/ingest/source/builder/RecipeForm/DictField.tsx index f378e9d109e456..e15a83b08e5f65 100644 --- a/datahub-web-react/src/app/ingest/source/builder/RecipeForm/DictField.tsx +++ b/datahub-web-react/src/app/ingest/source/builder/RecipeForm/DictField.tsx @@ -71,6 +71,7 @@ export default function DictField({ field, removeMargin }: Props) { {field.keyField && ( {field.options && ( - {field.options.map((option) => ( {option.label} ))} @@ -76,6 +77,7 @@ function SelectField({ field, removeMargin }: CommonFieldProps) { function DateField({ field, removeMargin }: CommonFieldProps) { return ( void; removeMargin?: boolean; + form: FormInstance; } function FormField(props: Props) { - const { field, secrets, refetchSecrets, removeMargin } = props; + const { field, secrets, refetchSecrets, removeMargin, form } = props; if (field.type === FieldType.LIST) return ; @@ -105,7 +108,13 @@ function FormField(props: Props) { if (field.type === FieldType.SECRET) return ( - + ); if (field.type === FieldType.DICT) return ; @@ -113,12 +122,14 @@ function FormField(props: Props) { const isBoolean = field.type === FieldType.BOOLEAN; let input = ; if (isBoolean) input = ; - if (field.type === FieldType.TEXTAREA) input = ; + if (field.type === FieldType.TEXTAREA) + input = ; const valuePropName = isBoolean ? 'checked' : 'value'; const getValueFromEvent = isBoolean ? undefined : (e) => (e.target.value === '' ? null : e.target.value); return ( secretA.name.localeCompare(secretB.name)) || []; + const [form] = Form.useForm(); function updateFormValues(changedValues: any, allValues: any) { let updatedValues = YAML.parse(displayRecipe); @@ -137,6 +138,7 @@ function RecipeForm(props: Props) { layout="vertical" initialValues={getInitialValues(displayRecipe, allFields)} onFinish={onClickNext} + form={form} onValuesChange={updateFormValues} > @@ -147,6 +149,7 @@ function RecipeForm(props: Props) { secrets={secrets} refetchSecrets={refetchSecrets} removeMargin={i === fields.length - 1} + form={form} /> ))} {CONNECTORS_WITH_TEST_CONNECTION.has(type as string) && ( @@ -184,6 +187,7 @@ function RecipeForm(props: Props) { secrets={secrets} refetchSecrets={refetchSecrets} removeMargin={i === filterFields.length - 1} + form={form} /> @@ -209,6 +213,7 @@ function RecipeForm(props: Props) { secrets={secrets} refetchSecrets={refetchSecrets} removeMargin={i === advancedFields.length - 1} + form={form} /> ))} diff --git a/datahub-web-react/src/app/ingest/source/builder/RecipeForm/SecretField/CreateSecretButton.tsx b/datahub-web-react/src/app/ingest/source/builder/RecipeForm/SecretField/CreateSecretButton.tsx index 01ca495900e400..31024512cbbcc1 100644 --- a/datahub-web-react/src/app/ingest/source/builder/RecipeForm/SecretField/CreateSecretButton.tsx +++ b/datahub-web-react/src/app/ingest/source/builder/RecipeForm/SecretField/CreateSecretButton.tsx @@ -24,10 +24,11 @@ const CreateButton = styled(Button)` `; interface Props { + onSubmit?: (state: SecretBuilderState) => void; refetchSecrets: () => void; } -function CreateSecretButton({ refetchSecrets }: Props) { +function CreateSecretButton({ onSubmit, refetchSecrets }: Props) { const [isCreateModalVisible, setIsCreateModalVisible] = useState(false); const [createSecretMutation] = useCreateSecretMutation(); @@ -42,12 +43,11 @@ function CreateSecretButton({ refetchSecrets }: Props) { }, }) .then(() => { + onSubmit?.(state); setIsCreateModalVisible(false); resetBuilderState(); + message.success({ content: `Created secret!` }); setTimeout(() => refetchSecrets(), 3000); - message.loading({ content: `Loading...`, duration: 3 }).then(() => { - message.success({ content: `Successfully created Secret!` }); - }); }) .catch((e) => { message.destroy(); diff --git a/datahub-web-react/src/app/ingest/source/builder/RecipeForm/SecretField/SecretField.tsx b/datahub-web-react/src/app/ingest/source/builder/RecipeForm/SecretField/SecretField.tsx index ac1fe2166f87b7..08213f891205ae 100644 --- a/datahub-web-react/src/app/ingest/source/builder/RecipeForm/SecretField/SecretField.tsx +++ b/datahub-web-react/src/app/ingest/source/builder/RecipeForm/SecretField/SecretField.tsx @@ -1,5 +1,5 @@ import React, { ReactNode } from 'react'; -import { AutoComplete, Divider, Form } from 'antd'; +import { AutoComplete, Divider, Form, FormInstance } from 'antd'; import styled from 'styled-components/macro'; import { Secret } from '../../../../../../types.generated'; import CreateSecretButton from './CreateSecretButton'; @@ -52,6 +52,7 @@ interface SecretFieldProps { secrets: Secret[]; removeMargin?: boolean; refetchSecrets: () => void; + form: FormInstance; } function SecretFieldTooltip({ tooltipLabel }: { tooltipLabel?: string | ReactNode }) { @@ -79,11 +80,16 @@ function SecretFieldTooltip({ tooltipLabel }: { tooltipLabel?: string | ReactNod ); } -function SecretField({ field, secrets, removeMargin, refetchSecrets }: SecretFieldProps) { - const options = secrets.map((secret) => ({ value: `\${${secret.name}}`, label: secret.name })); +const encodeSecret = (secretName: string) => { + return `\${${secretName}}`; +}; + +function SecretField({ field, secrets, removeMargin, form, refetchSecrets }: SecretFieldProps) { + const options = secrets.map((secret) => ({ value: encodeSecret(secret.name), label: secret.name })); return ( !!option?.value.toLowerCase().includes(input.toLowerCase())} + notFoundContent={<>No secrets found} options={options} - dropdownRender={(menu) => ( - <> - {menu} - - - - )} + dropdownRender={(menu) => { + return ( + <> + {menu} + + + form.setFields([{ name: field.name, value: encodeSecret(state.name as string) }]) + } + refetchSecrets={refetchSecrets} + /> + + ); + }} /> ); diff --git a/datahub-web-react/src/app/ingest/source/builder/RecipeForm/common.tsx b/datahub-web-react/src/app/ingest/source/builder/RecipeForm/common.tsx index 56a22f832aa627..359ca217c9f5d6 100644 --- a/datahub-web-react/src/app/ingest/source/builder/RecipeForm/common.tsx +++ b/datahub-web-react/src/app/ingest/source/builder/RecipeForm/common.tsx @@ -25,6 +25,7 @@ export interface RecipeField { type: FieldType; fieldPath: string | string[]; rules: any[] | null; + required?: boolean; // Today, Only makes a difference on Selects section?: string; options?: Option[]; buttonLabel?: string; @@ -55,13 +56,11 @@ function clearFieldAndParents(recipe: any, fieldPath: string | string[]) { } export function setFieldValueOnRecipe(recipe: any, value: any, fieldPath: string | string[]) { const updatedRecipe = { ...recipe }; - if (value !== undefined) { - if (value === null || value === '') { - clearFieldAndParents(updatedRecipe, fieldPath); - return updatedRecipe; - } - set(updatedRecipe, fieldPath, value); + if (value === null || value === '' || value === undefined) { + clearFieldAndParents(updatedRecipe, fieldPath); + return updatedRecipe; } + set(updatedRecipe, fieldPath, value); return updatedRecipe; } @@ -267,7 +266,7 @@ export const INCLUDE_TABLE_LINEAGE: RecipeField = { export const PROFILING_ENABLED: RecipeField = { name: 'profiling.enabled', label: 'Enable Profiling', - tooltip: 'Whether profiling should be done.', + tooltip: 'Whether profiling should be performed on the assets extracted from the ingestion source.', type: FieldType.BOOLEAN, fieldPath: 'source.config.profiling.enabled', rules: null, @@ -276,7 +275,7 @@ export const PROFILING_ENABLED: RecipeField = { export const STATEFUL_INGESTION_ENABLED: RecipeField = { name: 'stateful_ingestion.enabled', label: 'Enable Stateful Ingestion', - tooltip: 'Remove stale datasets from datahub once they have been deleted in the source.', + tooltip: 'Remove stale assets from DataHub once they have been deleted in the ingestion source.', type: FieldType.BOOLEAN, fieldPath: 'source.config.stateful_ingestion.enabled', rules: null, @@ -322,7 +321,7 @@ export const TABLE_LINEAGE_MODE: RecipeField = { export const INGEST_TAGS: RecipeField = { name: 'ingest_tags', label: 'Ingest Tags', - tooltip: 'Ingest Tags from source. This will override Tags entered from UI', + tooltip: 'Ingest Tags from the source. Be careful: This can override Tags entered by users of DataHub.', type: FieldType.BOOLEAN, fieldPath: 'source.config.ingest_tags', rules: null, @@ -331,7 +330,7 @@ export const INGEST_TAGS: RecipeField = { export const INGEST_OWNER: RecipeField = { name: 'ingest_owner', label: 'Ingest Owner', - tooltip: 'Ingest Owner from source. This will override Owner info entered from UI', + tooltip: 'Ingest Owner from source. Be careful: This cah override Owners added by users of DataHub.', type: FieldType.BOOLEAN, fieldPath: 'source.config.ingest_owner', rules: null, @@ -391,7 +390,7 @@ export const START_TIME: RecipeField = { name: 'start_time', label: 'Start Time', tooltip: - 'Earliest date of audit logs to process for usage, lineage etc. Default: Last full day in UTC or last time DataHub ingested usage (if stateful ingestion is enabled). Tip: Set this to an older date (e.g. 1 month ago) for your first ingestion run, and then uncheck it for future runs.', + 'Earliest date used when processing audit logs for lineage, usage, and more. Default: Last full day in UTC or last time DataHub ingested usage (if stateful ingestion is enabled). Tip: Set this to an older date (e.g. 1 month ago) to bootstrap your first ingestion run, and then reduce for subsequent runs.', placeholder: 'Select date and time', type: FieldType.DATE, fieldPath: startTimeFieldPath, diff --git a/datahub-web-react/src/app/ingest/source/builder/RecipeForm/kafka.ts b/datahub-web-react/src/app/ingest/source/builder/RecipeForm/kafka.ts index 2b137bcf4d289c..7b30fda0b70ad7 100644 --- a/datahub-web-react/src/app/ingest/source/builder/RecipeForm/kafka.ts +++ b/datahub-web-react/src/app/ingest/source/builder/RecipeForm/kafka.ts @@ -1,10 +1,15 @@ import { RecipeField, FieldType, setListValuesOnRecipe } from './common'; +// TODO: Currently platform_instance is required to be present for stateful ingestion to work +// We need to solve this prior to enabling by default here. + const saslUsernameFieldPath = ['source', 'config', 'connection', 'consumer_config', 'sasl.username']; export const KAFKA_SASL_USERNAME: RecipeField = { name: 'connection.consumer_config.sasl.username', label: 'Username', - tooltip: 'SASL username. You can get (in the Confluent UI) from your cluster -> Data Integration -> API Keys.', + placeholder: 'datahub-client', + tooltip: + 'The SASL username. Required if the Security Protocol is SASL based. In the Confluent Control Center, you can find this in Cluster > Data Integration > API Keys.', type: FieldType.TEXT, fieldPath: saslUsernameFieldPath, rules: null, @@ -14,7 +19,9 @@ const saslPasswordFieldPath = ['source', 'config', 'connection', 'consumer_confi export const KAFKA_SASL_PASSWORD: RecipeField = { name: 'connection.consumer_config.sasl.password', label: 'Password', - tooltip: 'SASL password. You can get (in the Confluent UI) from your cluster -> Data Integration -> API Keys.', + placeholder: 'datahub-client-password', + tooltip: + 'The SASL Password. Required if the Security Protocol is SASL based. In the Confluent Control Center, you can find this in Cluster > Data Integration > API Keys.', type: FieldType.SECRET, fieldPath: saslPasswordFieldPath, rules: null, @@ -22,8 +29,10 @@ export const KAFKA_SASL_PASSWORD: RecipeField = { export const KAFKA_BOOTSTRAP: RecipeField = { name: 'connection.bootstrap', - label: 'Connection Bootstrap', - tooltip: 'Bootstrap URL.', + label: 'Bootstrap Servers', + required: true, + tooltip: + 'The ‘host[:port]’ string (or list of ‘host[:port]’ strings) that we should contact to bootstrap initial cluster metadata.', placeholder: 'abc-defg.eu-west-1.aws.confluent.cloud:9092', type: FieldType.TEXT, fieldPath: 'source.config.connection.bootstrap', @@ -33,7 +42,8 @@ export const KAFKA_BOOTSTRAP: RecipeField = { export const KAFKA_SCHEMA_REGISTRY_URL: RecipeField = { name: 'connection.schema_registry_url', label: 'Schema Registry URL', - tooltip: 'URL where your Confluent Cloud Schema Registry is hosted.', + tooltip: + 'The URL where the schema Schema Registry is hosted. If provided, DataHub will attempt to extract Avro and Protobuf topic schemas from the registry.', placeholder: 'https://abc-defgh.us-east-2.aws.confluent.cloud', type: FieldType.TEXT, fieldPath: 'source.config.connection.schema_registry_url', @@ -51,7 +61,7 @@ export const KAFKA_SCHEMA_REGISTRY_USER_CREDENTIAL: RecipeField = { name: 'schema_registry_config.basic.auth.user.info', label: 'Schema Registry Credentials', tooltip: - 'API credentials for Confluent schema registry which you get (in Confluent UI) from Schema Registry -> API credentials.', + 'API credentials for the Schema Registry. In Confluent Control Center, you can find these under Schema Registry > API Credentials.', // eslint-disable-next-line no-template-curly-in-string placeholder: '${REGISTRY_API_KEY_ID}:${REGISTRY_API_KEY_SECRET}', type: FieldType.TEXT, @@ -63,13 +73,16 @@ const securityProtocolFieldPath = ['source', 'config', 'connection', 'consumer_c export const KAFKA_SECURITY_PROTOCOL: RecipeField = { name: 'security.protocol', label: 'Security Protocol', - tooltip: 'Security Protocol', + tooltip: 'The Security Protocol used for authentication.', type: FieldType.SELECT, + required: true, fieldPath: securityProtocolFieldPath, rules: null, options: [ + { label: 'PLAINTEXT', value: 'PLAINTEXT' }, { label: 'SASL_SSL', value: 'SASL_SSL' }, { label: 'SASL_PLAINTEXT', value: 'SASL_PLAINTEXT' }, + { label: 'SSL', value: 'SSL' }, ], }; @@ -77,9 +90,11 @@ const saslMechanismFieldPath = ['source', 'config', 'connection', 'consumer_conf export const KAFKA_SASL_MECHANISM: RecipeField = { name: 'sasl.mechanism', label: 'SASL Mechanism', - tooltip: 'SASL Mechanism', + tooltip: + 'The SASL mechanism used for authentication. This field is required if the selected Security Protocol is SASL based.', type: FieldType.SELECT, fieldPath: saslMechanismFieldPath, + placeholder: 'None', rules: null, options: [ { label: 'PLAIN', value: 'PLAIN' }, @@ -92,12 +107,12 @@ const topicAllowFieldPath = 'source.config.topic_patterns.allow'; export const TOPIC_ALLOW: RecipeField = { name: 'topic_patterns.allow', label: 'Allow Patterns', - tooltip: 'Use regex here.', + tooltip: 'Provide an optional Regular Expresssion (REGEX) to include specific Kafka Topic names in ingestion.', type: FieldType.LIST, buttonLabel: 'Add pattern', fieldPath: topicAllowFieldPath, rules: null, - section: 'Topics', + section: 'Filter by Topic', setValueOnRecipeOverride: (recipe: any, values: string[]) => setListValuesOnRecipe(recipe, values, topicAllowFieldPath), }; @@ -106,12 +121,12 @@ const topicDenyFieldPath = 'source.config.topic_patterns.deny'; export const TOPIC_DENY: RecipeField = { name: 'topic_patterns.deny', label: 'Deny Patterns', - tooltip: 'Use regex here.', + tooltip: 'Provide an optional Regular Expresssion (REGEX) to exclude specific Kafka Topic names from ingestion.', type: FieldType.LIST, buttonLabel: 'Add pattern', fieldPath: topicDenyFieldPath, rules: null, - section: 'Topics', + section: 'Filter by Topic', setValueOnRecipeOverride: (recipe: any, values: string[]) => setListValuesOnRecipe(recipe, values, topicDenyFieldPath), }; diff --git a/datahub-web-react/src/app/ingest/source/builder/sources.json b/datahub-web-react/src/app/ingest/source/builder/sources.json index 6f70b40901d86d..10e3176b41a7fe 100644 --- a/datahub-web-react/src/app/ingest/source/builder/sources.json +++ b/datahub-web-react/src/app/ingest/source/builder/sources.json @@ -25,7 +25,7 @@ "name": "kafka", "displayName": "Kafka", "docsUrl": "https://datahubproject.io/docs/generated/ingestion/sources/kafka/", - "recipe": "source:\n type: kafka\n config:\n connection:\n consumer_config:\n security.protocol: \"SASL_SSL\"\n sasl.mechanism: \"PLAIN\"\n stateful_ingestion:\n enabled: true'" + "recipe": "source:\n type: kafka\n config:\n connection:\n consumer_config:\n security.protocol: \"PLAINTEXT\"\n stateful_ingestion:\n enabled: false" }, { "urn": "urn:li:dataPlatform:looker", diff --git a/datahub-web-react/src/app/ingest/source/conf/kafka/kafka.ts b/datahub-web-react/src/app/ingest/source/conf/kafka/kafka.ts index 70a9f8bdae46e2..6926d54a03f58f 100644 --- a/datahub-web-react/src/app/ingest/source/conf/kafka/kafka.ts +++ b/datahub-web-react/src/app/ingest/source/conf/kafka/kafka.ts @@ -11,6 +11,7 @@ source: sasl.mechanism: "PLAIN" stateful_ingestion: enabled: true + `; export const KAFKA = 'kafka'; From 308b4eae87b20fa1a557e78ac0cc0329f63e6e4e Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Thu, 1 Dec 2022 22:33:10 -0500 Subject: [PATCH 08/11] fix(ingest): clarify tableau auth error messages (#6600) --- metadata-ingestion/src/datahub/ingestion/source/tableau.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau.py index 726878e2627f87..9434381ff6e3c3 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau.py @@ -186,10 +186,12 @@ def make_tableau_client(self) -> Server: return server except ServerResponseError as e: raise ValueError( - f"Unable to login with credentials provided: {str(e)}" + f"Unable to login (invalid credentials or missing permissions): {str(e)}" ) from e except Exception as e: - raise ValueError(f"Unable to login: {str(e)}") from e + raise ValueError( + f"Unable to login (check your Tableau connection and credentials): {str(e)}" + ) from e class TableauConfig( From 44c2749510fe6f864b5747cb5db95456aa524569 Mon Sep 17 00:00:00 2001 From: Nick Wu Date: Thu, 1 Dec 2022 20:29:59 -0800 Subject: [PATCH 09/11] docs(graphql): fix deleteTest "Create"->"Delete" (#6574) Co-authored-by: Aseem Bansal --- datahub-graphql-core/src/main/resources/tests.graphql | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datahub-graphql-core/src/main/resources/tests.graphql b/datahub-graphql-core/src/main/resources/tests.graphql index f40cd02f0172bd..9dce48ac60d834 100644 --- a/datahub-graphql-core/src/main/resources/tests.graphql +++ b/datahub-graphql-core/src/main/resources/tests.graphql @@ -225,7 +225,7 @@ extend type Mutation { updateTest(urn: String!, input: UpdateTestInput!): String """ - Create an existing test - note that this will NOT delete dangling pointers until the next execution of the test. + Delete an existing test - note that this will NOT delete dangling pointers until the next execution of the test. """ deleteTest(urn: String!): Boolean -} \ No newline at end of file +} From 6e31754594c3295c8e715c35b630580adc690e3a Mon Sep 17 00:00:00 2001 From: Tim Costa Date: Thu, 1 Dec 2022 22:33:54 -0600 Subject: [PATCH 10/11] fix: remove set -x from start script (#6589) Co-authored-by: Tim Costa --- docker/datahub-gms/start.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker/datahub-gms/start.sh b/docker/datahub-gms/start.sh index 44dfdff02ac1fd..38225f67845ffd 100755 --- a/docker/datahub-gms/start.sh +++ b/docker/datahub-gms/start.sh @@ -1,5 +1,5 @@ #!/bin/bash -set -x + # Add default URI (http) scheme if needed if ! echo $NEO4J_HOST | grep -q "://" ; then NEO4J_HOST="http://$NEO4J_HOST" From 4f7b5ac5e690534721e54260cd9d17d6d39de7b8 Mon Sep 17 00:00:00 2001 From: Pedro Silva Date: Fri, 2 Dec 2022 09:13:47 +0000 Subject: [PATCH 11/11] feat(sql): Add SQL index on createdon field (#6522) --- docker/mariadb/init.sql | 2 ++ docker/mysql-setup/init.sql | 3 ++- docker/mysql/init.sql | 3 ++- docker/postgres-setup/init.sql | 2 ++ docker/postgres/init.sql | 2 ++ 5 files changed, 10 insertions(+), 2 deletions(-) diff --git a/docker/mariadb/init.sql b/docker/mariadb/init.sql index 084fdc93a3717a..c4132575cf442c 100644 --- a/docker/mariadb/init.sql +++ b/docker/mariadb/init.sql @@ -11,6 +11,8 @@ create table metadata_aspect_v2 ( constraint pk_metadata_aspect_v2 primary key (urn,aspect,version) ); +create index timeIndex ON metadata_aspect_v2 (createdon); + insert into metadata_aspect_v2 (urn, aspect, version, metadata, createdon, createdby) values( 'urn:li:corpuser:datahub', 'corpUserInfo', diff --git a/docker/mysql-setup/init.sql b/docker/mysql-setup/init.sql index 6bd7133a359a89..78098af4648bce 100644 --- a/docker/mysql-setup/init.sql +++ b/docker/mysql-setup/init.sql @@ -12,7 +12,8 @@ create table if not exists metadata_aspect_v2 ( createdon datetime(6) not null, createdby varchar(255) not null, createdfor varchar(255), - constraint pk_metadata_aspect_v2 primary key (urn,aspect,version) + constraint pk_metadata_aspect_v2 primary key (urn,aspect,version), + INDEX timeIndex (createdon) ); -- create default records for datahub user if not exists diff --git a/docker/mysql/init.sql b/docker/mysql/init.sql index fa9d856f499e4e..97ae3ea1467445 100644 --- a/docker/mysql/init.sql +++ b/docker/mysql/init.sql @@ -8,7 +8,8 @@ CREATE TABLE metadata_aspect_v2 ( createdon datetime(6) NOT NULL, createdby VARCHAR(255) NOT NULL, createdfor VARCHAR(255), - CONSTRAINT pk_metadata_aspect_v2 PRIMARY KEY (urn,aspect,version) + constraint pk_metadata_aspect_v2 primary key (urn,aspect,version), + INDEX timeIndex (createdon) ) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin; INSERT INTO metadata_aspect_v2 (urn, aspect, version, metadata, createdon, createdby) VALUES( diff --git a/docker/postgres-setup/init.sql b/docker/postgres-setup/init.sql index e7c515e7385acc..12fff7aec7fe6f 100644 --- a/docker/postgres-setup/init.sql +++ b/docker/postgres-setup/init.sql @@ -11,6 +11,8 @@ CREATE TABLE IF NOT EXISTS metadata_aspect_v2 ( CONSTRAINT pk_metadata_aspect_v2 PRIMARY KEY (urn, aspect, version) ); +create index timeIndex ON metadata_aspect_v2 (createdon); + -- create default records for datahub user if not exists CREATE TEMP TABLE temp_metadata_aspect_v2 AS TABLE metadata_aspect_v2; INSERT INTO temp_metadata_aspect_v2 (urn, aspect, version, metadata, createdon, createdby) VALUES( diff --git a/docker/postgres/init.sql b/docker/postgres/init.sql index 72298ed4b6726d..4da8adaf8a6da0 100644 --- a/docker/postgres/init.sql +++ b/docker/postgres/init.sql @@ -11,6 +11,8 @@ create table metadata_aspect_v2 ( constraint pk_metadata_aspect_v2 primary key (urn,aspect,version) ); +create index timeIndex ON metadata_aspect_v2 (createdon); + insert into metadata_aspect_v2 (urn, aspect, version, metadata, createdon, createdby) values( 'urn:li:corpuser:datahub', 'corpUserInfo',