diff --git a/airflow/contrib/example_dags/example_gcp_sql.py b/airflow/contrib/example_dags/example_gcp_sql.py index a484456f6e6d9..136c88c843014 100644 --- a/airflow/contrib/example_dags/example_gcp_sql.py +++ b/airflow/contrib/example_dags/example_gcp_sql.py @@ -18,26 +18,30 @@ # under the License. """ -Example Airflow DAG that deploys, updates, patches and deletes a Cloud SQL instance -in Google Cloud Platform. +Example Airflow DAG that creates, patches and deletes a Cloud SQL instance, and also +creates, patches and deletes a database inside the instance, in Google Cloud Platform. -This DAG relies on the following Airflow variables -https://airflow.apache.org/concepts.html#variables +This DAG relies on the following environment variables * PROJECT_ID - Google Cloud Platform project for the Cloud SQL instance. * INSTANCE_NAME - Name of the Cloud SQL instance. +* DB_NAME - Name of the database inside a Cloud SQL instance. """ +import os import datetime import airflow from airflow import models from airflow.contrib.operators.gcp_sql_operator import CloudSqlInstanceCreateOperator, \ - CloudSqlInstancePatchOperator, CloudSqlInstanceDeleteOperator + CloudSqlInstancePatchOperator, CloudSqlInstanceDeleteOperator, \ + CloudSqlInstanceDatabaseCreateOperator, CloudSqlInstanceDatabasePatchOperator, \ + CloudSqlInstanceDatabaseDeleteOperator # [START howto_operator_cloudsql_arguments] -PROJECT_ID = models.Variable.get('PROJECT_ID', '') -INSTANCE_NAME = models.Variable.get('INSTANCE_NAME', '') +PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project') +INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testinstance') +DB_NAME = os.environ.get('DB_NAME', 'testdb') # [END howto_operator_cloudsql_arguments] # Bodies below represent Cloud SQL instance resources: @@ -97,6 +101,19 @@ } } # [END howto_operator_cloudsql_patch_body] +# [START howto_operator_cloudsql_db_create_body] +db_create_body = { + "instance": INSTANCE_NAME, + "name": DB_NAME, + "project": PROJECT_ID +} +# [END howto_operator_cloudsql_db_create_body] +# [START howto_operator_cloudsql_db_patch_body] +db_patch_body = { + "charset": "utf16", + "collation": "utf16_general_ci" +} +# [END howto_operator_cloudsql_db_patch_body] default_args = { 'start_date': airflow.utils.dates.days_ago(1) @@ -123,6 +140,31 @@ task_id='sql_instance_patch_task' ) # [END howto_operator_cloudsql_patch] + # [START howto_operator_cloudsql_db_create] + sql_db_create_task = CloudSqlInstanceDatabaseCreateOperator( + project_id=PROJECT_ID, + body=db_create_body, + instance=INSTANCE_NAME, + task_id='sql_db_create_task' + ) + # [END howto_operator_cloudsql_db_create] + # [START howto_operator_cloudsql_db_patch] + sql_db_patch_task = CloudSqlInstanceDatabasePatchOperator( + project_id=PROJECT_ID, + body=db_patch_body, + instance=INSTANCE_NAME, + database=DB_NAME, + task_id='sql_db_patch_task' + ) + # [END howto_operator_cloudsql_db_patch] + # [START howto_operator_cloudsql_db_delete] + sql_db_delete_task = CloudSqlInstanceDatabaseDeleteOperator( + project_id=PROJECT_ID, + instance=INSTANCE_NAME, + database=DB_NAME, + task_id='sql_db_delete_task' + ) + # [END howto_operator_cloudsql_db_delete] # [START howto_operator_cloudsql_delete] sql_instance_delete_task = CloudSqlInstanceDeleteOperator( project_id=PROJECT_ID, @@ -131,4 +173,6 @@ ) # [END howto_operator_cloudsql_delete] - sql_instance_create_task >> sql_instance_patch_task >> sql_instance_delete_task + sql_instance_create_task >> sql_instance_patch_task \ + >> sql_db_create_task >> sql_db_patch_task \ + >> sql_db_delete_task >> sql_instance_delete_task diff --git a/airflow/contrib/hooks/gcp_sql_hook.py b/airflow/contrib/hooks/gcp_sql_hook.py index e0b3f92d8f20e..549ceaf49cec3 100644 --- a/airflow/contrib/hooks/gcp_sql_hook.py +++ b/airflow/contrib/hooks/gcp_sql_hook.py @@ -144,6 +144,96 @@ def delete_instance(self, project_id, instance): operation_name = response["name"] return self._wait_for_operation_to_complete(project_id, operation_name) + def get_database(self, project_id, instance, database): + """ + Retrieves a database resource from a Cloud SQL instance. + + :param project_id: Project ID of the project that contains the instance. + :type project_id: str + :param instance: Database instance ID. This does not include the project ID. + :type instance: str + :param database: Name of the database in the instance. + :type database: str + :return: A Cloud SQL database resource, as described in + https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/databases#resource + :rtype: dict + """ + return self.get_conn().databases().get( + project=project_id, + instance=instance, + database=database + ).execute(num_retries=NUM_RETRIES) + + def create_database(self, project, instance, body): + """ + Creates a new database inside a Cloud SQL instance. + + :param project: Project ID of the project that contains the instance. + :type project: str + :param instance: Database instance ID. This does not include the project ID. + :type instance: str + :param body: The request body, as described in + https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/databases/insert#request-body + :type body: dict + :return: True if the operation succeeded, raises an error otherwise + :rtype: bool + """ + response = self.get_conn().databases().insert( + project=project, + instance=instance, + body=body + ).execute(num_retries=NUM_RETRIES) + operation_name = response["name"] + return self._wait_for_operation_to_complete(project, operation_name) + + def patch_database(self, project, instance, database, body): + """ + Updates a database resource inside a Cloud SQL instance. + This method supports patch semantics. + See: https://cloud.google.com/sql/docs/mysql/admin-api/how-tos/performance#patch + + :param project: Project ID of the project that contains the instance. + :type project: str + :param instance: Database instance ID. This does not include the project ID. + :type instance: str + :param database: Name of the database to be updated in the instance. + :type database: str + :param body: The request body, as described in + https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/databases/insert#request-body + :type body: dict + :return: True if the operation succeeded, raises an error otherwise + :rtype: bool + """ + response = self.get_conn().databases().patch( + project=project, + instance=instance, + database=database, + body=body + ).execute(num_retries=NUM_RETRIES) + operation_name = response["name"] + return self._wait_for_operation_to_complete(project, operation_name) + + def delete_database(self, project, instance, database): + """ + Deletes a database from a Cloud SQL instance. + + :param project: Project ID of the project that contains the instance. + :type project: str + :param instance: Database instance ID. This does not include the project ID. + :type instance: str + :param database: Name of the database to be deleted in the instance. + :type database: str + :return: True if the operation succeeded, raises an error otherwise + :rtype: bool + """ + response = self.get_conn().databases().delete( + project=project, + instance=instance, + database=database + ).execute(num_retries=NUM_RETRIES) + operation_name = response["name"] + return self._wait_for_operation_to_complete(project, operation_name) + def _wait_for_operation_to_complete(self, project_id, operation_name): """ Waits for the named operation to complete - checks status of the diff --git a/airflow/contrib/operators/gcp_sql_operator.py b/airflow/contrib/operators/gcp_sql_operator.py index 0ba7a300c9e41..aca2e8ad40a34 100644 --- a/airflow/contrib/operators/gcp_sql_operator.py +++ b/airflow/contrib/operators/gcp_sql_operator.py @@ -91,6 +91,45 @@ ], optional=True), ], optional=True) ] +CLOUD_SQL_EXPORT_VALIDATION = [ + dict(name="exportContext", type="dict", fields=[ + dict(name="fileType", allow_empty=False), + dict(name="uri", allow_empty=False), + dict(name="databases", type="list"), + dict(name="sqlExportOptions", type="dict", optional=True, fields=[ + dict(name="tables", type="list"), + dict(name="schemaOnly") + ]), + dict(name="csvExportOptions", type="dict", optional=True, fields=[ + dict(name="selectQuery") + ]) + ]) +] +CLOUD_SQL_IMPORT_VALIDATION = [ + dict(name="importContext", type="dict", fields=[ + dict(name="fileType", allow_empty=False), + dict(name="uri", allow_empty=False), + dict(name="database", optional=True, allow_empty=False), + dict(name="importUser", optional=True), + dict(name="csvImportOptions", type="dict", optional=True, fields=[ + dict(name="table"), + dict(name="columns", type="list", optional=True) + ]) + ]) +] +CLOUD_SQL_DATABASE_INSERT_VALIDATION = [ + dict(name="instance", allow_empty=False), + dict(name="name", allow_empty=False), + dict(name="project", allow_empty=False), +] +CLOUD_SQL_DATABASE_PATCH_VALIDATION = [ + dict(name="instance", optional=True), + dict(name="name", optional=True), + dict(name="project", optional=True), + dict(name="etag", optional=True), + dict(name="charset", optional=True), + dict(name="collation", optional=True), +] class CloudSqlBaseOperator(BaseOperator): @@ -137,6 +176,15 @@ def _check_if_instance_exists(self, instance): return False raise e + def _check_if_db_exists(self, db_name): + try: + return self._hook.get_database(self.project_id, self.instance, db_name) + except HttpError as e: + status = e.resp.status + if status == 404: + return False + raise e + def execute(self, context): pass @@ -162,7 +210,7 @@ class CloudSqlInstanceCreateOperator(CloudSqlBaseOperator): :type instance: str :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform. :type gcp_conn_id: str - :param api_version: API version used (e.g. v1). + :param api_version: API version used (e.g. v1beta4). :type api_version: str :param validate_body: True if body should be validated, False otherwise. :type validate_body: bool @@ -226,7 +274,7 @@ class CloudSqlInstancePatchOperator(CloudSqlBaseOperator): :type instance: str :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform. :type gcp_conn_id: str - :param api_version: API version used (e.g. v1). + :param api_version: API version used (e.g. v1beta4). :type api_version: str """ # [START gcp_sql_patch_template_fields] @@ -270,7 +318,7 @@ class CloudSqlInstanceDeleteOperator(CloudSqlBaseOperator): :type instance: str :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform. :type gcp_conn_id: str - :param api_version: API version used (e.g. v1). + :param api_version: API version used (e.g. v1beta4). :type api_version: str """ # [START gcp_sql_delete_template_fields] @@ -295,3 +343,184 @@ def execute(self, context): return True else: return self._hook.delete_instance(self.project_id, self.instance) + + +class CloudSqlInstanceDatabaseCreateOperator(CloudSqlBaseOperator): + """ + Creates a new database inside a Cloud SQL instance. + + :param project_id: Project ID of the project that contains the instance. + :type project_id: str + :param instance: Database instance ID. This does not include the project ID. + :type instance: str + :param body: The request body, as described in + https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/databases/insert#request-body + :type body: dict + :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform. + :type gcp_conn_id: str + :param api_version: API version used (e.g. v1beta4). + :type api_version: str + :param validate_body: Whether the body should be validated. Defaults to True. + :type validate_body: bool + """ + # [START gcp_sql_db_create_template_fields] + template_fields = ('project_id', 'instance', 'gcp_conn_id', 'api_version') + # [END gcp_sql_db_create_template_fields] + + @apply_defaults + def __init__(self, + project_id, + instance, + body, + gcp_conn_id='google_cloud_default', + api_version='v1beta4', + validate_body=True, + *args, **kwargs): + self.body = body + self.validate_body = validate_body + super(CloudSqlInstanceDatabaseCreateOperator, self).__init__( + project_id=project_id, instance=instance, gcp_conn_id=gcp_conn_id, + api_version=api_version, *args, **kwargs) + + def _validate_inputs(self): + super(CloudSqlInstanceDatabaseCreateOperator, self)._validate_inputs() + if not self.body: + raise AirflowException("The required parameter 'body' is empty") + + def _validate_body_fields(self): + if self.validate_body: + GcpBodyFieldValidator(CLOUD_SQL_DATABASE_INSERT_VALIDATION, + api_version=self.api_version).validate(self.body) + + def execute(self, context): + self._validate_body_fields() + database = self.body.get("name") + if not database: + self.log.error("Body doesn't contain 'name'. Cannot check if the" + " database already exists in the instance {}." + .format(self.instance)) + return False + if self._check_if_db_exists(database): + self.log.info("Cloud SQL instance with ID {} already contains database" + " '{}'. Aborting database insert." + .format(self.instance, database)) + return True + else: + return self._hook.create_database(self.project_id, self.instance, self.body) + + +class CloudSqlInstanceDatabasePatchOperator(CloudSqlBaseOperator): + """ + Updates a resource containing information about a database inside a Cloud SQL + instance using patch semantics. + See: https://cloud.google.com/sql/docs/mysql/admin-api/how-tos/performance#patch + + :param project_id: Project ID of the project that contains the instance. + :type project_id: str + :param instance: Database instance ID. This does not include the project ID. + :type instance: str + :param database: Name of the database to be updated in the instance. + :type database: str + :param body: The request body, as described in + https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/databases/patch#request-body + :type body: dict + :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform. + :type gcp_conn_id: str + :param api_version: API version used (e.g. v1beta4). + :type api_version: str + :param validate_body: Whether the body should be validated. Defaults to True. + :type validate_body: bool + """ + # [START gcp_sql_db_patch_template_fields] + template_fields = ('project_id', 'instance', 'database', 'gcp_conn_id', + 'api_version') + # [END gcp_sql_db_patch_template_fields] + + @apply_defaults + def __init__(self, + project_id, + instance, + database, + body, + gcp_conn_id='google_cloud_default', + api_version='v1beta4', + validate_body=True, + *args, **kwargs): + self.database = database + self.body = body + self.validate_body = validate_body + super(CloudSqlInstanceDatabasePatchOperator, self).__init__( + project_id=project_id, instance=instance, gcp_conn_id=gcp_conn_id, + api_version=api_version, *args, **kwargs) + + def _validate_inputs(self): + super(CloudSqlInstanceDatabasePatchOperator, self)._validate_inputs() + if not self.body: + raise AirflowException("The required parameter 'body' is empty") + if not self.database: + raise AirflowException("The required parameter 'database' is empty") + + def _validate_body_fields(self): + if self.validate_body: + GcpBodyFieldValidator(CLOUD_SQL_DATABASE_PATCH_VALIDATION, + api_version=self.api_version).validate(self.body) + + def execute(self, context): + self._validate_body_fields() + if not self._check_if_db_exists(self.database): + raise AirflowException("Cloud SQL instance with ID {} does not contain " + "database '{}'. " + "Please specify another database to patch." + .format(self.instance, self.database)) + else: + return self._hook.patch_database(self.project_id, self.instance, + self.database, self.body) + + +class CloudSqlInstanceDatabaseDeleteOperator(CloudSqlBaseOperator): + """ + Deletes a database from a Cloud SQL instance. + + :param project_id: Project ID of the project that contains the instance. + :type project_id: str + :param instance: Database instance ID. This does not include the project ID. + :type instance: str + :param database: Name of the database to be deleted in the instance. + :type database: str + :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform. + :type gcp_conn_id: str + :param api_version: API version used (e.g. v1beta4). + :type api_version: str + """ + # [START gcp_sql_db_delete_template_fields] + template_fields = ('project_id', 'instance', 'database', 'gcp_conn_id', + 'api_version') + # [END gcp_sql_db_delete_template_fields] + + @apply_defaults + def __init__(self, + project_id, + instance, + database, + gcp_conn_id='google_cloud_default', + api_version='v1beta4', + *args, **kwargs): + self.database = database + super(CloudSqlInstanceDatabaseDeleteOperator, self).__init__( + project_id=project_id, instance=instance, gcp_conn_id=gcp_conn_id, + api_version=api_version, *args, **kwargs) + + def _validate_inputs(self): + super(CloudSqlInstanceDatabaseDeleteOperator, self)._validate_inputs() + if not self.database: + raise AirflowException("The required parameter 'database' is empty") + + def execute(self, context): + if not self._check_if_db_exists(self.database): + print("Cloud SQL instance with ID {} does not contain database '{}'. " + "Aborting database delete." + .format(self.instance, self.database)) + return True + else: + return self._hook.delete_database(self.project_id, self.instance, + self.database) diff --git a/docs/howto/operator.rst b/docs/howto/operator.rst index 025274a5a41ae..6333e32dd7d67 100644 --- a/docs/howto/operator.rst +++ b/docs/howto/operator.rst @@ -284,6 +284,148 @@ If the source code for your function is in Google Source Repository, make sure t your service account has the Source Repository Viewer role so that the source code can be downloaded if necessary. +CloudSqlInstanceDatabaseCreateOperator +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Creates a new database inside a Cloud SQL instance. + +For parameter definition take a look at +:class:`~airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceDatabaseCreateOperator`. + +Arguments +""""""""" + +Some arguments in the example DAG are taken from environment variables: + +.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py + :language: python + :start-after: [START howto_operator_cloudsql_arguments] + :end-before: [END howto_operator_cloudsql_arguments] + +Using the operator +"""""""""""""""""" + +.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_cloudsql_db_create] + :end-before: [END howto_operator_cloudsql_db_create] + +Example request body: + +.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py + :language: python + :start-after: [START howto_operator_cloudsql_db_create_body] + :end-before: [END howto_operator_cloudsql_db_create_body] + +Templating +"""""""""" + +.. literalinclude:: ../../airflow/contrib/operators/gcp_sql_operator.py + :language: python + :dedent: 4 + :start-after: [START gcp_sql_db_create_template_fields] + :end-before: [END gcp_sql_db_create_template_fields] + +More information +"""""""""""""""" + +See `Google Cloud SQL API documentation for database insert +`_. + +CloudSqlInstanceDatabaseDeleteOperator +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Deletes a database from a Cloud SQL instance. + +For parameter definition take a look at +:class:`~airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceDatabaseDeleteOperator`. + +Arguments +""""""""" + +Some arguments in the example DAG are taken from environment variables: + +.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py + :language: python + :start-after: [START howto_operator_cloudsql_arguments] + :end-before: [END howto_operator_cloudsql_arguments] + +Using the operator +"""""""""""""""""" + +.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_cloudsql_db_delete] + :end-before: [END howto_operator_cloudsql_db_delete] + +Templating +"""""""""" + +.. literalinclude:: ../../airflow/contrib/operators/gcp_sql_operator.py + :language: python + :dedent: 4 + :start-after: [START gcp_sql_db_delete_template_fields] + :end-before: [END gcp_sql_db_delete_template_fields] + +More information +"""""""""""""""" + +See `Google Cloud SQL API documentation for database delete +`_. + +CloudSqlInstanceDatabasePatchOperator +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Updates a resource containing information about a database inside a Cloud SQL instance +using patch semantics. +See: https://cloud.google.com/sql/docs/mysql/admin-api/how-tos/performance#patch + +For parameter definition take a look at +:class:`~airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceDatabasePatchOperator`. + +Arguments +""""""""" + +Some arguments in the example DAG are taken from environment variables: + +.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py + :language: python + :start-after: [START howto_operator_cloudsql_arguments] + :end-before: [END howto_operator_cloudsql_arguments] + +Using the operator +"""""""""""""""""" + +.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_cloudsql_db_patch] + :end-before: [END howto_operator_cloudsql_db_patch] + +Example request body: + +.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py + :language: python + :start-after: [START howto_operator_cloudsql_db_patch_body] + :end-before: [END howto_operator_cloudsql_db_patch_body] + +Templating +"""""""""" + +.. literalinclude:: ../../airflow/contrib/operators/gcp_sql_operator.py + :language: python + :dedent: 4 + :start-after: [START gcp_sql_db_patch_template_fields] + :end-before: [END gcp_sql_db_patch_template_fields] + +More information +"""""""""""""""" + +See `Google Cloud SQL API documentation for database patch +`_. + CloudSqlInstanceDeleteOperator ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -295,7 +437,7 @@ For parameter definition take a look at Arguments """"""""" -Some arguments in the example DAG are taken from Airflow variables: +Some arguments in the example DAG are taken from environment variables: .. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py :language: python @@ -342,7 +484,7 @@ will succeed. Arguments """"""""" -Some arguments in the example DAG are taken from Airflow variables: +Some arguments in the example DAG are taken from environment variables: .. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py :language: python @@ -398,7 +540,7 @@ unchanged. Arguments """"""""" -Some arguments in the example DAG are taken from Airflow variables: +Some arguments in the example DAG are taken from environment variables: .. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py :language: python diff --git a/docs/integration.rst b/docs/integration.rst index 34e062437c530..cb05e941696c9 100644 --- a/docs/integration.rst +++ b/docs/integration.rst @@ -515,10 +515,37 @@ Cloud SQL Cloud SQL Operators """"""""""""""""""" +- :ref:`CloudSqlInstanceDatabaseDeleteOperator` : deletes a database from a Cloud SQL +instance. +- :ref:`CloudSqlInstanceDatabaseCreateOperator` : creates a new database inside a Cloud +SQL instance. +- :ref:`CloudSqlInstanceDatabasePatchOperator` : updates a database inside a Cloud +SQL instance. - :ref:`CloudSqlInstanceDeleteOperator` : delete a Cloud SQL instance. - :ref:`CloudSqlInstanceCreateOperator` : create a new Cloud SQL instance. - :ref:`CloudSqlInstancePatchOperator` : patch a Cloud SQL instance. +.. CloudSqlInstanceDatabaseDeleteOperator: + +CloudSqlInstanceDatabaseDeleteOperator +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. autoclass:: airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceDatabaseDeleteOperator + +.. CloudSqlInstanceDatabaseCreateOperator: + +CloudSqlInstanceDatabaseCreateOperator +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. autoclass:: airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceDatabaseCreateOperator + +.. CloudSqlInstanceDatabasePatchOperator: + +CloudSqlInstanceDatabasePatchOperator +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. autoclass:: airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceDatabasePatchOperator + .. CloudSqlInstanceDeleteOperator: CloudSqlInstanceDeleteOperator @@ -536,12 +563,12 @@ CloudSqlInstanceCreateOperator .. CloudSqlInstancePatchOperator: CloudSqlInstancePatchOperator -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ .. autoclass:: airflow.contrib.operators.gcp_sql_operator.CloudSqlInstancePatchOperator Cloud SQL Hook -"""""""""""""""""""" +"""""""""""""" .. autoclass:: airflow.contrib.hooks.gcp_sql_hook.CloudSqlHook :members: @@ -566,14 +593,14 @@ GceInstanceStartOperator .. _GceInstanceStopOperator: GceInstanceStopOperator -^^^^^^^^^^^^^^^^^^^^^^^^ +^^^^^^^^^^^^^^^^^^^^^^^ .. autoclass:: airflow.contrib.operators.gcp_compute_operator.GceInstanceStopOperator .. _GceSetMachineTypeOperator: GceSetMachineTypeOperator -^^^^^^^^^^^^^^^^^^^^^^^^ +^^^^^^^^^^^^^^^^^^^^^^^^^ .. autoclass:: airflow.contrib.operators.gcp_compute_operator.GceSetMachineTypeOperator diff --git a/tests/contrib/operators/test_gcp_sql_operator.py b/tests/contrib/operators/test_gcp_sql_operator.py index 245631808a223..31ed3d37c3c52 100644 --- a/tests/contrib/operators/test_gcp_sql_operator.py +++ b/tests/contrib/operators/test_gcp_sql_operator.py @@ -21,7 +21,9 @@ from airflow import AirflowException from airflow.contrib.operators.gcp_sql_operator import CloudSqlInstanceCreateOperator, \ - CloudSqlInstancePatchOperator, CloudSqlInstanceDeleteOperator + CloudSqlInstancePatchOperator, CloudSqlInstanceDeleteOperator, \ + CloudSqlInstanceDatabaseCreateOperator, CloudSqlInstanceDatabasePatchOperator, \ + CloudSqlInstanceDatabaseDeleteOperator try: # noinspection PyProtectedMember @@ -34,6 +36,7 @@ PROJECT_ID = "project-id" INSTANCE_NAME = "test-name" +DB_NAME = "db1" CREATE_BODY = { "name": INSTANCE_NAME, "settings": { @@ -109,6 +112,21 @@ }, "region": "europe-west4" } +DATABASE_INSERT_BODY = { + "name": DB_NAME, # The name of the database in the Cloud SQL instance. + # This does not include the project ID or instance name. + + "project": PROJECT_ID, # The project ID of the project containing the Cloud SQL + # database. The Google apps domain is prefixed if + # applicable. + + "instance": INSTANCE_NAME, # The name of the Cloud SQL instance. + # This does not include the project ID. +} +DATABASE_PATCH_BODY = { + "charset": "utf16", + "collation": "utf16_general_ci" +} class CloudSqlTest(unittest.TestCase): @@ -312,3 +330,135 @@ def test_instance_delete_should_abort_and_succeed_if_not_exists( mock_hook.assert_called_once_with(api_version="v1beta4", gcp_conn_id="google_cloud_default") mock_hook.return_value.delete_instance.assert_not_called() + + @mock.patch("airflow.contrib.operators.gcp_sql_operator" + ".CloudSqlInstanceDatabaseCreateOperator._check_if_db_exists") + @mock.patch("airflow.contrib.operators.gcp_sql_operator.CloudSqlHook") + def test_instance_db_create(self, mock_hook, _check_if_db_exists): + _check_if_db_exists.return_value = False + op = CloudSqlInstanceDatabaseCreateOperator( + project_id=PROJECT_ID, + instance=INSTANCE_NAME, + body=DATABASE_INSERT_BODY, + task_id="id" + ) + result = op.execute(None) + mock_hook.assert_called_once_with(api_version="v1beta4", + gcp_conn_id="google_cloud_default") + mock_hook.return_value.create_database.assert_called_once_with( + PROJECT_ID, INSTANCE_NAME, DATABASE_INSERT_BODY + ) + self.assertTrue(result) + + @mock.patch("airflow.contrib.operators.gcp_sql_operator" + ".CloudSqlInstanceDatabaseCreateOperator._check_if_db_exists") + @mock.patch("airflow.contrib.operators.gcp_sql_operator.CloudSqlHook") + def test_instance_db_create_should_abort_and_succeed_if_exists( + self, mock_hook, _check_if_db_exists): + _check_if_db_exists.return_value = True + op = CloudSqlInstanceDatabaseCreateOperator( + project_id=PROJECT_ID, + instance=INSTANCE_NAME, + body=DATABASE_INSERT_BODY, + task_id="id" + ) + result = op.execute(None) + self.assertTrue(result) + mock_hook.assert_called_once_with(api_version="v1beta4", + gcp_conn_id="google_cloud_default") + mock_hook.return_value.create_database.assert_not_called() + + @mock.patch("airflow.contrib.operators.gcp_sql_operator" + ".CloudSqlInstanceDatabasePatchOperator._check_if_db_exists") + @mock.patch("airflow.contrib.operators.gcp_sql_operator.CloudSqlHook") + def test_instance_db_patch(self, mock_hook, _check_if_db_exists): + _check_if_db_exists.return_value = True + op = CloudSqlInstanceDatabasePatchOperator( + project_id=PROJECT_ID, + instance=INSTANCE_NAME, + database=DB_NAME, + body=DATABASE_PATCH_BODY, + task_id="id" + ) + result = op.execute(None) + mock_hook.assert_called_once_with(api_version="v1beta4", + gcp_conn_id="google_cloud_default") + mock_hook.return_value.patch_database.assert_called_once_with( + PROJECT_ID, INSTANCE_NAME, DB_NAME, DATABASE_PATCH_BODY + ) + self.assertTrue(result) + + @mock.patch("airflow.contrib.operators.gcp_sql_operator" + ".CloudSqlInstanceDatabasePatchOperator._check_if_db_exists") + @mock.patch("airflow.contrib.operators.gcp_sql_operator.CloudSqlHook") + def test_instance_db_patch_should_throw_ex_if_not_exists( + self, mock_hook, _check_if_db_exists): + _check_if_db_exists.return_value = False + with self.assertRaises(AirflowException) as cm: + op = CloudSqlInstanceDatabasePatchOperator( + project_id=PROJECT_ID, + instance=INSTANCE_NAME, + database=DB_NAME, + body=DATABASE_PATCH_BODY, + task_id="id" + ) + op.execute(None) + err = cm.exception + self.assertIn("Cloud SQL instance with ID", str(err)) + self.assertIn("does not contain database", str(err)) + mock_hook.assert_called_once_with(api_version="v1beta4", + gcp_conn_id="google_cloud_default") + mock_hook.return_value.patch_database.assert_not_called() + + @mock.patch("airflow.contrib.operators.gcp_sql_operator.CloudSqlHook") + def test_instance_db_patch_should_throw_ex_when_empty_database(self, mock_hook): + with self.assertRaises(AirflowException) as cm: + op = CloudSqlInstanceDatabasePatchOperator( + project_id=PROJECT_ID, + instance=INSTANCE_NAME, + database="", + body=DATABASE_INSERT_BODY, + task_id="id" + ) + op.execute(None) + err = cm.exception + self.assertIn("The required parameter 'database' is empty", str(err)) + mock_hook.assert_not_called() + mock_hook.return_value.patch_database.assert_not_called() + + @mock.patch("airflow.contrib.operators.gcp_sql_operator" + ".CloudSqlInstanceDatabaseDeleteOperator._check_if_db_exists") + @mock.patch("airflow.contrib.operators.gcp_sql_operator.CloudSqlHook") + def test_instance_db_delete(self, mock_hook, _check_if_db_exists): + _check_if_db_exists.return_value = True + op = CloudSqlInstanceDatabaseDeleteOperator( + project_id=PROJECT_ID, + instance=INSTANCE_NAME, + database=DB_NAME, + task_id="id" + ) + result = op.execute(None) + self.assertTrue(result) + mock_hook.assert_called_once_with(api_version="v1beta4", + gcp_conn_id="google_cloud_default") + mock_hook.return_value.delete_database.assert_called_once_with( + PROJECT_ID, INSTANCE_NAME, DB_NAME + ) + + @mock.patch("airflow.contrib.operators.gcp_sql_operator" + ".CloudSqlInstanceDatabaseDeleteOperator._check_if_db_exists") + @mock.patch("airflow.contrib.operators.gcp_sql_operator.CloudSqlHook") + def test_instance_db_delete_should_abort_and_succeed_if_not_exists( + self, mock_hook, _check_if_db_exists): + _check_if_db_exists.return_value = False + op = CloudSqlInstanceDatabaseDeleteOperator( + project_id=PROJECT_ID, + instance=INSTANCE_NAME, + database=DB_NAME, + task_id="id" + ) + result = op.execute(None) + self.assertTrue(result) + mock_hook.assert_called_once_with(api_version="v1beta4", + gcp_conn_id="google_cloud_default") + mock_hook.return_value.delete_database.assert_not_called()