Skip to content

Commit

Permalink
[AIRFLOW-3231] Basic operators for Google Cloud SQL (#4097)
Browse files Browse the repository at this point in the history
Add CloudSqlInstanceInsertOperator, CloudSqlInstancePatchOperator and CloudSqlInstanceDeleteOperator.

Each operator includes:
- core logic
- input params validation
- unit tests
- presence in the example DAG
- docstrings
- How-to and Integration documentation

Additionally, small improvements to GcpBodyFieldValidator were made:
- add simple list validation capability (type="list")
- introduced parameter allow_empty, which can be set to False
	to test for non-emptiness of a string instead of specifying
	a regexp.

Co-authored-by: sprzedwojski <[email protected]>
Co-authored-by: potiuk <[email protected]>
  • Loading branch information
3 people authored and kaxil committed Oct 31, 2018
1 parent 9ec863a commit 3be8ce7
Show file tree
Hide file tree
Showing 8 changed files with 1,145 additions and 9 deletions.
134 changes: 134 additions & 0 deletions airflow/contrib/example_dags/example_gcp_sql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Example Airflow DAG that deploys, updates, patches and deletes a Cloud SQL instance
in Google Cloud Platform.
This DAG relies on the following Airflow variables
https://airflow.apache.org/concepts.html#variables
* PROJECT_ID - Google Cloud Platform project for the Cloud SQL instance.
* INSTANCE_NAME - Name of the Cloud SQL instance.
"""

import datetime

import airflow
from airflow import models

from airflow.contrib.operators.gcp_sql_operator import CloudSqlInstanceCreateOperator, \
CloudSqlInstancePatchOperator, CloudSqlInstanceDeleteOperator

# [START howto_operator_cloudsql_arguments]
PROJECT_ID = models.Variable.get('PROJECT_ID', '')
INSTANCE_NAME = models.Variable.get('INSTANCE_NAME', '')
# [END howto_operator_cloudsql_arguments]

# Bodies below represent Cloud SQL instance resources:
# https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances

# [START howto_operator_cloudsql_create_body]
body = {
"name": INSTANCE_NAME,
"settings": {
"tier": "db-n1-standard-1",
"backupConfiguration": {
"binaryLogEnabled": True,
"enabled": True,
"startTime": "05:00"
},
"activationPolicy": "ALWAYS",
"dataDiskSizeGb": 30,
"dataDiskType": "PD_SSD",
"databaseFlags": [],
"ipConfiguration": {
"ipv4Enabled": True,
"requireSsl": True,
},
"locationPreference": {
"zone": "europe-west4-a"
},
"maintenanceWindow": {
"hour": 5,
"day": 7,
"updateTrack": "canary"
},
"pricingPlan": "PER_USE",
"replicationType": "ASYNCHRONOUS",
"storageAutoResize": False,
"storageAutoResizeLimit": 0,
"userLabels": {
"my-key": "my-value"
}
},
"databaseVersion": "MYSQL_5_7",
"region": "europe-west4",
}
# [END howto_operator_cloudsql_create_body]
# [START howto_operator_cloudsql_patch_body]
patch_body = {
"name": INSTANCE_NAME,
"settings": {
"dataDiskSizeGb": 35,
"maintenanceWindow": {
"hour": 3,
"day": 6,
"updateTrack": "canary"
},
"userLabels": {
"my-key-patch": "my-value-patch"
}
}
}
# [END howto_operator_cloudsql_patch_body]

default_args = {
'start_date': airflow.utils.dates.days_ago(1)
}

with models.DAG(
'example_gcp_sql',
default_args=default_args,
schedule_interval=datetime.timedelta(days=1)
) as dag:
# [START howto_operator_cloudsql_create]
sql_instance_create_task = CloudSqlInstanceCreateOperator(
project_id=PROJECT_ID,
body=body,
instance=INSTANCE_NAME,
task_id='sql_instance_create_task'
)
# [END howto_operator_cloudsql_create]
# [START howto_operator_cloudsql_patch]
sql_instance_patch_task = CloudSqlInstancePatchOperator(
project_id=PROJECT_ID,
body=patch_body,
instance=INSTANCE_NAME,
task_id='sql_instance_patch_task'
)
# [END howto_operator_cloudsql_patch]
# [START howto_operator_cloudsql_delete]
sql_instance_delete_task = CloudSqlInstanceDeleteOperator(
project_id=PROJECT_ID,
instance=INSTANCE_NAME,
task_id='sql_instance_delete_task'
)
# [END howto_operator_cloudsql_delete]

sql_instance_create_task >> sql_instance_patch_task >> sql_instance_delete_task
173 changes: 173 additions & 0 deletions airflow/contrib/hooks/gcp_sql_hook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import time
from googleapiclient.discovery import build

from airflow import AirflowException
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook

# Number of retries - used by googleapiclient method calls to perform retries
# For requests that are "retriable"
NUM_RETRIES = 5

# Time to sleep between active checks of the operation results
TIME_TO_SLEEP_IN_SECONDS = 1


class CloudSqlOperationStatus:
PENDING = "PENDING"
RUNNING = "RUNNING"
DONE = "DONE"
UNKNOWN = "UNKNOWN"


# noinspection PyAbstractClass
class CloudSqlHook(GoogleCloudBaseHook):
"""
Hook for Google Cloud SQL APIs.
"""
_conn = None

def __init__(self,
api_version,
gcp_conn_id='google_cloud_default',
delegate_to=None):
super(CloudSqlHook, self).__init__(gcp_conn_id, delegate_to)
self.api_version = api_version

def get_conn(self):
"""
Retrieves connection to Cloud SQL.
:return: Google Cloud SQL services object.
:rtype: dict
"""
if not self._conn:
http_authorized = self._authorize()
self._conn = build('sqladmin', self.api_version,
http=http_authorized, cache_discovery=False)
return self._conn

def get_instance(self, project_id, instance):
"""
Retrieves a resource containing information about a Cloud SQL instance.
:param project_id: Project ID of the project that contains the instance.
:type project_id: str
:param instance: Database instance ID. This does not include the project ID.
:type instance: str
:return: A Cloud SQL instance resource.
:rtype: dict
"""
return self.get_conn().instances().get(
project=project_id,
instance=instance
).execute(num_retries=NUM_RETRIES)

def create_instance(self, project_id, body):
"""
Creates a new Cloud SQL instance.
:param project_id: Project ID of the project to which the newly created
Cloud SQL instances should belong.
:type project_id: str
:param body: Body required by the Cloud SQL insert API, as described in
https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/insert#request-body
:type body: dict
:return: True if the operation succeeded, raises an error otherwise
:rtype: bool
"""
response = self.get_conn().instances().insert(
project=project_id,
body=body
).execute(num_retries=NUM_RETRIES)
operation_name = response["name"]
return self._wait_for_operation_to_complete(project_id, operation_name)

def patch_instance(self, project_id, body, instance):
"""
Updates settings of a Cloud SQL instance.
Caution: This is not a partial update, so you must include values for
all the settings that you want to retain.
:param project_id: Project ID of the project that contains the instance.
:type project_id: str
:param body: Body required by the Cloud SQL patch API, as described in
https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/patch#request-body
:type body: dict
:param instance: Cloud SQL instance ID. This does not include the project ID.
:type instance: str
:return: True if the operation succeeded, raises an error otherwise
:rtype: bool
"""
response = self.get_conn().instances().patch(
project=project_id,
instance=instance,
body=body
).execute(num_retries=NUM_RETRIES)
operation_name = response["name"]
return self._wait_for_operation_to_complete(project_id, operation_name)

def delete_instance(self, project_id, instance):
"""
Deletes a Cloud SQL instance.
:param project_id: Project ID of the project that contains the instance.
:type project_id: str
:param instance: Cloud SQL instance ID. This does not include the project ID.
:type instance: str
:return: True if the operation succeeded, raises an error otherwise
:rtype: bool
"""
response = self.get_conn().instances().delete(
project=project_id,
instance=instance,
).execute(num_retries=NUM_RETRIES)
operation_name = response["name"]
return self._wait_for_operation_to_complete(project_id, operation_name)

def _wait_for_operation_to_complete(self, project_id, operation_name):
"""
Waits for the named operation to complete - checks status of the
asynchronous call.
:param project_id: Project ID of the project that contains the instance.
:type project_id: str
:param operation_name: name of the operation
:type operation_name: str
:return: response returned by the operation
:rtype: dict
"""
service = self.get_conn()
while True:
operation_response = service.operations().get(
project=project_id,
operation=operation_name,
).execute(num_retries=NUM_RETRIES)
if operation_response.get("status") == CloudSqlOperationStatus.DONE:
error = operation_response.get("error")
if error:
# Extracting the errors list as string and trimming square braces
error_msg = str(error.get("errors"))[1:-1]
raise AirflowException(error_msg)
# No meaningful info to return from the response in case of success
return True
time.sleep(TIME_TO_SLEEP_IN_SECONDS)
Loading

0 comments on commit 3be8ce7

Please sign in to comment.