
[AIRFLOW-5477] Rewrite Google PubSub Hook to Google Cloud Python #6096

Merged
21 changes: 21 additions & 0 deletions UPDATING.md
@@ -40,6 +40,27 @@ assists users migrating to a new version.

## Airflow Master

### Changes to Google PubSub Operators, Hook and Sensor
In the `PubSubPublishOperator` and the `PubSubHook.publish` method, the `data` field of a message must now be a UTF-8 encoded bytestring rather than a base64-encoded string.
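As an illustration (not part of the PR itself), a minimal migration sketch for existing payloads — the `base64` round-trip reflects the old encoding, and `b"Tool"` matches the updated example DAG:

```python
import base64

# Old style: the "data" field carried a base64-encoded string.
old_message = {"data": base64.b64encode(b"Tool").decode("utf-8")}

# New style: the "data" field carries a raw UTF-8 bytestring.
new_message = {"data": b"Tool", "attributes": {"name": "wrench"}}

# Migrating an old payload: decode it back to bytes.
migrated_data = base64.b64decode(old_message["data"])
```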

Due to the normalization of parameters within GCP operators and hooks, parameters such as `project` or `topic_project`
are deprecated and replaced by the `project_id` parameter.
In the `PubSubHook.create_subscription` method, the `subscription_project` parameter is replaced by `subscription_project_id`.
Template fields have been updated accordingly; the old ones may no longer work.

Arguments to the `PubSubHook` methods must now be passed as keyword arguments.
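As a hypothetical sketch (not the actual Airflow source), the keyword-only calling style now enforced by the hook looks like this, using the renamed `project_id` and `subscription_project_id` parameters:

```python
# Hypothetical signature mirroring the renamed, keyword-only parameters;
# the real PubSubHook.create_subscription accepts more options.
def create_subscription(*, project_id, topic, subscription=None,
                        subscription_project_id=None):
    # The subscription may live in a different project than the topic.
    target_project = subscription_project_id or project_id
    name = subscription or topic + "-sub"
    return "projects/{}/subscriptions/{}".format(target_project, name)

# Positional arguments now raise TypeError:
#   create_subscription("my-project", "my-topic")
path = create_subscription(project_id="my-project", topic="my-topic")
```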

These changes are not backward compatible.

Affected components:
* airflow.gcp.hooks.pubsub.PubSubHook
* airflow.gcp.operators.pubsub.PubSubTopicCreateOperator
* airflow.gcp.operators.pubsub.PubSubSubscriptionCreateOperator
* airflow.gcp.operators.pubsub.PubSubTopicDeleteOperator
* airflow.gcp.operators.pubsub.PubSubSubscriptionDeleteOperator
* airflow.gcp.operators.pubsub.PubSubPublishOperator
* airflow.gcp.sensors.pubsub.PubSubPullSensor

### Changes to `aws_default` Connection's default region

The region of Airflow's default connection to AWS (`aws_default`) was previously
16 changes: 7 additions & 9 deletions airflow/gcp/example_dags/example_pubsub.py
@@ -25,7 +25,6 @@
import airflow
from airflow import models


from airflow.gcp.operators.pubsub import (
PubSubTopicCreateOperator,
PubSubSubscriptionDeleteOperator,
@@ -38,7 +37,7 @@

GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "your-project-id")
TOPIC = "PubSubTestTopic"
-MESSAGE = {"attributes": {"name": "wrench", "mass": "1.3kg", "count": "3"}}
+MESSAGE = {"data": b"Tool", "attributes": {"name": "wrench", "mass": "1.3kg", "count": "3"}}

default_args = {"start_date": airflow.utils.dates.days_ago(1)}

@@ -54,18 +53,18 @@
schedule_interval=None, # Override to match your needs
) as example_dag:
create_topic = PubSubTopicCreateOperator(
-task_id="create_topic", topic=TOPIC, project=GCP_PROJECT_ID
+task_id="create_topic", topic=TOPIC, project_id=GCP_PROJECT_ID
)

subscribe_task = PubSubSubscriptionCreateOperator(
-task_id="subscribe_task", topic_project=GCP_PROJECT_ID, topic=TOPIC
+task_id="subscribe_task", project_id=GCP_PROJECT_ID, topic=TOPIC
)
subscription = "{{ task_instance.xcom_pull('subscribe_task') }}"

pull_messages = PubSubPullSensor(
task_id="pull_messages",
ack_messages=True,
-project=GCP_PROJECT_ID,
+project_id=GCP_PROJECT_ID,
subscription=subscription,
)

@@ -75,19 +74,18 @@

publish_task = PubSubPublishOperator(
task_id="publish_task",
-project=GCP_PROJECT_ID,
+project_id=GCP_PROJECT_ID,
topic=TOPIC,
messages=[MESSAGE, MESSAGE, MESSAGE],
)

unsubscribe_task = PubSubSubscriptionDeleteOperator(
task_id="unsubscribe_task",
-project=GCP_PROJECT_ID,
+project_id=GCP_PROJECT_ID,
subscription="{{ task_instance.xcom_pull('subscribe_task') }}",
)

delete_topic = PubSubTopicDeleteOperator(
-task_id="delete_topic", topic=TOPIC, project=GCP_PROJECT_ID
+task_id="delete_topic", topic=TOPIC, project_id=GCP_PROJECT_ID
)

create_topic >> subscribe_task >> publish_task