Documentation and example dag for CloudDLPDeidentifyContentOperator, GCSObjectExistenceSensor, GCSObjectsWithPrefixExistenceSensor (#14033)

* Add documentation and example dag for: CloudDLPDeidentifyContentOperator, GCSObjectExistenceSensor, GCSObjectsWtihPrefixExistenceSensor

* Moving gcs sensor docs and example dags to gcs operators docs/example dags

* Add system tests for dlp and gcs

* Adding further information on DLPDeidentifyContent operators

* Pre-Commit tidyup: Renamed gcs/dlp system tests

* Apply suggestions from code review

Co-authored-by: Kamil Breguła <[email protected]>

* reverting some changes following code review

* removed redundant @pytest.mark.system("google.cloud")

* removed operators with newly added examples from missing examples list (pytest fix)

* updated all references to GCSObjectsWtihPrefixExistenceSensor (typo) to newly fixed: GCSObjectsWithPrefixExistenceSensor

* fixing merge issue: including deprecated operator to be excluded from test suite of operators

Co-authored-by: rachael-ds <[email protected]>
Co-authored-by: Kamil Breguła <[email protected]>
3 people authored Feb 23, 2021
1 parent b995127 commit c281979
Showing 8 changed files with 152 additions and 16 deletions.
31 changes: 31 additions & 0 deletions airflow/providers/google/cloud/example_dags/example_dlp.py
@@ -33,6 +33,7 @@
    CloudDLPCreateInspectTemplateOperator,
    CloudDLPCreateJobTriggerOperator,
    CloudDLPCreateStoredInfoTypeOperator,
    CloudDLPDeidentifyContentOperator,
    CloudDLPDeleteInspectTemplateOperator,
    CloudDLPDeleteJobTriggerOperator,
    CloudDLPDeleteStoredInfoTypeOperator,
@@ -177,3 +178,33 @@
    )
    # [END howto_operator_dlp_delete_job_trigger]
    create_trigger >> update_trigger >> delete_trigger

# [START dlp_deidentify_config_example]
DEIDENTIFY_CONFIG = {
    "info_type_transformations": {
        "transformations": [
            {
                "primitive_transformation": {
                    "replace_config": {"new_value": {"string_value": "[deidentified_number]"}}
                }
            }
        ]
    }
}
# [END dlp_deidentify_config_example]

with models.DAG(
    "example_gcp_dlp_deidentify_content",
    schedule_interval=None,
    start_date=days_ago(1),
    tags=["example", "dlp", "deidentify"],
) as dag4:
    # [START _howto_operator_dlp_deidentify_content]
    deidentify_content = CloudDLPDeidentifyContentOperator(
        project_id=GCP_PROJECT,
        item=ITEM,
        deidentify_config=DEIDENTIFY_CONFIG,
        inspect_config=INSPECT_CONFIG,
        task_id="deidentify_content",
    )
    # [END _howto_operator_dlp_deidentify_content]
40 changes: 40 additions & 0 deletions airflow/providers/google/cloud/example_dags/example_gcs.py
@@ -32,6 +32,10 @@
    GCSListObjectsOperator,
    GCSObjectCreateAclEntryOperator,
)
from airflow.providers.google.cloud.sensors.gcs import (
    GCSObjectExistenceSensor,
    GCSObjectsWithPrefixExistenceSensor,
)
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
@@ -48,6 +52,7 @@

PATH_TO_TRANSFORM_SCRIPT = os.environ.get('GCP_GCS_PATH_TO_TRANSFORM_SCRIPT', 'test.py')
PATH_TO_UPLOAD_FILE = os.environ.get("GCP_GCS_PATH_TO_UPLOAD_FILE", "test-gcs-example.txt")
PATH_TO_UPLOAD_FILE_PREFIX = os.environ.get("GCP_GCS_PATH_TO_UPLOAD_FILE_PREFIX", "test-gcs-")
PATH_TO_SAVED_FILE = os.environ.get("GCP_GCS_PATH_TO_SAVED_FILE", "test-gcs-example-download.txt")

BUCKET_FILE_LOCATION = PATH_TO_UPLOAD_FILE.rpartition("/")[-1]
@@ -151,6 +156,41 @@
    copy_file >> delete_bucket_2
    delete_files >> delete_bucket_1

with models.DAG(
    "example_gcs_sensors",
    start_date=days_ago(1),
    schedule_interval=None,
    tags=['example'],
) as dag2:
    create_bucket = GCSCreateBucketOperator(
        task_id="create_bucket", bucket_name=BUCKET_1, project_id=PROJECT_ID
    )
    upload_file = LocalFilesystemToGCSOperator(
        task_id="upload_file",
        src=PATH_TO_UPLOAD_FILE,
        dst=BUCKET_FILE_LOCATION,
        bucket=BUCKET_1,
    )
    # [START howto_sensor_object_exists_task]
    gcs_object_exists = GCSObjectExistenceSensor(
        bucket=BUCKET_1,
        object=PATH_TO_UPLOAD_FILE,
        mode='poke',
        task_id="gcs_object_exists_task",
    )
    # [END howto_sensor_object_exists_task]
    # [START howto_sensor_object_with_prefix_exists_task]
    gcs_object_with_prefix_exists = GCSObjectsWithPrefixExistenceSensor(
        bucket=BUCKET_1,
        prefix=PATH_TO_UPLOAD_FILE_PREFIX,
        mode='poke',
        task_id="gcs_object_with_prefix_exists_task",
    )
    # [END howto_sensor_object_with_prefix_exists_task]
    delete_bucket = GCSDeleteBucketOperator(task_id="delete_bucket", bucket_name=BUCKET_1)

    create_bucket >> upload_file >> [gcs_object_exists, gcs_object_with_prefix_exists] >> delete_bucket


if __name__ == '__main__':
    dag.clear(dag_run_state=State.NONE)
@@ -296,10 +296,25 @@ Unlike storage methods (Jobs) content method are synchronous, stateless methods.

De-identify Content
"""""""""""""""""""
De-identification is the process of removing identifying information from data.
Configuration information defines how you want the sensitive data de-identified.

To de-identify potentially sensitive info from a content item, you can use
This config can either be saved and persisted in de-identification templates or defined in a :class:`~google.cloud.dlp_v2.types.DeidentifyConfig` object:

.. literalinclude:: /../../airflow/providers/google/cloud/example_dags/example_dlp.py
    :language: python
    :start-after: [START dlp_deidentify_config_example]
    :end-before: [END dlp_deidentify_config_example]
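
A different transformation only requires a different ``primitive_transformation``. For instance,
a minimal sketch that masks matched characters instead of replacing the whole finding, assuming
the DLP ``character_mask_config`` transformation (the ``DEIDENTIFY_MASK_CONFIG`` name and the
masking character are only illustrative):

.. code-block:: python

    # Sketch: mask each matched character with "#" rather than replacing the finding.
    DEIDENTIFY_MASK_CONFIG = {
        "info_type_transformations": {
            "transformations": [
                {
                    "primitive_transformation": {
                        "character_mask_config": {"masking_character": "#"}
                    }
                }
            ]
        }
    }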

To de-identify potentially sensitive information from a content item, you can use
:class:`~airflow.providers.google.cloud.operators.dlp.CloudDLPDeidentifyContentOperator`.

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dlp.py
    :language: python
    :dedent: 4
    :start-after: [START _howto_operator_dlp_deidentify_content]
    :end-before: [END _howto_operator_dlp_deidentify_content]
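
If the configuration has been saved as a de-identification template in DLP, the operator can
reference it by name instead of taking an inline config. A minimal sketch, assuming the
``deidentify_template_name`` argument mirroring the DLP API is available on the operator and
reusing ``GCP_PROJECT``, ``ITEM`` and ``INSPECT_CONFIG`` from the example DAG (the template name
shown is hypothetical):

.. code-block:: python

    # Sketch: point the operator at a stored template;
    # "example-deidentify-template" is a hypothetical template name.
    deidentify_with_template = CloudDLPDeidentifyContentOperator(
        project_id=GCP_PROJECT,
        item=ITEM,
        deidentify_template_name=(
            f"projects/{GCP_PROJECT}/deidentifyTemplates/example-deidentify-template"
        ),
        inspect_config=INSPECT_CONFIG,
        task_id="deidentify_content_with_template",
    )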

.. _howto/operator:CloudDLPReidentifyContentOperator:

Re-identify Content
Expand Down
62 changes: 54 additions & 8 deletions docs/apache-airflow-providers-google/operators/cloud/gcs.rst
Original file line number Diff line number Diff line change
@@ -20,6 +20,10 @@
Google Cloud Storage Operators
==============================

Cloud Storage allows world-wide storage and retrieval of any amount of data at any time.
You can use Cloud Storage for a range of scenarios including serving website content,
storing data for archival and disaster recovery, or distributing large data objects to users via direct download.

.. contents::
    :depth: 1
    :local:
@@ -29,6 +33,9 @@ Prerequisite Tasks

.. include:: /operators/_partials/prerequisite_tasks.rst

Operators
^^^^^^^^^

.. _howto/operator:GCSToBigQueryOperator:

GCSToBigQueryOperator
@@ -111,13 +118,6 @@ More information
See Google Cloud Storage insert documentation to `create an ACL entry for ObjectAccess
<https://cloud.google.com/storage/docs/json_api/v1/objectAccessControls/insert>`_.

Reference
---------

For further information, look at:

* `Client Library Documentation <https://googleapis.github.io/google-cloud-python/latest/storage/index.html>`__
* `Product Documentation <https://cloud.google.com/storage/docs/>`__

.. _howto/operator:GCSDeleteBucketOperator:

@@ -134,14 +134,60 @@ It is performed through the
    :start-after: [START howto_operator_gcs_delete_bucket]
    :end-before: [END howto_operator_gcs_delete_bucket]


You can use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.providers.google.cloud.operators.gcs.GCSDeleteBucketOperator`
parameters, which allow you to dynamically determine values.
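
For example, a minimal sketch of a templated ``bucket_name`` rendered from the run's logical date
(the ``airflow-example-`` naming scheme is only illustrative):

.. code-block:: python

    # Sketch: "{{ ds_nodash }}" is rendered at runtime, e.g. "airflow-example-20210223".
    delete_dated_bucket = GCSDeleteBucketOperator(
        task_id="delete_dated_bucket",
        bucket_name="airflow-example-{{ ds_nodash }}",
    )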

Reference
^^^^^^^^^
---------

For further information, look at:

* `Client Library Documentation <https://googleapis.dev/python/storage/latest/buckets.html>`__
* `Product Documentation <https://cloud.google.com/storage/docs/json_api/v1/buckets>`__

Sensors
^^^^^^^

.. _howto/sensor:GCSObjectExistenceSensor:

GCSObjectExistenceSensor
------------------------

Use the :class:`~airflow.providers.google.cloud.sensors.gcs.GCSObjectExistenceSensor` to wait (poll) for the existence of a file in Google Cloud Storage.

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_gcs.py
    :language: python
    :dedent: 4
    :start-after: [START howto_sensor_object_exists_task]
    :end-before: [END howto_sensor_object_exists_task]

.. _howto/sensor:GCSObjectsWithPrefixExistenceSensor:

GCSObjectsWithPrefixExistenceSensor
-----------------------------------

Use the :class:`~airflow.providers.google.cloud.sensors.gcs.GCSObjectsWithPrefixExistenceSensor` to wait (poll) for the existence of a file with a specified prefix in Google Cloud Storage.

.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_gcs.py
    :language: python
    :dedent: 4
    :start-after: [START howto_sensor_object_with_prefix_exists_task]
    :end-before: [END howto_sensor_object_with_prefix_exists_task]

More information
""""""""""""""""

Sensors support different modes (``poke`` and ``reschedule``) that determine how worker resources
are used while the task is waiting.
See `Airflow sensors documentation
<https://airflow.apache.org/docs/apache-airflow/stable/concepts.html#sensors>`_ for best practices when using sensors.
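
For instance, a minimal sketch (not part of the example DAG above) of the same existence check in
``reschedule`` mode, which releases the worker slot between pokes:

.. code-block:: python

    # Sketch: check every 5 minutes for up to an hour, reusing BUCKET_1 and
    # PATH_TO_UPLOAD_FILE from the example DAG.
    gcs_object_exists_reschedule = GCSObjectExistenceSensor(
        bucket=BUCKET_1,
        object=PATH_TO_UPLOAD_FILE,
        mode='reschedule',
        poke_interval=300,
        timeout=60 * 60,
        task_id="gcs_object_exists_reschedule_task",
    )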


Reference
^^^^^^^^^

For further information, look at:

* `Client Library Documentation <https://googleapis.github.io/google-cloud-python/latest/storage/index.html>`__
* `Product Documentation <https://cloud.google.com/storage/docs/>`__
4 changes: 1 addition & 3 deletions docs/conf.py
@@ -152,9 +152,7 @@
        'README.rst',
    ]
elif PACKAGE_NAME.startswith('apache-airflow-providers-'):
    exclude_patterns = [
        'operators/_partials',
    ]
    exclude_patterns = ['operators/_partials']
else:
    exclude_patterns = []

6 changes: 2 additions & 4 deletions tests/always/test_project_structure.py
@@ -169,6 +169,8 @@ class TestGoogleProviderProjectStructure(unittest.TestCase):
        # Deprecated operator. Ignore it.
        'airflow.providers.google.cloud.operators.cloud_storage_transfer_service'
        '.CloudDataTransferServiceGCSToGCSOperator',
        # Deprecated operator. Ignore it.
        'airflow.providers.google.cloud.sensors.gcs.GCSObjectsWtihPrefixExistenceSensor',
        # Base operator. Ignore it.
        'airflow.providers.google.cloud.operators.cloud_sql.CloudSQLBaseOperator',
        # Deprecated operator. Ignore it
@@ -198,7 +200,6 @@ class TestGoogleProviderProjectStructure(unittest.TestCase):
        'airflow.providers.google.cloud.operators.dlp.CloudDLPCreateDeidentifyTemplateOperator',
        'airflow.providers.google.cloud.operators.dlp.CloudDLPCreateDLPJobOperator',
        'airflow.providers.google.cloud.operators.dlp.CloudDLPUpdateDeidentifyTemplateOperator',
        'airflow.providers.google.cloud.operators.dlp.CloudDLPDeidentifyContentOperator',
        'airflow.providers.google.cloud.operators.dlp.CloudDLPGetDLPJobTriggerOperator',
        'airflow.providers.google.cloud.operators.dlp.CloudDLPListDeidentifyTemplatesOperator',
        'airflow.providers.google.cloud.operators.dlp.CloudDLPGetDeidentifyTemplateOperator',
@@ -218,10 +219,7 @@ class TestGoogleProviderProjectStructure(unittest.TestCase):
        'airflow.providers.google.cloud.operators.datastore.CloudDatastoreGetOperationOperator',
        # Base operator. Ignore it
        'airflow.providers.google.cloud.operators.compute.ComputeEngineBaseOperator',
        'airflow.providers.google.cloud.sensors.gcs.GCSObjectExistenceSensor',
        'airflow.providers.google.cloud.sensors.gcs.GCSObjectUpdateSensor',
        'airflow.providers.google.cloud.sensors.gcs.GCSObjectsWithPrefixExistenceSensor',
        'airflow.providers.google.cloud.sensors.gcs.GCSObjectsWtihPrefixExistenceSensor',
        'airflow.providers.google.cloud.sensors.gcs.GCSUploadSessionCompleteSensor',
    }

4 changes: 4 additions & 0 deletions tests/providers/google/cloud/operators/test_dlp_system.py
@@ -51,3 +51,7 @@ def test_run_example_info_types(self):
    @provide_gcp_context(GCP_DLP_KEY)
    def test_run_example_dlp_job(self):
        self.run_dag('example_gcp_dlp_job', CLOUD_DAG_FOLDER)

    @provide_gcp_context(GCP_DLP_KEY)
    def test_run_example_dlp_deidentify_content(self):
        self.run_dag('example_gcp_dlp_deidentify_content', CLOUD_DAG_FOLDER)
4 changes: 4 additions & 0 deletions tests/providers/google/cloud/operators/test_gcs_system.py
@@ -41,3 +41,7 @@ def tearDown(self):
    @provide_gcp_context(GCP_GCS_KEY)
    def test_run_example_dag(self):
        self.run_dag('example_gcs', CLOUD_DAG_FOLDER)

    @provide_gcp_context(GCP_GCS_KEY)
    def test_run_example_gcs_sensor_dag(self):
        self.run_dag('example_gcs_sensors', CLOUD_DAG_FOLDER)
