Dataplex operators #20377
Conversation
if task_status == TaskState.DELETING:
    raise AirflowException(f"Task is going to be deleted {self.dataplex_task_id}")

self.log.info(f"Current status of the Dataplex task {self.dataplex_task_id} => {task_status}")
NIT: We prefer to use "%" formatting for logs, otherwise the string interpolation is executed independently of the logging level set (yep, I know INFO is the default, but it can be changed to ERROR and then it is unnecessary to interpolate it).
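A minimal sketch of the lazy %-style call the reviewer suggests, using the names from the diff above:

self.log.info(
    # Interpolation happens only if the record is actually emitted
    # at the configured logging level.
    "Current status of the Dataplex task %s => %s", self.dataplex_task_id, task_status
)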
Ok, noted
# [END howto_dataplex_configuration]

with models.DAG(
    "example_dataplex", start_date=datetime.datetime(2021, 1, 1), schedule_interval="@once"
Would you mind adding catchup=False? This has been added to all example DAGs to ward off any unexpected DagRuns for users if they copy this DAG for their use and modify start_date or schedule_interval without knowing about the catchup functionality.
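For illustration, the DAG definition from the diff with the suggested flag added (a sketch, not the final code):

with models.DAG(
    "example_dataplex",
    start_date=datetime.datetime(2021, 1, 1),
    schedule_interval="@once",
    catchup=False,  # prevents unexpected backfill runs if start_date is moved back
) as dag:
    ...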
class DataplexHook(GoogleBaseHook):
    """Hook for Google Dataplex."""
Could you add parameter/type info in the docstring for this hook? It would be great to see these in the Airflow API documentation, which is generated from the docstrings.
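A hypothetical sketch of the kind of docstring being requested; the parameters below mirror the GoogleBaseHook constructor and are assumptions about the final hook signature:

class DataplexHook(GoogleBaseHook):
    """
    Hook for Google Dataplex.

    :param api_version: The version of the API that will be requested, e.g. 'v1'.
    :type api_version: str
    :param gcp_conn_id: The connection ID to use when fetching connection info.
    :type gcp_conn_id: str
    :param delegate_to: The account to impersonate using domain-wide delegation
        of authority, if any.
    :type delegate_to: Optional[str]
    :param impersonation_chain: Optional service account to impersonate using
        short-term credentials.
    :type impersonation_chain: Optional[Union[str, Sequence[str]]]
    """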
    delegate_to=delegate_to,
    impersonation_chain=impersonation_chain,
)
self.api_key = API_KEY
Is this something that users should be able to configure in an Airflow Connection and pass to the hook, or is the idea that this key must be configured within the environment itself? If the latter, should there be validation here to check that the api_key was provided, rather than having the "INVALID API KEY" default value? Or is this default value checked somewhere downstream?
In the Airflow Connection I set a standard Keyfile Path pointing to /files/airflow-breeze-config/keys/<KEY_FILE_NAME>.json. On the other hand, we have to set an API Key in Credentials on the GCP side to connect with the discovery API - otherwise we cannot perform operations.
I have not seen an option to set this value inside the Airflow Connection, so I used an environment variable. I am open to any suggestions about where it should be stored and to any other improvements.
self.impersonation_chain = impersonation_chain
self.asynchronous = asynchronous

def execute(self, context: dict) -> dict:
Now that 2.2.3 has been released, typing for context can be:

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from airflow.utils.context import Context

...

def execute(self, context: "Context") -> dict:
    ...

This can be applied to all of the operators too.
TIL 🚀 Thanks @josh-fell
Yeah, I applied it globally during the Xmas break. Just wondering - maybe we should add a pre-commit check that flags places where the "old ways" are still used. WDYT @turbaszek @josh-fell?
+1 Definitely worth automating.
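A hypothetical sketch of what such a check could look like as a small Python script invoked from pre-commit (the pattern and the file-argument convention are assumptions, not an actual Airflow pre-commit hook):

#!/usr/bin/env python3
# Fail if any operator still types the execute/poke context as a plain dict
# instead of the Airflow 2.2.3+ "Context" forward reference.
import re
import sys
from pathlib import Path

OLD_CONTEXT_TYPING = re.compile(r"def (execute|poke)\(self, context: dict")

failed = False
for path in sys.argv[1:]:  # pre-commit passes the changed files as arguments
    for lineno, line in enumerate(Path(path).read_text().splitlines(), start=1):
        if OLD_CONTEXT_TYPING.search(line):
            print(f"{path}:{lineno}: use `context: 'Context'` instead of `context: dict`")
            failed = True

sys.exit(1 if failed else 0)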
self.delegate_to = delegate_to
self.impersonation_chain = impersonation_chain

def poke(self, context: dict) -> bool:
Same comment here about context typing.
from airflow.exceptions import AirflowException
from airflow.providers.google.common.hooks.base_google import GoogleBaseHook

API_KEY = os.environ.get("GCP_API_KEY", "INVALID API KEY")
Is that necessary?
Yes, an API Key is needed to perform operations on Dataplex.
The authentication should be provided via a dedicated connection. As far as I remember, GoogleBaseHook already provides all authentication methods supported by Google. If this is something Dataplex-specific, we should introduce a new connection type. That way users will have full control over the credentials. See for example Google Ads:
airflow/airflow/providers/google/ads/hooks/ads.py, lines 43 to 68 at 7947b72:

This hook requires two connections:
- gcp_conn_id - provides service account details (like any other GCP connection)
- google_ads_conn_id - which contains information from Google Ads config.yaml file
  in the ``extras``. Example of the ``extras``:

.. code-block:: json

    {
        "google_ads_client": {
            "developer_token": "{{ INSERT_TOKEN }}",
            "path_to_private_key_file": null,
            "delegated_account": "{{ INSERT_DELEGATED_ACCOUNT }}"
        }
    }

The ``path_to_private_key_file`` is resolved by the hook using credentials from gcp_conn_id.
https://developers.google.com/google-ads/api/docs/client-libs/python/oauth-service

.. seealso::
    For more information on how the Google Ads authentication flow works, take a look at:
    https://developers.google.com/google-ads/api/docs/client-libs/python/oauth-service

.. seealso::
    For more information on the Google Ads API, take a look at the API docs:
    https://developers.google.com/google-ads/api/docs/start
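For illustration, a hypothetical sketch of the suggested pattern applied to Dataplex - reading a Dataplex-specific key from the connection ``extras`` instead of an environment variable. The ``dataplex_api_key`` extra field and the raising behaviour are assumptions, not part of the PR:

from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook


def _get_dataplex_api_key(conn_id: str) -> str:
    # Hypothetical: the API key lives in the connection's extras,
    # so users control it like any other credential.
    conn = BaseHook.get_connection(conn_id)
    api_key = conn.extra_dejson.get("dataplex_api_key")
    if not api_key:
        raise AirflowException(
            f"Connection {conn_id!r} is missing the 'dataplex_api_key' extra"
        )
    return api_key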
Discussed within the team: for now I am going to remove API_KEY - it was needed for development purposes. Once the Dataplex API is publicly available, it will not be needed any more. I will commit the changes and then draft this PR.
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.
Force-pushed 6bf36e6 to 904f091
Force-pushed 904f091 to e7b7abf
Force-pushed e7b7abf to de5162b
Force-pushed 4eda728 to d93fbb5
@mik-laj @vikramkoka guys, could you do a review? It would be great. Thank you
Force-pushed d93fbb5 to 14c857d
Looks good to me 👌
The PR most likely needs to run the full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.
It needs at least a rebase and a check of whether the errors were accidental.
Force-pushed 14c857d to 96a62fd
Add support for Google Dataplex. Includes operators, sensors, hooks, example dags, tests and docs.
Authored-by: Wojciech Januszek [email protected]