[AIRFLOW-5477] Rewrite Google PubSub Hook to Google Cloud Python (#6096)
Tobiasz Kędzierski authored and kaxil committed Oct 2, 2019
1 parent 02a9f62 commit 265a1d5
Showing 9 changed files with 1,230 additions and 468 deletions.
21 changes: 21 additions & 0 deletions UPDATING.md
@@ -40,6 +40,27 @@ assists users migrating to a new version.

## Airflow Master

### Changes to Google PubSub Operators, Hook and Sensor
In the `PubSubPublishOperator` and the `PubSubHook.publish` method, the `data` field of a message should now be a bytestring (UTF-8 encoded) rather than a base64-encoded string.
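
As a rough illustration of the `data` change, the sketch below converts a message whose `data` field is still a base64-encoded string into the bytestring form. The helper name `to_bytestring_message` is hypothetical and not part of Airflow; it only assumes that old messages carry `data` as a base64 `str`.

```python
import base64

# Assumed old-style message: `data` is a base64-encoded string.
old_message = {
    "data": base64.b64encode(b"Tool").decode("ascii"),
    "attributes": {"name": "wrench"},
}


def to_bytestring_message(message):
    """Hypothetical helper: return a copy of `message` with `data` as raw bytes."""
    migrated = dict(message)
    # Only decode when `data` is still a (base64) string; bytes pass through unchanged.
    if isinstance(migrated.get("data"), str):
        migrated["data"] = base64.b64decode(migrated["data"])
    return migrated


new_message = to_bytestring_message(old_message)
```

Messages that already hold bytes are returned unchanged, so the helper can be applied to a mixed list during a migration.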

As part of the normalization of parameters across GCP operators and hooks, parameters such as `project` and `topic_project`
are deprecated and replaced by the `project_id` parameter.
In the `PubSubHook.create_subscription` method, the `subscription_project` parameter is replaced by `subscription_project_id`.
Template fields are updated accordingly, and the old ones may no longer work.
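
The renames listed above can be applied mechanically to existing keyword arguments. The following is only a sketch: `RENAMED_PARAMETERS` and `rename_deprecated_kwargs` are hypothetical names, not Airflow APIs, and the mapping covers only the three parameters named in this note.

```python
# Old parameter name -> new parameter name, as described in this note.
RENAMED_PARAMETERS = {
    "project": "project_id",
    "topic_project": "project_id",
    "subscription_project": "subscription_project_id",
}


def rename_deprecated_kwargs(kwargs):
    """Hypothetical helper: return a copy of `kwargs` using the new parameter names."""
    migrated = {}
    for key, value in kwargs.items():
        new_key = RENAMED_PARAMETERS.get(key, key)
        # Refuse to guess when an old and a new name would collide.
        if new_key in migrated:
            raise ValueError("duplicate value for parameter %r" % new_key)
        migrated[new_key] = value
    return migrated


old_kwargs = {"task_id": "create_topic", "topic": "PubSubTestTopic", "project": "your-project-id"}
new_kwargs = rename_deprecated_kwargs(old_kwargs)
```

Templated strings that reference the old field names are not covered by this sketch and would need to be updated by hand.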

The `PubSub` hook now requires its arguments to be passed as keyword-only arguments.
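
To show what a keyword-only call looks like in practice, here is a minimal sketch using a stand-in function. `publish_stub` is hypothetical and its signature is an assumption; it is not the real `PubSubHook` method, whose exact signature is not shown in this note.

```python
def publish_stub(*, topic, messages, project_id=None):
    """Hypothetical stand-in for a hook method that accepts keyword-only arguments."""
    return {"topic": topic, "project_id": project_id, "count": len(messages)}


# Keyword arguments are accepted.
result = publish_stub(
    topic="PubSubTestTopic",
    messages=[{"data": b"Tool"}],
    project_id="your-project-id",
)

# Positional arguments are rejected by Python with a TypeError.
try:
    publish_stub("PubSubTestTopic", [{"data": b"Tool"}])
    positional_rejected = False
except TypeError:
    positional_rejected = True
```

Existing calls that pass the topic or project positionally would need to be rewritten in the keyword form.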

These changes are not backward compatible.

Affected components:
* airflow.gcp.hooks.pubsub.PubSubHook
* airflow.gcp.operators.pubsub.PubSubTopicCreateOperator
* airflow.gcp.operators.pubsub.PubSubSubscriptionCreateOperator
* airflow.gcp.operators.pubsub.PubSubTopicDeleteOperator
* airflow.gcp.operators.pubsub.PubSubSubscriptionDeleteOperator
* airflow.gcp.operators.pubsub.PubSubPublishOperator
* airflow.gcp.sensors.pubsub.PubSubPullSensor

### Changes to `aws_default` Connection's default region

The region of Airflow's default connection to AWS (`aws_default`) was previously
16 changes: 7 additions & 9 deletions airflow/gcp/example_dags/example_pubsub.py
@@ -25,7 +25,6 @@
import airflow
from airflow import models


from airflow.gcp.operators.pubsub import (
PubSubTopicCreateOperator,
PubSubSubscriptionDeleteOperator,
@@ -38,7 +37,7 @@

GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "your-project-id")
TOPIC = "PubSubTestTopic"
-MESSAGE = {"attributes": {"name": "wrench", "mass": "1.3kg", "count": "3"}}
+MESSAGE = {"data": b"Tool", "attributes": {"name": "wrench", "mass": "1.3kg", "count": "3"}}

default_args = {"start_date": airflow.utils.dates.days_ago(1)}

@@ -54,18 +53,18 @@
schedule_interval=None, # Override to match your needs
) as example_dag:
create_topic = PubSubTopicCreateOperator(
-task_id="create_topic", topic=TOPIC, project=GCP_PROJECT_ID
+task_id="create_topic", topic=TOPIC, project_id=GCP_PROJECT_ID
)

subscribe_task = PubSubSubscriptionCreateOperator(
-task_id="subscribe_task", topic_project=GCP_PROJECT_ID, topic=TOPIC
+task_id="subscribe_task", project_id=GCP_PROJECT_ID, topic=TOPIC
)
subscription = "{{ task_instance.xcom_pull('subscribe_task') }}"

pull_messages = PubSubPullSensor(
task_id="pull_messages",
ack_messages=True,
-project=GCP_PROJECT_ID,
+project_id=GCP_PROJECT_ID,
subscription=subscription,
)

@@ -75,19 +74,18 @@

publish_task = PubSubPublishOperator(
task_id="publish_task",
-project=GCP_PROJECT_ID,
+project_id=GCP_PROJECT_ID,
topic=TOPIC,
messages=[MESSAGE, MESSAGE, MESSAGE],
)

unsubscribe_task = PubSubSubscriptionDeleteOperator(
task_id="unsubscribe_task",
-project=GCP_PROJECT_ID,
+project_id=GCP_PROJECT_ID,
subscription="{{ task_instance.xcom_pull('subscribe_task') }}",
)

delete_topic = PubSubTopicDeleteOperator(
-task_id="delete_topic", topic=TOPIC, project=GCP_PROJECT_ID
+task_id="delete_topic", topic=TOPIC, project_id=GCP_PROJECT_ID
)

create_topic >> subscribe_task >> publish_task