Enable Black on Providers Packages #10543

Merged (17 commits) on Aug 25, 2020
Changes from all commits
2 changes: 1 addition & 1 deletion .flake8
@@ -1,6 +1,6 @@
[flake8]
max-line-length = 110
-ignore = E231,E731,W504,I001,W503
+ignore = E203,E231,E731,W504,I001,W503
exclude = .svn,CVS,.bzr,.hg,.git,__pycache__,.eggs,*.egg,node_modules
format = ${cyan}%(path)s${reset}:${yellow_bold}%(row)d${reset}:${green_bold}%(col)d${reset}: ${red_bold}%(code)s${reset} %(text)s
per-file-ignores =
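E203 ("whitespace before ':'") joins the flake8 ignore list because Black's slice formatting conflicts with it: when a slice bound is a non-trivial expression, Black puts a space on both sides of the colon, which flake8 would otherwise report. A minimal, hypothetical illustration (not taken from this PR):

# Black formats the slice below with spaces around ":", which flake8
# reports as E203 ("whitespace before ':'") unless the code is ignored.
ham = list(range(10))
lower, upper, offset = 2, 8, 1
chunk = ham[lower + offset : upper + offset]
print(chunk)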
5 changes: 3 additions & 2 deletions .pre-commit-config.yaml
@@ -153,7 +153,8 @@ repos:
rev: stable
hooks:
- id: black
-files: api_connexion/.*\.py
+files: api_connexion/.*\.py|.*providers.*\.py
+exclude: .*kubernetes_pod\.py|.*google/common/hooks/base_google\.py$
args: [--config=./pyproject.toml]
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v3.2.0
@@ -190,7 +191,7 @@
name: Run isort to sort imports
types: [python]
# To keep consistent with the global isort skip config defined in setup.cfg
-exclude: ^build/.*$|^.tox/.*$|^venv/.*$|.*api_connexion/.*\.py
+exclude: ^build/.*$|^.tox/.*$|^venv/.*$|.*api_connexion/.*\.py|.*providers.*\.py
- repo: https://github.com/pycqa/pydocstyle
rev: 5.0.2
hooks:
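The widened files pattern and the new exclude entry are what actually put the provider packages under the black hook. A rough sketch of how those regexes select paths, assuming pre-commit's usual search-style matching (the sample paths are only illustrative):

# Illustrative only: apply the files/exclude regexes from the hunk above
# the way pre-commit does, via re.search on each candidate path.
import re

FILES = re.compile(r"api_connexion/.*\.py|.*providers.*\.py")
EXCLUDE = re.compile(r".*kubernetes_pod\.py|.*google/common/hooks/base_google\.py$")

for path in [
    "airflow/providers/amazon/aws/example_dags/example_s3_bucket.py",
    "airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py",
    "airflow/models/dag.py",
]:
    runs_black = bool(FILES.search(path)) and not EXCLUDE.search(path)
    print(f"{path}: {'black' if runs_black else 'skipped'}")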
17 changes: 5 additions & 12 deletions airflow/providers/amazon/aws/example_dags/example_datasync_1.py
@@ -33,16 +33,13 @@
from airflow.utils.dates import days_ago

# [START howto_operator_datasync_1_args_1]
-TASK_ARN = getenv(
-"TASK_ARN", "my_aws_datasync_task_arn")
+TASK_ARN = getenv("TASK_ARN", "my_aws_datasync_task_arn")
# [END howto_operator_datasync_1_args_1]

# [START howto_operator_datasync_1_args_2]
-SOURCE_LOCATION_URI = getenv(
-"SOURCE_LOCATION_URI", "smb://hostname/directory/")
+SOURCE_LOCATION_URI = getenv("SOURCE_LOCATION_URI", "smb://hostname/directory/")

-DESTINATION_LOCATION_URI = getenv(
-"DESTINATION_LOCATION_URI", "s3://mybucket/prefix")
+DESTINATION_LOCATION_URI = getenv("DESTINATION_LOCATION_URI", "s3://mybucket/prefix")
# [END howto_operator_datasync_1_args_2]


@@ -55,16 +52,12 @@

# [START howto_operator_datasync_1_1]
datasync_task_1 = AWSDataSyncOperator(
-aws_conn_id="aws_default",
-task_id="datasync_task_1",
-task_arn=TASK_ARN
+aws_conn_id="aws_default", task_id="datasync_task_1", task_arn=TASK_ARN
)
# [END howto_operator_datasync_1_1]

with models.DAG(
-"example_datasync_1_2",
-start_date=days_ago(1),
-schedule_interval=None, # Override to match your needs
+"example_datasync_1_2", start_date=days_ago(1), schedule_interval=None, # Override to match your needs
) as dag:
# [START howto_operator_datasync_1_2]
datasync_task_2 = AWSDataSyncOperator(
29 changes: 8 additions & 21 deletions airflow/providers/amazon/aws/example_dags/example_datasync_2.py
@@ -42,40 +42,30 @@
from airflow.utils.dates import days_ago

# [START howto_operator_datasync_2_args]
-SOURCE_LOCATION_URI = getenv(
-"SOURCE_LOCATION_URI", "smb://hostname/directory/")
+SOURCE_LOCATION_URI = getenv("SOURCE_LOCATION_URI", "smb://hostname/directory/")

-DESTINATION_LOCATION_URI = getenv(
-"DESTINATION_LOCATION_URI", "s3://mybucket/prefix")
+DESTINATION_LOCATION_URI = getenv("DESTINATION_LOCATION_URI", "s3://mybucket/prefix")

default_create_task_kwargs = '{"Name": "Created by Airflow"}'
-CREATE_TASK_KWARGS = json.loads(
-getenv("CREATE_TASK_KWARGS", default_create_task_kwargs)
-)
+CREATE_TASK_KWARGS = json.loads(getenv("CREATE_TASK_KWARGS", default_create_task_kwargs))

default_create_source_location_kwargs = "{}"
CREATE_SOURCE_LOCATION_KWARGS = json.loads(
-getenv("CREATE_SOURCE_LOCATION_KWARGS",
-default_create_source_location_kwargs)
+getenv("CREATE_SOURCE_LOCATION_KWARGS", default_create_source_location_kwargs)
)

-bucket_access_role_arn = (
-"arn:aws:iam::11112223344:role/r-11112223344-my-bucket-access-role"
-)
+bucket_access_role_arn = "arn:aws:iam::11112223344:role/r-11112223344-my-bucket-access-role"
default_destination_location_kwargs = """\
{"S3BucketArn": "arn:aws:s3:::mybucket",
"S3Config": {"BucketAccessRoleArn":
"arn:aws:iam::11112223344:role/r-11112223344-my-bucket-access-role"}
}"""
CREATE_DESTINATION_LOCATION_KWARGS = json.loads(
-getenv("CREATE_DESTINATION_LOCATION_KWARGS",
-re.sub(r"[\s+]", '', default_destination_location_kwargs))
+getenv("CREATE_DESTINATION_LOCATION_KWARGS", re.sub(r"[\s+]", '', default_destination_location_kwargs))
)

default_update_task_kwargs = '{"Name": "Updated by Airflow"}'
-UPDATE_TASK_KWARGS = json.loads(
-getenv("UPDATE_TASK_KWARGS", default_update_task_kwargs)
-)
+UPDATE_TASK_KWARGS = json.loads(getenv("UPDATE_TASK_KWARGS", default_update_task_kwargs))

# [END howto_operator_datasync_2_args]

@@ -92,13 +82,10 @@
task_id="datasync_task",
source_location_uri=SOURCE_LOCATION_URI,
destination_location_uri=DESTINATION_LOCATION_URI,
-
create_task_kwargs=CREATE_TASK_KWARGS,
create_source_location_kwargs=CREATE_SOURCE_LOCATION_KWARGS,
create_destination_location_kwargs=CREATE_DESTINATION_LOCATION_KWARGS,
-
update_task_kwargs=UPDATE_TASK_KWARGS,
-
-delete_task_after_execution=True
+delete_task_after_execution=True,
)
# [END howto_operator_datasync_2]
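The reformatting pattern repeated throughout these example DAGs is mechanical: Black collapses a call or literal onto a single line when it fits within the configured line length (presumably 110 here, matching the flake8 max-line-length above), and otherwise keeps it split one element per line with a trailing comma after the last one. The single quotes surviving in the output suggest the shared pyproject.toml also sets skip-string-normalization, though that file is not part of this diff. A self-contained sketch with hypothetical names:

# Hypothetical helper, only to show the two formatting outcomes Black produces.
def configure(task_id, conn_id, overrides=None, steps=None):
    return {"task_id": task_id, "conn_id": conn_id, "overrides": overrides, "steps": steps}

# Fits within the line length: Black keeps the call on one line, no trailing comma.
short_call = configure(task_id="check_job_flow", conn_id="aws_default")

# Too long for one line: Black splits it one argument per line and adds a trailing comma.
long_call = configure(
    task_id="create_job_flow",
    conn_id="aws_default",
    overrides={"Name": "example-cluster", "ReleaseLabel": "emr-5.29.0"},
    steps=[{"Name": "calculate_pi", "ActionOnFailure": "CONTINUE"}],
)
print(short_call)
print(long_call)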
(next file)
@@ -56,12 +56,7 @@
task_definition="hello-world",
launch_type="FARGATE",
overrides={
-"containerOverrides": [
-{
-"name": "hello-world-container",
-"command": ["echo", "hello", "world"],
-},
-],
+"containerOverrides": [{"name": "hello-world-container", "command": ["echo", "hello", "world"],},],
},
network_configuration={
"awsvpcConfiguration": {
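The odd-looking ",},]," run in the collapsed containerOverrides line is expected output from the Black release pinned by rev: stable at the time (19.10b0, an assumption based on the pin): it joins nested collections onto one line whenever the result fits, keeping the inner trailing commas. Black 20.8b0's "magic trailing comma" changed this behaviour, so a later formatter upgrade would likely expand this literal again.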
(next file)
@@ -30,7 +30,7 @@
'depends_on_past': False,
'email': ['[email protected]'],
'email_on_failure': False,
-'email_on_retry': False
+'email_on_retry': False,
}

# [START howto_operator_emr_automatic_steps_config]
@@ -40,12 +40,8 @@
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
-'Args': [
-'/usr/lib/spark/bin/run-example',
-'SparkPi',
-'10'
-]
-}
+'Args': ['/usr/lib/spark/bin/run-example', 'SparkPi', '10'],
+},
}
]

@@ -85,13 +81,13 @@
task_id='create_job_flow',
job_flow_overrides=JOB_FLOW_OVERRIDES,
aws_conn_id='aws_default',
-emr_conn_id='emr_default'
+emr_conn_id='emr_default',
)

job_sensor = EmrJobFlowSensor(
task_id='check_job_flow',
job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
-aws_conn_id='aws_default'
+aws_conn_id='aws_default',
)

job_flow_creator >> job_sensor
(next file)
@@ -35,7 +35,7 @@
'depends_on_past': False,
'email': ['[email protected]'],
'email_on_failure': False,
-'email_on_retry': False
+'email_on_retry': False,
}

SPARK_STEPS = [
@@ -44,12 +44,8 @@
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
-'Args': [
-'/usr/lib/spark/bin/run-example',
-'SparkPi',
-'10'
-]
-}
+'Args': ['/usr/lib/spark/bin/run-example', 'SparkPi', '10'],
+},
}
]

@@ -87,27 +83,27 @@
task_id='create_job_flow',
job_flow_overrides=JOB_FLOW_OVERRIDES,
aws_conn_id='aws_default',
-emr_conn_id='emr_default'
+emr_conn_id='emr_default',
)

step_adder = EmrAddStepsOperator(
task_id='add_steps',
job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
aws_conn_id='aws_default',
-steps=SPARK_STEPS
+steps=SPARK_STEPS,
)

step_checker = EmrStepSensor(
task_id='watch_step',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
-aws_conn_id='aws_default'
+aws_conn_id='aws_default',
)

cluster_remover = EmrTerminateJobFlowOperator(
task_id='remove_cluster',
job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
-aws_conn_id='aws_default'
+aws_conn_id='aws_default',
)

cluster_creator >> step_adder >> step_checker >> cluster_remover
(next file)
@@ -74,7 +74,7 @@ def _check_and_transform_video_ids(xcom_key, task_ids, task_instance, **kwargs):
dag_id="example_google_api_to_s3_transfer_advanced",
schedule_interval=None,
start_date=days_ago(1),
-tags=['example']
+tags=['example'],
) as dag:
# [START howto_operator_google_api_to_s3_transfer_advanced_task_1]
task_video_ids_to_s3 = GoogleApiToS3Operator(
@@ -89,21 +89,18 @@ def _check_and_transform_video_ids(xcom_key, task_ids, task_instance, **kwargs):
'publishedAfter': YOUTUBE_VIDEO_PUBLISHED_AFTER,
'publishedBefore': YOUTUBE_VIDEO_PUBLISHED_BEFORE,
'type': 'video',
-'fields': 'items/id/videoId'
+'fields': 'items/id/videoId',
},
google_api_response_via_xcom='video_ids_response',
s3_destination_key=f'{s3_directory}/youtube_search_{s3_file_name}.json',
-task_id='video_ids_to_s3'
+task_id='video_ids_to_s3',
)
# [END howto_operator_google_api_to_s3_transfer_advanced_task_1]
# [START howto_operator_google_api_to_s3_transfer_advanced_task_1_1]
task_check_and_transform_video_ids = BranchPythonOperator(
python_callable=_check_and_transform_video_ids,
-op_args=[
-task_video_ids_to_s3.google_api_response_via_xcom,
-task_video_ids_to_s3.task_id
-],
-task_id='check_and_transform_video_ids'
+op_args=[task_video_ids_to_s3.google_api_response_via_xcom, task_video_ids_to_s3.task_id],
+task_id='check_and_transform_video_ids',
)
# [END howto_operator_google_api_to_s3_transfer_advanced_task_1_1]
# [START howto_operator_google_api_to_s3_transfer_advanced_task_2]
@@ -115,16 +112,14 @@ def _check_and_transform_video_ids(xcom_key, task_ids, task_instance, **kwargs):
google_api_endpoint_params={
'part': YOUTUBE_VIDEO_PARTS,
'maxResults': 50,
-'fields': YOUTUBE_VIDEO_FIELDS
+'fields': YOUTUBE_VIDEO_FIELDS,
},
google_api_endpoint_params_via_xcom='video_ids',
s3_destination_key=f'{s3_directory}/youtube_videos_{s3_file_name}.json',
-task_id='video_data_to_s3'
+task_id='video_data_to_s3',
)
# [END howto_operator_google_api_to_s3_transfer_advanced_task_2]
# [START howto_operator_google_api_to_s3_transfer_advanced_task_2_1]
-task_no_video_ids = DummyOperator(
-task_id='no_video_ids'
-)
+task_no_video_ids = DummyOperator(task_id='no_video_ids')
# [END howto_operator_google_api_to_s3_transfer_advanced_task_2_1]
task_video_ids_to_s3 >> task_check_and_transform_video_ids >> [task_video_data_to_s3, task_no_video_ids]
(next file)
@@ -37,19 +37,16 @@
dag_id="example_google_api_to_s3_transfer_basic",
schedule_interval=None,
start_date=days_ago(1),
-tags=['example']
+tags=['example'],
) as dag:
# [START howto_operator_google_api_to_s3_transfer_basic_task_1]
task_google_sheets_values_to_s3 = GoogleApiToS3Operator(
google_api_service_name='sheets',
google_api_service_version='v4',
google_api_endpoint_path='sheets.spreadsheets.values.get',
-google_api_endpoint_params={
-'spreadsheetId': GOOGLE_SHEET_ID,
-'range': GOOGLE_SHEET_RANGE
-},
+google_api_endpoint_params={'spreadsheetId': GOOGLE_SHEET_ID, 'range': GOOGLE_SHEET_RANGE},
s3_destination_key=S3_DESTINATION_KEY,
task_id='google_sheets_values_to_s3',
-dag=dag
+dag=dag,
)
# [END howto_operator_google_api_to_s3_transfer_basic_task_1]
(next file)
@@ -34,10 +34,7 @@
# [END howto_operator_imap_attachment_to_s3_env_variables]

with DAG(
-dag_id="example_imap_attachment_to_s3",
-start_date=days_ago(1),
-schedule_interval=None,
-tags=['example']
+dag_id="example_imap_attachment_to_s3", start_date=days_ago(1), schedule_interval=None, tags=['example']
) as dag:
# [START howto_operator_imap_attachment_to_s3_task_1]
task_transfer_imap_attachment_to_s3 = ImapAttachmentToS3Operator(
@@ -46,6 +43,6 @@
imap_mail_folder=IMAP_MAIL_FOLDER,
imap_mail_filter=IMAP_MAIL_FILTER,
task_id='transfer_imap_attachment_to_s3',
-dag=dag
+dag=dag,
)
# [END howto_operator_imap_attachment_to_s3_task_1]
15 changes: 4 additions & 11 deletions airflow/providers/amazon/aws/example_dags/example_s3_bucket.py
@@ -31,9 +31,7 @@ def upload_keys():
s3_hook = S3Hook()
for i in range(0, 3):
s3_hook.load_string(
-string_data="input",
-key=f"path/data{i}",
-bucket_name=BUCKET_NAME,
+string_data="input", key=f"path/data{i}", bucket_name=BUCKET_NAME,
)


@@ -46,20 +44,15 @@ def upload_keys():
) as dag:

create_bucket = S3CreateBucketOperator(
-task_id='s3_bucket_dag_create',
-bucket_name=BUCKET_NAME,
-region_name='us-east-1',
+task_id='s3_bucket_dag_create', bucket_name=BUCKET_NAME, region_name='us-east-1',
)

add_keys_to_bucket = PythonOperator(
-task_id="s3_bucket_dag_add_keys_to_bucket",
-python_callable=upload_keys
+task_id="s3_bucket_dag_add_keys_to_bucket", python_callable=upload_keys
)

delete_bucket = S3DeleteBucketOperator(
-task_id='s3_bucket_dag_delete',
-bucket_name=BUCKET_NAME,
-force_delete=True,
+task_id='s3_bucket_dag_delete', bucket_name=BUCKET_NAME, force_delete=True,
)

create_bucket >> add_keys_to_bucket >> delete_bucket