Skip to content

Commit

Permalink
[AIRFLOW-3276] Cloud SQL: database create / patch / delete operators (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
sprzedwojski authored and kaxil committed Nov 2, 2018
1 parent 38b54a0 commit 92cb5c7
Show file tree
Hide file tree
Showing 6 changed files with 701 additions and 19 deletions.
60 changes: 52 additions & 8 deletions airflow/contrib/example_dags/example_gcp_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,30 @@
# under the License.

"""
Example Airflow DAG that deploys, updates, patches and deletes a Cloud SQL instance
in Google Cloud Platform.
Example Airflow DAG that creates, patches and deletes a Cloud SQL instance, and also
creates, patches and deletes a database inside the instance, in Google Cloud Platform.
This DAG relies on the following Airflow variables
https://airflow.apache.org/concepts.html#variables
This DAG relies on the following environment variables
* PROJECT_ID - Google Cloud Platform project for the Cloud SQL instance.
* INSTANCE_NAME - Name of the Cloud SQL instance.
* DB_NAME - Name of the database inside a Cloud SQL instance.
"""

import os
import datetime

import airflow
from airflow import models

from airflow.contrib.operators.gcp_sql_operator import CloudSqlInstanceCreateOperator, \
CloudSqlInstancePatchOperator, CloudSqlInstanceDeleteOperator
CloudSqlInstancePatchOperator, CloudSqlInstanceDeleteOperator, \
CloudSqlInstanceDatabaseCreateOperator, CloudSqlInstanceDatabasePatchOperator, \
CloudSqlInstanceDatabaseDeleteOperator

# [START howto_operator_cloudsql_arguments]
PROJECT_ID = models.Variable.get('PROJECT_ID', '')
INSTANCE_NAME = models.Variable.get('INSTANCE_NAME', '')
PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testinstance')
DB_NAME = os.environ.get('DB_NAME', 'testdb')
# [END howto_operator_cloudsql_arguments]

# Bodies below represent Cloud SQL instance resources:
Expand Down Expand Up @@ -97,6 +101,19 @@
}
}
# [END howto_operator_cloudsql_patch_body]
# [START howto_operator_cloudsql_db_create_body]
db_create_body = {
"instance": INSTANCE_NAME,
"name": DB_NAME,
"project": PROJECT_ID
}
# [END howto_operator_cloudsql_db_create_body]
# [START howto_operator_cloudsql_db_patch_body]
db_patch_body = {
"charset": "utf16",
"collation": "utf16_general_ci"
}
# [END howto_operator_cloudsql_db_patch_body]

default_args = {
'start_date': airflow.utils.dates.days_ago(1)
Expand All @@ -123,6 +140,31 @@
task_id='sql_instance_patch_task'
)
# [END howto_operator_cloudsql_patch]
# [START howto_operator_cloudsql_db_create]
sql_db_create_task = CloudSqlInstanceDatabaseCreateOperator(
project_id=PROJECT_ID,
body=db_create_body,
instance=INSTANCE_NAME,
task_id='sql_db_create_task'
)
# [END howto_operator_cloudsql_db_create]
# [START howto_operator_cloudsql_db_patch]
sql_db_patch_task = CloudSqlInstanceDatabasePatchOperator(
project_id=PROJECT_ID,
body=db_patch_body,
instance=INSTANCE_NAME,
database=DB_NAME,
task_id='sql_db_patch_task'
)
# [END howto_operator_cloudsql_db_patch]
# [START howto_operator_cloudsql_db_delete]
sql_db_delete_task = CloudSqlInstanceDatabaseDeleteOperator(
project_id=PROJECT_ID,
instance=INSTANCE_NAME,
database=DB_NAME,
task_id='sql_db_delete_task'
)
# [END howto_operator_cloudsql_db_delete]
# [START howto_operator_cloudsql_delete]
sql_instance_delete_task = CloudSqlInstanceDeleteOperator(
project_id=PROJECT_ID,
Expand All @@ -131,4 +173,6 @@
)
# [END howto_operator_cloudsql_delete]

sql_instance_create_task >> sql_instance_patch_task >> sql_instance_delete_task
sql_instance_create_task >> sql_instance_patch_task \
>> sql_db_create_task >> sql_db_patch_task \
>> sql_db_delete_task >> sql_instance_delete_task
90 changes: 90 additions & 0 deletions airflow/contrib/hooks/gcp_sql_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,96 @@ def delete_instance(self, project_id, instance):
operation_name = response["name"]
return self._wait_for_operation_to_complete(project_id, operation_name)

def get_database(self, project_id, instance, database):
"""
Retrieves a database resource from a Cloud SQL instance.
:param project_id: Project ID of the project that contains the instance.
:type project_id: str
:param instance: Database instance ID. This does not include the project ID.
:type instance: str
:param database: Name of the database in the instance.
:type database: str
:return: A Cloud SQL database resource, as described in
https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/databases#resource
:rtype: dict
"""
return self.get_conn().databases().get(
project=project_id,
instance=instance,
database=database
).execute(num_retries=NUM_RETRIES)

def create_database(self, project, instance, body):
"""
Creates a new database inside a Cloud SQL instance.
:param project: Project ID of the project that contains the instance.
:type project: str
:param instance: Database instance ID. This does not include the project ID.
:type instance: str
:param body: The request body, as described in
https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/databases/insert#request-body
:type body: dict
:return: True if the operation succeeded, raises an error otherwise
:rtype: bool
"""
response = self.get_conn().databases().insert(
project=project,
instance=instance,
body=body
).execute(num_retries=NUM_RETRIES)
operation_name = response["name"]
return self._wait_for_operation_to_complete(project, operation_name)

def patch_database(self, project, instance, database, body):
"""
Updates a database resource inside a Cloud SQL instance.
This method supports patch semantics.
See: https://cloud.google.com/sql/docs/mysql/admin-api/how-tos/performance#patch
:param project: Project ID of the project that contains the instance.
:type project: str
:param instance: Database instance ID. This does not include the project ID.
:type instance: str
:param database: Name of the database to be updated in the instance.
:type database: str
:param body: The request body, as described in
https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/databases/insert#request-body
:type body: dict
:return: True if the operation succeeded, raises an error otherwise
:rtype: bool
"""
response = self.get_conn().databases().patch(
project=project,
instance=instance,
database=database,
body=body
).execute(num_retries=NUM_RETRIES)
operation_name = response["name"]
return self._wait_for_operation_to_complete(project, operation_name)

def delete_database(self, project, instance, database):
"""
Deletes a database from a Cloud SQL instance.
:param project: Project ID of the project that contains the instance.
:type project: str
:param instance: Database instance ID. This does not include the project ID.
:type instance: str
:param database: Name of the database to be deleted in the instance.
:type database: str
:return: True if the operation succeeded, raises an error otherwise
:rtype: bool
"""
response = self.get_conn().databases().delete(
project=project,
instance=instance,
database=database
).execute(num_retries=NUM_RETRIES)
operation_name = response["name"]
return self._wait_for_operation_to_complete(project, operation_name)

def _wait_for_operation_to_complete(self, project_id, operation_name):
"""
Waits for the named operation to complete - checks status of the
Expand Down
Loading

0 comments on commit 92cb5c7

Please sign in to comment.