From 794401bfde88e2fa4294056a85328f5b7d2afaae Mon Sep 17 00:00:00 2001 From: "jaegwon.seo" Date: Sun, 27 Aug 2023 22:46:20 +0900 Subject: [PATCH 1/5] add druid ingestion hook ad provier.yaml and add test --- airflow/providers/apache/druid/hooks/druid.py | 24 ++++++++++++++++--- airflow/providers/apache/druid/provider.yaml | 4 +++- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/airflow/providers/apache/druid/hooks/druid.py b/airflow/providers/apache/druid/hooks/druid.py index 7708684e60cf5..199f3a2c51216 100644 --- a/airflow/providers/apache/druid/hooks/druid.py +++ b/airflow/providers/apache/druid/hooks/druid.py @@ -55,9 +55,14 @@ class DruidHook(BaseHook): :param max_ingestion_time: The maximum ingestion time before assuming the job failed """ + conn_name_attr = "druid_ingest_conn_id" + default_conn_name = "druid_ingest_default" + conn_type = "druid_ingest" + hook_name = "Druid Ingest" + def __init__( self, - druid_ingest_conn_id: str = "druid_ingest_default", + druid_ingest_conn_id: str = default_conn_name, timeout: int = 1, max_ingestion_time: int | None = None, ) -> None: @@ -149,6 +154,19 @@ def submit_indexing_job( self.log.info("Successful index") + def test_connection(self) -> tuple[bool, str]: + try: + conn = self.get_connection(self.druid_ingest_conn_id) + host = conn.host + port = conn.port + # ref : https://druid.apache.org/docs/latest/operations/api-reference/#tasks + response = requests.get(f"http://{host}:{port}/druid/indexer/v1/tasks") + if response.status_code == 200: + return True, "Connection successfully tested" + else: + return False, response.reason + except Exception as e: + return False, str(e) class DruidDbApiHook(DbApiHook): """ @@ -160,8 +178,8 @@ class DruidDbApiHook(DbApiHook): conn_name_attr = "druid_broker_conn_id" default_conn_name = "druid_broker_default" - conn_type = "druid" - hook_name = "Druid" + conn_type = "druid_broker" + hook_name = "Druid Broker" supports_autocommit = False def get_conn(self) -> connect: diff --git a/airflow/providers/apache/druid/provider.yaml b/airflow/providers/apache/druid/provider.yaml index 90e54e50a4462..3efb336f62259 100644 --- a/airflow/providers/apache/druid/provider.yaml +++ b/airflow/providers/apache/druid/provider.yaml @@ -73,7 +73,9 @@ hooks: connection-types: - hook-class-name: airflow.providers.apache.druid.hooks.druid.DruidDbApiHook - connection-type: druid + connection-type: druid_broker + - hook-class-name: airflow.providers.apache.druid.hooks.druid.DruidHook + connection-type: druid_ingest transfers: - source-integration-name: Apache Hive From 34094f238381fe3b3cb0d653699969b0b0b7aa38 Mon Sep 17 00:00:00 2001 From: "jaegwon.seo" Date: Sun, 27 Aug 2023 22:50:03 +0900 Subject: [PATCH 2/5] modify connection init config --- airflow/utils/db.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/utils/db.py b/airflow/utils/db.py index db4631f14867a..72b57caccdbf6 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -224,7 +224,7 @@ def create_default_connections(session: Session = NEW_SESSION): merge_conn( Connection( conn_id="druid_broker_default", - conn_type="druid", + conn_type="druid_broker", host="druid-broker", port=8082, extra='{"endpoint": "druid/v2/sql"}', @@ -234,7 +234,7 @@ def create_default_connections(session: Session = NEW_SESSION): merge_conn( Connection( conn_id="druid_ingest_default", - conn_type="druid", + conn_type="druid_ingest", host="druid-overlord", port=8081, extra='{"endpoint": "druid/indexer/v1/task"}', From cb55e9b9091b394b2b7dcdaa35d5fe096ffebe76 Mon Sep 17 00:00:00 2001 From: "jaegwon.seo" Date: Mon, 28 Aug 2023 23:13:17 +0900 Subject: [PATCH 3/5] apply lint --- airflow/providers/apache/druid/hooks/druid.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow/providers/apache/druid/hooks/druid.py b/airflow/providers/apache/druid/hooks/druid.py index 199f3a2c51216..0fb80c3fed0a4 100644 --- a/airflow/providers/apache/druid/hooks/druid.py +++ b/airflow/providers/apache/druid/hooks/druid.py @@ -168,6 +168,7 @@ def test_connection(self) -> tuple[bool, str]: except Exception as e: return False, str(e) + class DruidDbApiHook(DbApiHook): """ Interact with Druid broker. From b58298997b1b3e3ae08c8bde4cb5ae15d5d553b0 Mon Sep 17 00:00:00 2001 From: "jaegwon.seo" Date: Mon, 28 Aug 2023 23:17:56 +0900 Subject: [PATCH 4/5] add change log --- airflow/providers/apache/druid/CHANGELOG.rst | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/airflow/providers/apache/druid/CHANGELOG.rst b/airflow/providers/apache/druid/CHANGELOG.rst index 6a1601e487829..9eeeb3e0200cc 100644 --- a/airflow/providers/apache/druid/CHANGELOG.rst +++ b/airflow/providers/apache/druid/CHANGELOG.rst @@ -27,6 +27,19 @@ Changelog --------- +3.6.0 +..... + +Features +~~~~~~~~ + +* ``Add druid ingestion connection test And expose druid ingest hook (#33795)`` + +.. note:: + The connection type of Druid has been separated into two(druid_broker, druid_ingest) + +--------- + 3.5.0 ..... From c51126c1501da960ac07111cf1845f975494b472 Mon Sep 17 00:00:00 2001 From: "jaegwon.seo" Date: Fri, 1 Sep 2023 23:39:28 +0900 Subject: [PATCH 5/5] modify druid provider version 3.5.0 -> 4.0.0 --- airflow/providers/apache/druid/CHANGELOG.rst | 2 +- airflow/providers/apache/druid/provider.yaml | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/airflow/providers/apache/druid/CHANGELOG.rst b/airflow/providers/apache/druid/CHANGELOG.rst index 9eeeb3e0200cc..9edcb860f39b2 100644 --- a/airflow/providers/apache/druid/CHANGELOG.rst +++ b/airflow/providers/apache/druid/CHANGELOG.rst @@ -27,7 +27,7 @@ Changelog --------- -3.6.0 +4.0.0 ..... Features diff --git a/airflow/providers/apache/druid/provider.yaml b/airflow/providers/apache/druid/provider.yaml index 3efb336f62259..563fb0015577b 100644 --- a/airflow/providers/apache/druid/provider.yaml +++ b/airflow/providers/apache/druid/provider.yaml @@ -23,6 +23,7 @@ description: | suspended: false versions: + - 4.0.0 - 3.5.0 - 3.4.2 - 3.4.1