[AIRFLOW-5477] Rewrite Google PubSub Hook to Google Cloud Python #6096

Merged

TobKed
Contributor

@TobKed TobKed commented Sep 13, 2019

Make sure you have checked all steps below.

Jira

  • My PR addresses the following Airflow Jira issues and references them in the PR title. For example, "[AIRFLOW-XXX] My Airflow PR"
    • https://issues.apache.org/jira/browse/AIRFLOW-5477
    • In case you are fixing a typo in the documentation you can prepend your commit with [AIRFLOW-XXX], code changes always need a Jira issue.
    • In case you are proposing a fundamental code change, you need to create an Airflow Improvement Proposal (AIP).
    • In case you are adding a dependency, check if the license complies with the ASF 3rd Party License Policy.

Description

  • Here are some details about my PR, including screenshots of any UI changes:

Tests

  • My PR adds the following unit tests OR does not need testing for this extremely good reason:

Commits

  • My commits all reference Jira issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters (not including Jira issue reference)
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

Documentation

  • In case of new functionality, my PR adds documentation that describes how to use it.
    • All the public functions and the classes in the PR contain docstrings that explain what they do
    • If you implement backwards incompatible changes, please leave a note in the Updating.md so we can assign it to an appropriate release

@mik-laj mik-laj added the provider:google Google (including GCP) related issues label Sep 13, 2019
@TobKed TobKed force-pushed the pubsub-hook-with-google-api-python branch from 123eb84 to e019de8 Compare September 13, 2019 17:55
@mik-laj mik-laj force-pushed the pubsub-hook-with-google-api-python branch from e019de8 to 61a8117 Compare September 13, 2019 19:18
@codecov-io

codecov-io commented Sep 13, 2019

Codecov Report

Merging #6096 into master will decrease coverage by 0.55%.
The diff coverage is 92.59%.

@@            Coverage Diff             @@
##           master    #6096      +/-   ##
==========================================
- Coverage   80.01%   79.45%   -0.56%     
==========================================
  Files         610      610              
  Lines       35176    35286     +110     
==========================================
- Hits        28147    28038     -109     
- Misses       7029     7248     +219

Impacted Files                                Coverage Δ
airflow/gcp/example_dags/example_pubsub.py    0% <0%> (ø) ⬆️
airflow/gcp/sensors/pubsub.py                 94.28% <83.33%> (-5.72%) ⬇️
airflow/gcp/operators/pubsub.py               90.69% <83.78%> (-9.31%) ⬇️
airflow/gcp/hooks/pubsub.py                   99.33% <99.22%> (+5.58%) ⬆️
airflow/operators/postgres_operator.py        0% <0%> (-100%) ⬇️
airflow/operators/mysql_operator.py           0% <0%> (-100%) ⬇️
airflow/operators/mysql_to_hive.py            0% <0%> (-100%) ⬇️
airflow/operators/generic_transfer.py         0% <0%> (-100%) ⬇️
airflow/executors/celery_executor.py          40.74% <0%> (-35.56%) ⬇️
airflow/utils/log/wasb_task_handler.py        32.87% <0%> (-9.59%) ⬇️
... and 14 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 781d001...9db5c94.

    )
except NotFound:
    message = 'Topic does not exist: {}'.format(topic_path)
    self.log.warning(message)
Member

@mik-laj mik-laj Sep 14, 2019

We should avoid formatting the string before passing it to the logger.
References:
https://github.com/apache/airflow/pull/4804/files
#5681
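
The point about logger formatting can be illustrated with a minimal sketch. The helper name `warn_missing_topic` is hypothetical and not part of the PR; only the message text and `topic_path` come from the snippet under review. Passing the template and its arguments separately lets the `logging` module defer interpolation until a handler actually emits the record.

```python
import logging


def warn_missing_topic(log: logging.Logger, topic_path: str) -> None:
    """Log a missing topic without pre-formatting the message.

    Hypothetical helper for illustration only.
    """
    # Discouraged form (formats eagerly, even if WARNING is disabled):
    #   log.warning('Topic does not exist: {}'.format(topic_path))
    # Preferred form: the logger interpolates %s only when the record is emitted.
    log.warning('Topic does not exist: %s', topic_path)
```

The record keeps the raw template in `record.msg` and the arguments in `record.args`, which also helps log aggregators group identical messages.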

Contributor Author

Fixed.

        data=message.get("data", b''),
        **message.get('attributes', {})
    )
except GoogleAPICallError as e:
    raise PubSubException(
Member

This part of the code is not covered by unit tests. Can you add the missing tests?

Contributor Author

I've added a test.
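
A test for this kind of error-wrapping branch might look roughly like the sketch below. It is not the test added in the PR: `GoogleAPICallError` and `PubSubException` are local stand-ins so the example runs without Google libraries, and `FakeHook`, its error message, and the test function name are hypothetical.

```python
from unittest import mock


class GoogleAPICallError(Exception):
    """Stand-in for google.api_core.exceptions.GoogleAPICallError."""


class PubSubException(Exception):
    """Stand-in for the hook's exception class."""


class FakeHook:
    """Hypothetical, reduced version of a publish method with error wrapping."""

    def __init__(self, client):
        self._client = client

    def publish(self, topic_path, messages):
        try:
            for message in messages:
                self._client.publish(
                    topic_path,
                    data=message.get("data", b''),
                    **message.get('attributes', {})
                )
        except GoogleAPICallError as e:
            # Hypothetical message text; the real wording is not shown in the thread.
            raise PubSubException('Error publishing to topic {}'.format(topic_path), e)


def check_publish_wraps_api_error():
    """Make the mocked client fail and return the wrapped exception."""
    client = mock.Mock()
    client.publish.side_effect = GoogleAPICallError("boom")
    hook = FakeHook(client)
    try:
        hook.publish("projects/p/topics/t", [{"data": b"x"}])
    except PubSubException as err:
        return err
    raise AssertionError("PubSubException was not raised")
```

Setting `side_effect` on the mocked client is what drives execution into the `except` branch that the coverage report flagged.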

    self.log.warning(message)
    if fail_if_exists:
        raise PubSubException(message)
except GoogleAPICallError as e:
Member

This part of the code is not covered by unit tests. Can you add the missing tests?

Contributor Author

I've added a test.

 except HttpError as e:
     raise PubSubException(
         'Error pulling messages from subscription {}'.format(
-            full_subscription), e)
+            subscription_path), e)
+except GoogleAPICallError as e:
Member

This part of the code is not covered by unit tests. Can you add the missing tests?

Contributor Author

I've added a test.

@TobKed
Contributor Author

TobKed commented Sep 16, 2019

@mik-laj I've applied the changes. Please verify that they are correct.

@mik-laj
Member

mik-laj commented Sep 17, 2019

@TobKed Today we have had various conversations about this integration. Is this PR done?

@TobKed TobKed force-pushed the pubsub-hook-with-google-api-python branch from bd5d94a to abe0842 Compare September 17, 2019 11:29
@TobKed TobKed requested a review from mik-laj September 17, 2019 11:29
@TobKed TobKed force-pushed the pubsub-hook-with-google-api-python branch from abe0842 to 5afb4d0 Compare September 17, 2019 12:00
@TobKed
Contributor Author

TobKed commented Sep 17, 2019

@mik-laj Yes, hopefully the PR is done. I've applied all necessary patches.

"""
-template_fields = ['project', 'subscription']
+template_fields = ['project_id', 'subscription']
Member

Suggested change
-template_fields = ['project_id', 'subscription']
+template_fields = ['project', 'project_id', 'subscription']

Contributor Author

I had to revert these changes (only "project_id" shall remain) because of this error:
File "/workspace/airflow/models/baseoperator.py", line 662, in _do_render_template_fields
    content = getattr(parent, attr_name)
AttributeError: 'PubSubPublishOperator' object has no attribute 'project'

Member

Should be OK. Maybe (nice to have) we could add a note to the deprecation message that templates won't work?
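
The `AttributeError` quoted in this thread follows from how template rendering reads each listed field with `getattr`. The sketch below is a simplified stand-in, not Airflow's `BaseOperator`; the class names and the `render_template_fields` method are hypothetical and only mimic the `getattr(parent, attr_name)` call visible in the traceback.

```python
class MiniOperator:
    """Hypothetical stand-in for an operator base class."""

    template_fields = []

    def render_template_fields(self):
        # Every name in template_fields must exist as an instance attribute,
        # otherwise getattr raises AttributeError.
        return {name: getattr(self, name) for name in self.template_fields}


class BrokenPublishOperator(MiniOperator):
    # 'project' is listed but never set as an attribute in __init__.
    template_fields = ['project', 'project_id', 'topic']

    def __init__(self, project_id, topic):
        self.project_id = project_id
        self.topic = topic


class FixedPublishOperator(MiniOperator):
    # Only attributes that actually exist are listed.
    template_fields = ['project_id', 'topic']

    def __init__(self, project_id, topic):
        self.project_id = project_id
        self.topic = topic
```

Calling `render_template_fields()` on `BrokenPublishOperator` fails on `'project'`, while the fixed variant returns both fields.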

"""
-template_fields = ['project', 'topic', 'messages']
+template_fields = ['project_id', 'topic', 'messages']
Member

Suggested change
-template_fields = ['project_id', 'topic', 'messages']
+template_fields = ['project', 'project_id', 'topic', 'messages']

Contributor Author

I had to revert these changes (only "project_id" shall remain) because of this error:
File "/workspace/airflow/models/baseoperator.py", line 662, in _do_render_template_fields
    content = getattr(parent, attr_name)
AttributeError: 'PubSubPublishOperator' object has no attribute 'project'

@TobKed TobKed requested a review from mik-laj September 18, 2019 13:12
@mik-laj
Member

mik-laj commented Sep 18, 2019

Can you rebase?

@TobKed TobKed force-pushed the pubsub-hook-with-google-api-python branch from 2f0dd74 to c6241b7 Compare September 19, 2019 06:17
@TobKed
Contributor Author

TobKed commented Sep 19, 2019

Rebased on the latest master.

@TobKed TobKed force-pushed the pubsub-hook-with-google-api-python branch from c6241b7 to 41f9b95 Compare September 19, 2019 10:34
@mik-laj
Member

mik-laj commented Sep 20, 2019

Everything looks good, but can you rebase?

@TobKed TobKed force-pushed the pubsub-hook-with-google-api-python branch from 41f9b95 to a5410d7 Compare September 20, 2019 14:08
@TobKed
Contributor Author

TobKed commented Sep 20, 2019

I've rebased, but because previous commits impacted PubSub significantly I had to make some changes (remove project from template fields, update PubSubPullSensor, and adjust unit and system tests). Please take a look at the last fixup.

@TobKed
Contributor Author

TobKed commented Sep 20, 2019

I've updated TestPubSubPullSensor (hook mocking).

)

self._messages = [MessageToDict(m) for m in pulled_messages]

if self._messages and self.ack_messages:
    if self.ack_messages:
Member

Suggested change
if self.ack_messages:

This is a double assertion.

Contributor Author

Good point! Thanks, I've fixed it.

@TobKed TobKed force-pushed the pubsub-hook-with-google-api-python branch from bed237f to 64f7bba Compare September 23, 2019 07:20
@TobKed
Contributor Author

TobKed commented Sep 23, 2019

Updated per the comments, slightly improved the test, and rebased on the latest master.

@TobKed
Contributor Author

TobKed commented Sep 26, 2019

cc @mik-laj @potiuk

@TobKed TobKed requested a review from mik-laj September 26, 2019 13:11
Member

@potiuk potiuk left a comment

Just one thing: it seems that the messages should not be base64 encoded when using the client (see the example here: https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/pubsub/cloud-client/publisher.py). This means that all tests should be fixed and the comments updated. Also, UPDATING.md should mention that the input data needs to change (or maybe we should even provide backwards compatibility somehow: detect when someone passes a string rather than bytes and decode/encode appropriately).
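
The difference between the two input formats can be sketched with the standard library alone. The variable names and the `to_client_data` helper are hypothetical; the sketch assumes, as the old docstring in this PR states, that the previous hook took base64 text, and that the client library takes raw bytes.

```python
import base64

payload = "Hello, Pub/Sub!"  # hypothetical message body

# Old style, per the previous docstring: the data field is base64 text.
legacy_data = base64.b64encode(payload.encode("utf-8")).decode("ascii")

# New style for the client library: the data field is a plain bytestring.
client_data = payload.encode("utf-8")


def to_client_data(legacy: str) -> bytes:
    """Convert a base64 text payload into raw bytes (illustrative helper)."""
    return base64.b64decode(legacy)
```

A DAG that previously built `legacy_data` would now pass `client_data` directly, or convert existing values once with something like `to_client_data`.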

publisher = self.get_conn()
topic_path = PublisherClient.topic_path(project_id, topic) # pylint: disable=no-member

# TODO validation of messages
Member

Should we leave that TODO here? I think we might already check it here, simply by checking whether we can base64-decode the data. What kind of validation do we think is missing here? I think we already handle the case where "data" is missing. I believe the publish method must already check whether the data is correctly encoded. If so, we should remove this TODO. Otherwise we should check the type of the "data" field ourselves in the loop. I believe

isinstance(data, bytes)

should work fine for Python 3+.

Contributor Author

I think this comment should not be present here.
I have separate PR for message validation waiting in queue:
https://github.com/PolideaInternal/airflow/pull/304/commits

Contributor Author

In the last fixup I've included validation and deprecation warning.

"""
Publishes messages to a Pub/Sub topic.

:param project: the GCP project ID in which to publish
:type project: str
:param topic: the Pub/Sub topic to which to publish; do not
include the ``projects/{project}/topics/`` prefix.
:type topic: str
:param messages: messages to publish; if the data field in a
message is set, it should already be base64 encoded.
Member

I think this comment is wrong now. According to https://cloud.google.com/pubsub/docs/publisher (and the code below), the data parameter should be a bytestring (utf-8 encoded) rather than a base64-encoded string.

Contributor Author

You are right. Great point! I've updated docstrings.

:param topic: the Pub/Sub topic name to create; do not
include the ``projects/{project}/topics/`` prefix.
:type topic: str
:param project_id: Optional, the GCP project ID in which to create the topic
Member

Cool with all those params below. Looks like fully-featured implementation now 👍


# Add airflow-version label to the topic
labels = labels or {}
labels['airflow-version'] = 'v' + version.replace('.', '-').replace('+', '-')
Member

Nice!
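
The label line in the snippet above rewrites the version string because GCP label values may contain only lowercase letters, digits, underscores, and dashes, so the dots and plus sign of a version cannot be used as-is. A small sketch follows; the function name `add_version_label` and the sample version are hypothetical, and unlike the snippet it copies the dictionary instead of mutating the caller's.

```python
from typing import Dict, Optional


def add_version_label(labels: Optional[Dict[str, str]], version: str) -> Dict[str, str]:
    """Return a copy of labels with an 'airflow-version' entry added."""
    result = dict(labels or {})
    # Dots and plus signs are not valid in label values, so swap them for dashes.
    result['airflow-version'] = 'v' + version.replace('.', '-').replace('+', '-')
    return result


# Hypothetical version string, for illustration only.
print(add_version_label({'team': 'data'}, '2.0.0.dev0+incubating'))
# {'team': 'data', 'airflow-version': 'v2-0-0-dev0-incubating'}
```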

ack_deadline_secs: int = 10,
fail_if_exists: bool = False,
push_config: Optional[Union[Dict, PushConfig]] = None,
Member

Same here 👍

*args,
**kwargs) -> None:

Member

Nice that we keep backwards compatibility.
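
One common way to keep a renamed argument working is sketched below. This is an assumption about the general pattern, not a copy of what the PR does: the helper `resolve_project_id` and its warning text are hypothetical, and only the `project` to `project_id` rename comes from the diff.

```python
import warnings
from typing import Optional


def resolve_project_id(project_id: Optional[str] = None,
                       project: Optional[str] = None) -> Optional[str]:
    """Accept the deprecated 'project' argument while preferring 'project_id'."""
    if project is not None:
        warnings.warn(
            "The 'project' parameter is deprecated; use 'project_id' instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        # The new name wins if both are given.
        if project_id is None:
            project_id = project
    return project_id
```

Existing DAGs that still pass `project` keep running and see a warning, while new code uses `project_id` silently.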

"""
-template_fields = ['project', 'subscription']
+template_fields = ['project_id', 'subscription']
Member

Should be OK. Maybe (nice to have) we could add a note to the deprecation message that templates won't work?

@TobKed TobKed force-pushed the pubsub-hook-with-google-api-python branch from 64f7bba to b612cb8 Compare September 30, 2019 12:11
@TobKed TobKed force-pushed the pubsub-hook-with-google-api-python branch from b612cb8 to 04b9a03 Compare September 30, 2019 12:13
@TobKed
Contributor Author

TobKed commented Sep 30, 2019

@potiuk I've added message validation. Now, when a user passes message data as a string that can be base64 decoded, an additional deprecation warning is displayed, and afterwards a PubSub exception is raised. I've updated the tests, added tests for message validation, and added additional info to UPDATING.md.
As for a deprecation warning for template_fields, I think it is possible, but in my opinion it would introduce an unnecessary amount of complication. I've included information about template_fields in UPDATING.md as well.
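
The validation behaviour described here could be sketched as follows. This is an illustration under assumptions, not the code merged in the PR: `PubSubException` is a local stand-in, and the function name `check_message_data` and both message texts are hypothetical.

```python
import base64
import binascii
import warnings


class PubSubException(Exception):
    """Stand-in for the hook's exception class."""


def check_message_data(data) -> None:
    """Accept bytes; warn on base64-looking strings; reject everything else."""
    if isinstance(data, bytes):
        return
    if isinstance(data, str):
        try:
            # validate=True rejects characters outside the base64 alphabet.
            base64.b64decode(data, validate=True)
        except (binascii.Error, ValueError):
            pass  # Not base64: no extra hint, just the exception below.
        else:
            warnings.warn(
                "Message data looks base64 encoded; pass raw bytes instead.",
                DeprecationWarning,
                stacklevel=2,
            )
    raise PubSubException("Message data must be bytes, got {}".format(type(data).__name__))
```

The warning explains the likely cause to users migrating old DAGs, while the exception still stops a wrongly typed payload from being published.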

@TobKed TobKed force-pushed the pubsub-hook-with-google-api-python branch from 04b9a03 to 9db5c94 Compare September 30, 2019 12:27
Labels
provider:google Google (including GCP) related issues
5 participants