Add druid ingestion connection test #33796

Closed
13 changes: 13 additions & 0 deletions airflow/providers/apache/druid/CHANGELOG.rst
@@ -27,6 +27,19 @@
Changelog
---------

4.0.0
.....

Features
~~~~~~~~

* ``Add druid ingestion connection test And expose druid ingest hook (#33795)``

.. note::
   The Druid connection type has been split into two types (``druid_broker`` and ``druid_ingest``).
Comment on lines +38 to +39
eladkal (Contributor):

Please elaborate on this one. We should explain to users how to mitigate the issue (leave instructions on how to migrate to this new version).

jaegwonseo (Contributor Author), Sep 2, 2023:

@eladkal
I'm not a native English speaker, so I'm not sure the wording below is accurate. Please review it:

    The Druid connection type has been separated into two (Druid Broker, Druid Ingest). Please perform one of the two methods listed below:
    1. Update the connection type to Druid Broker or Druid Ingest in the UI (Admin/Connections).
    2. Run the command ``airflow db migrate``.

However, in the second case ``airflow db migrate`` does not support updating connection types, so we would need to implement that. My suggestion is to document only the first option, and to add a Druid provider version check to the ``db migrate`` process. How about doing that in a separate issue?

Taragolis (Contributor):

Nope, we don't have connection migrations, and with the current release model (which doesn't mean it is bad) I don't think it is even possible to implement them. All migrations happen only on upgrade of Airflow Core, not Providers.

Just some examples of past breaking changes and their descriptions:

1. Removed the AWS S3 Connection (spoiler alert: it does impact some users)
   https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/changelog.html#id66
2. Removed deprecated fields in the Slack provider (still waiting for users to start complaining)
   https://airflow.apache.org/docs/apache-airflow-providers-slack/stable/changelog.html#breaking-changes
3. Removed a deprecated parameter from the Google provider
   https://airflow.apache.org/docs/apache-airflow-providers-google/stable/changelog.html#breaking-changes

The main point of these breaking-change notes: when a user opens an issue or discussion on GitHub, or writes in Slack, it is much easier to send them a link to the breaking changes than to try to figure out how to solve their issue.

jaegwonseo (Contributor Author):

@eladkal @Taragolis
Taragolis, thanks for adding the example links.
In this PR the only point of concern is the changed connection type, so I think this note is enough:

    In this version of the provider, the Apache Druid connection (``conn_type="druid"``) has been separated into two (Druid Broker, Druid Ingest).
    Update connections with ``conn_type="druid"`` to ``druid_broker`` or ``druid_ingest``.

eladkal (Contributor), Oct 22, 2023:

The goal of the note is to explain what actions need to be taken in order to migrate.
The note you listed says the connection is split in two, so:

1. How do I choose which one? (Or/and what is the motivation for the split?)
2. If I want my code to work exactly as before, what steps do I need to take?

These are the two questions the note must answer.


---------

3.5.0
.....

25 changes: 22 additions & 3 deletions airflow/providers/apache/druid/hooks/druid.py
@@ -55,9 +55,14 @@ class DruidHook(BaseHook):
:param max_ingestion_time: The maximum ingestion time before assuming the job failed
"""

conn_name_attr = "druid_ingest_conn_id"
default_conn_name = "druid_ingest_default"
conn_type = "druid_ingest"
hook_name = "Druid Ingest"

def __init__(
self,
druid_ingest_conn_id: str = "druid_ingest_default",
druid_ingest_conn_id: str = default_conn_name,
timeout: int = 1,
max_ingestion_time: int | None = None,
) -> None:
@@ -149,6 +154,20 @@ def submit_indexing_job(

self.log.info("Successful index")

def test_connection(self) -> tuple[bool, str]:
try:
conn = self.get_connection(self.druid_ingest_conn_id)
host = conn.host
port = conn.port
# ref : https://druid.apache.org/docs/latest/operations/api-reference/#tasks
response = requests.get(f"http://{host}:{port}/druid/indexer/v1/tasks")
hussein-awala (Member):

One of the main reasons for the connection and the test_connection method is storing the credentials securely and testing them via this method, so IMHO this method should test whether the credentials are valid or not. You can try something like:

Suggested change:
-        response = requests.get(f"http://{host}:{port}/druid/indexer/v1/tasks")
+        response = requests.get(
+            f"http://{host}:{port}/druid/indexer/v1/tasks",
+            auth=self.get_auth(),
+        )

jaegwonseo (Contributor Author):

@hussein-awala
The Druid Broker hook extends DbApiHook, but its test_connection is not implemented to use authentication information:
https://github.com/apache/airflow/blob/main/airflow/providers/common/sql/hooks/sql.py#L550

So I think adding authentication information to the Druid hooks (broker, ingest) should be handled in another issue.

if response.status_code == 200:
return True, "Connection successfully tested"
else:
return False, response.reason
except Exception as e:
return False, str(e)


class DruidDbApiHook(DbApiHook):
"""
@@ -160,8 +179,8 @@ class DruidDbApiHook(DbApiHook):

conn_name_attr = "druid_broker_conn_id"
default_conn_name = "druid_broker_default"
conn_type = "druid"
hook_name = "Druid"
conn_type = "druid_broker"
hook_name = "Druid Broker"
supports_autocommit = False

def get_conn(self) -> connect:
5 changes: 4 additions & 1 deletion airflow/providers/apache/druid/provider.yaml
@@ -23,6 +23,7 @@ description: |

suspended: false
versions:
- 4.0.0
- 3.5.0
- 3.4.2
- 3.4.1
@@ -73,7 +74,9 @@ hooks:

connection-types:
- hook-class-name: airflow.providers.apache.druid.hooks.druid.DruidDbApiHook
connection-type: druid
connection-type: druid_broker
- hook-class-name: airflow.providers.apache.druid.hooks.druid.DruidHook
connection-type: druid_ingest

transfers:
- source-integration-name: Apache Hive
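The two ``connection-types`` entries above let Airflow map a stored connection's ``conn_type`` to the right hook class. A minimal sketch of that kind of registry lookup (illustrative only; this is not Airflow's actual provider-manager code):

```python
# Registry mirroring the provider.yaml entries: conn_type -> hook class path.
CONNECTION_TYPES = {
    "druid_broker": "airflow.providers.apache.druid.hooks.druid.DruidDbApiHook",
    "druid_ingest": "airflow.providers.apache.druid.hooks.druid.DruidHook",
}


def hook_class_for(conn_type: str) -> str:
    """Resolve a connection type to its hook class path, or fail loudly."""
    try:
        return CONNECTION_TYPES[conn_type]
    except KeyError:
        raise ValueError(f"Unknown connection type: {conn_type!r}") from None
```

After this change, a connection still stored with the old ``druid`` type no longer resolves, which is exactly why the changelog note asks users to update their connections.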
4 changes: 2 additions & 2 deletions airflow/utils/db.py
@@ -224,7 +224,7 @@ def create_default_connections(session: Session = NEW_SESSION):
merge_conn(
Connection(
conn_id="druid_broker_default",
conn_type="druid",
conn_type="druid_broker",
host="druid-broker",
port=8082,
extra='{"endpoint": "druid/v2/sql"}',
@@ -234,7 +234,7 @@
merge_conn(
Connection(
conn_id="druid_ingest_default",
conn_type="druid",
conn_type="druid_ingest",
host="druid-overlord",
port=8081,
extra='{"endpoint": "druid/indexer/v1/task"}',
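``merge_conn`` seeds a default connection only when no row with the same ``conn_id`` exists yet, so user-edited connections are never overwritten on upgrade. A dict-backed sketch of that insert-if-absent behaviour (the real ``merge_conn`` works against the SQLAlchemy session; names here are illustrative):

```python
from dataclasses import dataclass


@dataclass
class Conn:
    conn_id: str
    conn_type: str
    host: str = ""
    port: int = 0
    extra: str = ""


def merge_conn(conn: Conn, store: dict[str, Conn]) -> None:
    """Insert the default connection only if its conn_id is not already present."""
    store.setdefault(conn.conn_id, conn)
```

Because of this, an existing ``druid_broker_default`` created under the old ``druid`` type keeps its old ``conn_type`` after upgrading; only fresh databases get the new defaults, which is why the changelog asks users to update existing connections by hand.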