[AIRFLOW-5477] Rewrite Google PubSub Hook to Google Cloud Python #6096
123eb84
to
e019de8
Compare
e019de8
to
61a8117
Compare
Codecov Report
@@ Coverage Diff @@
## master #6096 +/- ##
==========================================
- Coverage 80.01% 79.45% -0.56%
==========================================
Files 610 610
Lines 35176 35286 +110
==========================================
- Hits 28147 28038 -109
- Misses 7029 7248 +219
airflow/gcp/hooks/pubsub.py
Outdated
)
except NotFound:
message = 'Topic does not exist: {}'.format(topic_path)
self.log.warning(message)
We should avoid formatting string before passing to logger.
Reference
https://github.com/apache/airflow/pull/4804/files
#5681
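A minimal sketch of what the comment above asks for, using only the standard `logging` module. The logger name and the `warn_topic_missing` helper are hypothetical and are not part of the hook; the point is that the template and its argument are passed separately, so interpolation happens only if the record is actually emitted.

```python
import logging

# Hypothetical logger name, used only for this illustration.
log = logging.getLogger("pubsub_hook_example")


def warn_topic_missing(topic_path: str) -> None:
    """Log a warning without pre-formatting the message string."""
    # The template and the argument are passed separately; the logging
    # module interpolates them lazily when the record is handled.
    log.warning("Topic does not exist: %s", topic_path)
```

If the same text is also needed for an exception message, it still has to be formatted explicitly at that point.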
Fixed.
airflow/gcp/hooks/pubsub.py
Outdated
data=message.get("data", b''),
**message.get('attributes', {})
)
except GoogleAPICallError as e:
raise PubSubException(
This part of the code is not covered by unit tests. Can you add the missing tests?
I've added a test.
self.log.warning(message)
if fail_if_exists:
raise PubSubException(message)
except GoogleAPICallError as e:
This part of the code is not covered by unit tests. Can you add the missing tests?
I've added a test.
airflow/gcp/hooks/pubsub.py
Outdated
except HttpError as e:
raise PubSubException(
'Error pulling messages from subscription {}'.format(
full_subscription), e)
subscription_path), e)
except GoogleAPICallError as e:
This part of the code is not covered by unit tests. Can you add the missing tests?
I've added a test.
@mik-laj I've applied the changes. Please verify that they are correct.
@TobKed We had various conversations about this integration today. Is this PR done?
bd5d94a
to
abe0842
Compare
abe0842
to
5afb4d0
Compare
@mik-laj Yes, hopefully the PR is done. I've applied all necessary patches.
""" | ||
template_fields = ['project', 'subscription']
template_fields = ['project_id', 'subscription']
template_fields = ['project_id', 'subscription']
template_fields = ['project', 'project_id', 'subscription']
I had to revert these changes (only "project" shall remain) because of the following error:
File "/workspace/airflow/models/baseoperator.py", line 662, in _do_render_template_fields content = getattr(parent, attr_name) AttributeError: 'PubSubPublishOperator' object has no attribute 'project'
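The traceback above suggests that every name listed in `template_fields` is read from the operator instance with `getattr`, so a listed name without a matching attribute fails at render time. A minimal sketch of that failure mode, using stand-in classes rather than Airflow's real `BaseOperator` (`FakeOperator`, `BrokenOperator`, and `missing_template_fields` are all hypothetical names):

```python
class FakeOperator:
    """Stand-in for an operator; this is not Airflow's BaseOperator."""

    template_fields = ["project_id", "topic"]

    def __init__(self, project_id, topic):
        self.project_id = project_id
        self.topic = topic


class BrokenOperator(FakeOperator):
    """Lists 'project' as a template field but never sets self.project."""

    template_fields = ["project", "project_id", "topic"]


def missing_template_fields(op):
    """Return template field names that have no matching instance attribute."""
    return [name for name in op.template_fields if not hasattr(op, name)]
```

A check like this could run in a unit test to catch a mismatch between `template_fields` and the constructor before a DAG hits it.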
Should be OK. Maybe (nice to have) we could add a note to the deprecation message that templates won't work?
""" | ||
template_fields = ['project', 'topic', 'messages']
template_fields = ['project_id', 'topic', 'messages']
template_fields = ['project_id', 'topic', 'messages']
template_fields = ['project', 'project_id', 'topic', 'messages']
I had to revert these changes (only "project" shall remain) because of the following error:
File "/workspace/airflow/models/baseoperator.py", line 662, in _do_render_template_fields content = getattr(parent, attr_name) AttributeError: 'PubSubPublishOperator' object has no attribute 'project'
Can you rebase?
2f0dd74
to
c6241b7
Compare
Rebased on the last |
c6241b7
to
41f9b95
Compare
Everything looks good, but can you rebase?
41f9b95
to
a5410d7
Compare
I've rebased, but because previous commits impacted PubSub significantly, I had to make some changes (remove
I've updated |
airflow/gcp/sensors/pubsub.py
Outdated
)

self._messages = [MessageToDict(m) for m in pulled_messages]

if self._messages and self.ack_messages:
if self.ack_messages:
if self.ack_messages:
This is a double assertion.
Good point! Thanks, I've fixed it.
bed237f
to
64f7bba
Compare
Updated per the review comments, slightly improved the test, and rebased on the latest master.
Just one thing: it seems that messages should not be base64 encoded when using the client (see the example here: https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/pubsub/cloud-client/publisher.py). This means all tests should be fixed and the comments updated. UPDATING.md should also mention that the input data has to change (or maybe we should even provide backwards compatibility by detecting when someone passes a string rather than bytes and decoding/encoding appropriately).
airflow/gcp/hooks/pubsub.py
Outdated
publisher = self.get_conn()
topic_path = PublisherClient.topic_path(project_id, topic) # pylint: disable=no-member

# TODO validation of messages
Should we leave that TODO here? What kind of validation do we think is missing? We could check it here simply by testing whether the data can be base64 decoded. I think we already handle the case where "data" is missing, and I believe the publish method already checks whether the data is correctly encoded. If so, we should remove this TODO. Otherwise we should check the type of the "data" field ourselves in the loop. I believe
isinstance(data, bytes)
for python 3+ should work fine.
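A minimal sketch of the kind of in-loop check the comment describes. The `validate_messages` name and the specific rules are assumptions made for illustration; they are not the validation that was eventually added to the hook.

```python
def validate_messages(messages):
    """Raise ValueError if any message is malformed (illustrative rules only)."""
    for message in messages:
        data = message.get("data")
        # With the client library the payload is expected to be bytes, not str.
        if data is not None and not isinstance(data, bytes):
            raise ValueError(
                "'data' must be bytes, got {}".format(type(data).__name__)
            )
        # Assumed rule: a message should carry a payload, attributes, or both.
        if not data and not message.get("attributes"):
            raise ValueError("message needs non-empty 'data' or 'attributes'")
```

Running the check before any publish call means a bad message fails the whole batch up front instead of partway through.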
I think this comment should not be present here.
I have separate PR for message validation waiting in queue:
https://github.com/PolideaInternal/airflow/pull/304/commits
In the last fixup I've included validation and a deprecation warning.
airflow/gcp/hooks/pubsub.py
Outdated
""" | ||
Publishes messages to a Pub/Sub topic.

:param project: the GCP project ID in which to publish
:type project: str
:param topic: the Pub/Sub topic to which to publish; do not
include the ``projects/{project}/topics/`` prefix.
:type topic: str
:param messages: messages to publish; if the data field in a
message is set, it should already be base64 encoded.
I think this comment is wrong now. According to https://cloud.google.com/pubsub/docs/publisher (and the code below), the data parameter should be a bytestring (utf-8 encoded) rather than a base64 encoded string.
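To make the distinction concrete, here is a small standard-library sketch of the two encodings. The helper names are hypothetical; the first produces the UTF-8 bytes the comment says the client expects, the second produces what the old docstring asked for.

```python
import base64


def to_utf8_payload(text: str) -> bytes:
    """Encode text as UTF-8 bytes, the form described for the client library."""
    return text.encode("utf-8")


def to_base64_payload(text: str) -> bytes:
    """Encode text as base64, the form the old docstring described."""
    return base64.b64encode(text.encode("utf-8"))
```

The two results differ for any non-empty input, so a caller still passing base64 would publish the encoded text itself as the message body.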
You are right. Great point! I've updated docstrings.
:param topic: the Pub/Sub topic name to create; do not
include the ``projects/{project}/topics/`` prefix.
:type topic: str
:param project_id: Optional, the GCP project ID in which to create the topic
Cool with all those params below. Looks like fully-featured implementation now 👍
# Add airflow-version label to the topic
labels = labels or {}
labels['airflow-version'] = 'v' + version.replace('.', '-').replace('+', '-')
Nice!
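For reference, the label expression from the hunk above can be tried in isolation. The function name and the sample version strings below are made up for illustration; the replacement of `.` and `+` is presumably there because those characters are not allowed in GCP label values.

```python
def airflow_version_label(version: str) -> str:
    """Mirror the diff's expression: prefix 'v', replace '.' and '+' with '-'."""
    return "v" + version.replace(".", "-").replace("+", "-")


# Example with a made-up version string.
print(airflow_version_label("1.10.5+example"))  # prints: v1-10-5-example
```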
ack_deadline_secs: int = 10,
fail_if_exists: bool = False,
push_config: Optional[Union[Dict, PushConfig]] = None,
Same here 👍
*args,
**kwargs) -> None:
Nice that we keep backwards compatibility.
""" | ||
template_fields = ['project', 'subscription']
template_fields = ['project_id', 'subscription']
Should be OK. Maybe (nice to have) we could add a note to the deprecation message that templates won't work?
64f7bba
to
b612cb8
Compare
Co-Authored-By: Kamil Breguła <[email protected]>
b612cb8
to
04b9a03
Compare
@potiuk I've added message validation. Now, when a user passes message data as a string that can be base64 decoded, an additional deprecation warning will be displayed, afterwards
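A rough sketch of how such a check might look, using only the standard library. The function names and the warning text are hypothetical, and what the hook does after the warning is not shown here because the comment above is cut off.

```python
import base64
import binascii
import warnings


def looks_like_base64(value: str) -> bool:
    """Heuristic: True if the string decodes as strict base64."""
    try:
        base64.b64decode(value, validate=True)
        return True
    except (binascii.Error, ValueError):
        return False


def warn_if_base64_string(data):
    """Emit a DeprecationWarning for str data that decodes as base64."""
    if isinstance(data, str) and looks_like_base64(data):
        # Hypothetical wording; the real message in the PR may differ.
        warnings.warn(
            "Passing base64 encoded string data is deprecated; pass bytes.",
            DeprecationWarning,
            stacklevel=2,
        )
    return data
```

The heuristic has false positives: a short plain string such as `"test"` is also valid base64, so a check like this can only warn, not decide for the caller.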
04b9a03
to
9db5c94
Compare
Make sure you have checked all steps below.
Jira
Description
Tests
Commits
Documentation