Fix RedshiftDataOperator and update doc #22157

Merged · 3 commits · Mar 11, 2022

Changes from all commits
airflow/providers/amazon/aws/example_dags/example_redshift_data_execute_sql.py
@@ -15,18 +15,17 @@
 # specific language governing permissions and limitations
 # under the License.

-from datetime import datetime, timedelta
+from datetime import datetime
 from os import getenv

-from airflow.decorators import dag, task
+from airflow import DAG
+from airflow.decorators import task
 from airflow.providers.amazon.aws.hooks.redshift_data import RedshiftDataHook
 from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator

 # [START howto_operator_redshift_data_env_variables]
-REDSHIFT_CLUSTER_IDENTIFIER = getenv("REDSHIFT_CLUSTER_IDENTIFIER", "test-cluster")
-REDSHIFT_DATABASE = getenv("REDSHIFT_DATABASE", "test-database")
+REDSHIFT_CLUSTER_IDENTIFIER = getenv("REDSHIFT_CLUSTER_IDENTIFIER", "redshift_cluster_identifier")
+REDSHIFT_DATABASE = getenv("REDSHIFT_DATABASE", "redshift_database")
 REDSHIFT_DATABASE_USER = getenv("REDSHIFT_DATABASE_USER", "awsuser")
 # [END howto_operator_redshift_data_env_variables]

 REDSHIFT_QUERY = """
 SELECT table_schema,
@@ -40,29 +39,26 @@
 POLL_INTERVAL = 10


-# [START howto_redshift_data]
-@dag(
-    dag_id='example_redshift_data',
-    schedule_interval=None,
-    start_date=datetime(2021, 1, 1),
-    dagrun_timeout=timedelta(minutes=60),
-    tags=['example'],
-    catchup=False,
-)
Comment on lines -44 to -51

Contributor:
Why update away from the @dag decorator? More curiosity than a suggestion.

Contributor (Author):
Good question. We (AWS) are trying to be consistent across the Amazon providers package and decided to go with this format.
-def example_redshift_data():
-    @task(task_id="output_results")
-    def output_results_fn(id):
-        """This is a python decorator task that returns a Redshift query"""
-        hook = RedshiftDataHook()
+@task(task_id="output_results")
+def output_query_results(statement_id):
+    hook = RedshiftDataHook()
+    resp = hook.conn.get_statement_result(
+        Id=statement_id,
+    )
+
+    print(resp)
+    return resp

-        resp = hook.get_statement_result(
-            id=id,
-        )
-        print(resp)
-        return resp

-    # Run a SQL statement and wait for completion
-    redshift_query = RedshiftDataOperator(
+with DAG(
+    dag_id="example_redshift_data_execute_sql",
+    start_date=datetime(2021, 1, 1),
+    schedule_interval=None,
+    catchup=False,
+    tags=['example'],
+) as dag:
+    # [START howto_redshift_data]
+    task_query = RedshiftDataOperator(
         task_id='redshift_query',
         cluster_identifier=REDSHIFT_CLUSTER_IDENTIFIER,
         database=REDSHIFT_DATABASE,

@@ -71,10 +67,6 @@ def output_results_fn(id):
         poll_interval=POLL_INTERVAL,
         await_result=True,
     )
-    # [END howto_redshift_data]
-
-    # Using a task-decorated function to output the list of tables in a Redshift cluster
-    output_results_fn(redshift_query.output)
-
-
-example_redshift_data_dag = example_redshift_data()
+    # [END howto_redshift_data]
+    task_output = output_query_results(task_query.output)
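
To make the review exchange above concrete: the two DAG-definition styles are functionally equivalent. The sketch below is illustrative only (hypothetical dag_ids and a trivial task, assuming the Airflow 2.x TaskFlow API), contrasting the @dag decorator the example moved away from with the DAG context manager it now uses.

from datetime import datetime

from airflow import DAG
from airflow.decorators import dag, task


# Style the example moved away from: @dag wraps a function, and calling
# that function instantiates the DAG.
@dag(
    dag_id="example_decorator_style",  # hypothetical dag_id
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    catchup=False,
)
def decorator_style():
    @task
    def say_hello():
        print("hello")

    say_hello()


decorator_style_dag = decorator_style()


# Style the Amazon provider examples standardized on: tasks invoked inside
# the `with DAG(...)` block are attached to that DAG automatically.
with DAG(
    dag_id="example_context_manager_style",  # hypothetical dag_id
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:

    @task
    def say_hello():
        print("hello")

    say_hello()

Note also that in the updated example, task_query.output is an XComArg: the statement Id returned by RedshiftDataOperator is resolved at runtime and passed into the @task-decorated function.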
25 changes: 14 additions & 11 deletions airflow/providers/amazon/aws/operators/redshift_data.py
@@ -17,7 +17,7 @@
 # under the License.
 import sys
 from time import sleep
-from typing import TYPE_CHECKING, Optional
+from typing import TYPE_CHECKING, Any, Dict, Optional

 if sys.version_info >= (3, 8):
     from functools import cached_property
@@ -109,16 +109,19 @@ def hook(self) -> RedshiftDataHook:
         return RedshiftDataHook(aws_conn_id=self.aws_conn_id, region_name=self.region)

     def execute_query(self):
-        resp = self.hook.conn.execute_statement(
-            ClusterIdentifier=self.cluster_identifier,
-            Database=self.database,
-            DbUser=self.db_user,
-            Sql=self.sql,
-            Parameters=self.parameters,
-            SecretArn=self.secret_arn,
-            StatementName=self.statement_name,
-            WithEvent=self.with_event,
-        )
+        kwargs: Dict[str, Any] = {
+            "ClusterIdentifier": self.cluster_identifier,
+            "Database": self.database,
+            "Sql": self.sql,
+            "DbUser": self.db_user,
+            "Parameters": self.parameters,
+            "WithEvent": self.with_event,
+            "SecretArn": self.secret_arn,
+            "StatementName": self.statement_name,
+        }
+
+        filter_values = {key: val for key, val in kwargs.items() if val is not None}
+        resp = self.hook.conn.execute_statement(**filter_values)
         return resp['Id']

     def wait_for_results(self, statement_id):
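
Context for the change above, in case the motivation is not obvious from the diff: botocore validates request parameters client-side, and passing None for an optional string parameter such as DbUser or SecretArn is rejected with a validation error rather than treated as omitted. The sketch below is a standalone illustration of the filtering pattern, not the operator itself; the function name is hypothetical.

from typing import Any, Dict, Optional


def build_execute_statement_kwargs(
    database: str,
    sql: str,
    cluster_identifier: Optional[str] = None,
    db_user: Optional[str] = None,
    secret_arn: Optional[str] = None,
    statement_name: Optional[str] = None,
    parameters: Optional[list] = None,
    with_event: bool = False,
) -> Dict[str, Any]:
    # Build the full kwarg dict once, then drop None entries so optional
    # API parameters are omitted entirely instead of being sent as None.
    kwargs: Dict[str, Any] = {
        "ClusterIdentifier": cluster_identifier,
        "Database": database,
        "Sql": sql,
        "DbUser": db_user,
        "Parameters": parameters,
        "WithEvent": with_event,
        "SecretArn": secret_arn,
        "StatementName": statement_name,
    }
    return {key: val for key, val in kwargs.items() if val is not None}


# With only the required arguments, the None-valued optional keys disappear:
print(build_execute_statement_kwargs(database="dev", sql="SELECT 1;"))
# -> {'Database': 'dev', 'Sql': 'SELECT 1;', 'WithEvent': False}

Building the dict first and filtering once also keeps the API-name-to-attribute mapping in a single place instead of spreading it across eight conditionals.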
39 changes: 19 additions & 20 deletions docs/apache-airflow-providers-amazon/operators/redshift_data.rst
@@ -15,38 +15,37 @@
 specific language governing permissions and limitations
 under the License.

-.. _howto/operator:RedshiftDataOperator:
+Amazon Redshift Data Operators
+==============================

-RedshiftDataOperator
-====================
+Use the :class:`RedshiftDataOperator <airflow.providers.amazon.aws.operators.redshift_data>` to execute
+statements against an Amazon Redshift cluster.

-.. contents::
-  :depth: 1
-  :local:
+This differs from ``RedshiftSQLOperator`` in that it allows users to query and retrieve data via the AWS API and avoid the necessity of a Postgres connection.

-Overview
---------
+Prerequisite Tasks
+^^^^^^^^^^^^^^^^^^

-Use the :class:`RedshiftDataOperator <airflow.providers.amazon.aws.operators.redshift_data>` to execute
-statements against an Amazon Redshift cluster.
+.. include:: _partials/prerequisite_tasks.rst

-This differs from RedshiftSQLOperator in that it allows users to query and retrieve data via the AWS API and avoid the necessity of a Postgres connection.
+Amazon Redshift Data
+^^^^^^^^^^^^^^^^^^^^

-example_redshift_data_execute_sql.py
-------------------------------------
+.. _howto/operator:RedshiftDataOperator:

-Purpose
-"""""""
+Execute a statement on an Amazon Redshift Cluster
+"""""""""""""""""""""""""""""""""""""""""""""""""

 This is a basic example DAG for using :class:`RedshiftDataOperator <airflow.providers.amazon.aws.operators.redshift_data>`
 to execute statements against an Amazon Redshift cluster.

-List tables in database
-"""""""""""""""""""""""
-
-In the following code we list the tables in the provided database.
-
 .. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_redshift_data_execute_sql.py
     :language: python
     :dedent: 4
     :start-after: [START howto_redshift_data]
     :end-before: [END howto_redshift_data]
+
+Reference
+^^^^^^^^^
+
+* `AWS boto3 Library Documentation for Amazon Redshift Data <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/redshift-data.html>`__
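
For readers of the docs page, a minimal usage sketch (hypothetical cluster and database values, not part of the committed example): with the fix above, optional arguments such as db_user, secret_arn, and statement_name can simply be left out.

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator

with DAG(
    dag_id="redshift_data_minimal",  # hypothetical dag_id
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    # Only required arguments; omitted optional ones (db_user, secret_arn,
    # statement_name, ...) are no longer forwarded to the API as None.
    minimal_query = RedshiftDataOperator(
        task_id="minimal_redshift_query",
        cluster_identifier="my-cluster",  # hypothetical cluster
        database="dev",  # hypothetical database
        sql="SELECT 1;",
        await_result=True,
    )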
26 changes: 15 additions & 11 deletions tests/providers/amazon/aws/operators/test_redshift_data.py
@@ -40,38 +40,42 @@ def test_execute_without_waiting(self, mock_conn):
         )
         operator.execute(None)
         mock_conn.execute_statement.assert_called_once_with(
-            ClusterIdentifier=None,
             Database=DATABASE,
-            DbUser=None,
             Sql=SQL,
-            Parameters=None,
-            SecretArn=None,
-            StatementName=None,
             WithEvent=False,
         )
         mock_conn.describe_statement.assert_not_called()

     @mock.patch("airflow.providers.amazon.aws.hooks.redshift_data.RedshiftDataHook.conn")
-    def test_execute(self, mock_conn):
+    def test_execute_with_all_parameters(self, mock_conn):
+        cluster_identifier = "cluster_identifier"
+        db_user = "db_user"
+        secret_arn = "secret_arn"
+        statement_name = "statement_name"
         parameters = [{"name": "id", "value": "1"}]
         mock_conn.execute_statement.return_value = {'Id': STATEMENT_ID}
         mock_conn.describe_statement.return_value = {"Status": "FINISHED"}

         operator = RedshiftDataOperator(
             aws_conn_id=CONN_ID,
             task_id=TASK_ID,
             sql=SQL,
-            parameters=parameters,
             database=DATABASE,
+            cluster_identifier=cluster_identifier,
+            db_user=db_user,
+            secret_arn=secret_arn,
+            statement_name=statement_name,
+            parameters=parameters,
         )
         operator.execute(None)
         mock_conn.execute_statement.assert_called_once_with(
-            ClusterIdentifier=None,
             Database=DATABASE,
-            DbUser=None,
             Sql=SQL,
+            ClusterIdentifier=cluster_identifier,
+            DbUser=db_user,
+            SecretArn=secret_arn,
+            StatementName=statement_name,
             Parameters=parameters,
-            SecretArn=None,
-            StatementName=None,
             WithEvent=False,
         )
         mock_conn.describe_statement.assert_called_once_with(
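
A brief aside on the test mechanics: the tests above patch airflow.providers.amazon.aws.hooks.redshift_data.RedshiftDataHook.conn, so assertions run against a mock boto3 client and never touch AWS. Below is a self-contained sketch of the same pattern with hypothetical classes — not the Airflow test suite itself.

from unittest import mock


class FakeHook:
    @property
    def conn(self):
        # In the real hook this would create a boto3 client.
        raise RuntimeError("would create a real boto3 client")


def run_query(hook, database, sql, db_user=None):
    # Same None-filtering idea as the operator fix above.
    kwargs = {"Database": database, "Sql": sql, "DbUser": db_user}
    filtered = {k: v for k, v in kwargs.items() if v is not None}
    return hook.conn.execute_statement(**filtered)


# Patch the property on the class so `hook.conn` returns a mock client.
with mock.patch.object(FakeHook, "conn", new_callable=mock.PropertyMock) as conn:
    conn.return_value.execute_statement.return_value = {"Id": "abc123"}
    hook = FakeHook()
    resp = run_query(hook, database="dev", sql="SELECT 1;")
    # Only the non-None kwargs reach the (mocked) API call.
    conn.return_value.execute_statement.assert_called_once_with(
        Database="dev", Sql="SELECT 1;"
    )
    print(resp)  # {'Id': 'abc123'}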