Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Documentation for Google Cloud Data Loss Prevention #9651

Merged
merged 1 commit into from
Aug 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 94 additions & 0 deletions airflow/providers/google/cloud/example_dags/example_dlp.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,14 @@
from airflow import models
from airflow.providers.google.cloud.operators.dlp import (
CloudDLPCreateInspectTemplateOperator,
CloudDLPCreateJobTriggerOperator,
CloudDLPCreateStoredInfoTypeOperator,
CloudDLPDeleteInspectTemplateOperator,
CloudDLPDeleteJobTriggerOperator,
CloudDLPDeleteStoredInfoTypeOperator,
CloudDLPInspectContentOperator,
CloudDLPUpdateJobTriggerOperator,
CloudDLPUpdateStoredInfoTypeOperator,
)
from airflow.utils.dates import days_ago

Expand All @@ -54,6 +60,7 @@
start_date=days_ago(1),
tags=['example'],
) as dag:
# [START howto_operator_dlp_create_inspect_template]
create_template = CloudDLPCreateInspectTemplateOperator(
project_id=GCP_PROJECT,
inspect_template=INSPECT_TEMPLATE,
Expand All @@ -62,17 +69,104 @@
do_xcom_push=True,
dag=dag,
)
# [END howto_operator_dlp_create_inspect_template]

# Inspect ITEM with the inspect template produced upstream: the template name
# is read from the XCom ``return_value`` pushed by the ``create_template`` task.
# [START howto_operator_dlp_use_inspect_template]
inspect_content = CloudDLPInspectContentOperator(
    task_id="inspect_content",
    project_id=GCP_PROJECT,
    item=ITEM,
    inspect_template_name="{{ task_instance.xcom_pull('create_template', key='return_value')['name'] }}",
    dag=dag,
)
# [END howto_operator_dlp_use_inspect_template]

# Clean up: remove the template identified by TEMPLATE_ID once inspection is done.
# [START howto_operator_dlp_delete_inspect_template]
delete_template = CloudDLPDeleteInspectTemplateOperator(
    task_id="delete_template",
    template_id=TEMPLATE_ID,
    project_id=GCP_PROJECT,
    dag=dag,
)
# [END howto_operator_dlp_delete_inspect_template]

# Execution order: create the template, use it, then delete it.
create_template >> inspect_content >> delete_template


# Configuration of the stored infoType used by the DAG below.
# The stored-infoType operators take a single config mapping (the
# StoredInfoTypeConfig shape: optional ``display_name`` plus one detection rule
# such as ``regex``), not a list of inline custom infoTypes.
# NOTE(review): field names follow the StoredInfoTypeConfig message - confirm
# against the DLP API reference before relying on them.
CUSTOM_INFO_TYPES = {
    "display_name": "C_MRN",
    "regex": {"pattern": "[1-9]{3}-[1-9]{1}-[1-9]{5}"},
}
CUSTOM_INFO_TYPE_ID = "custom_info_type"
# Same stored infoType, but matching lowercase letters instead of digits.
UPDATE_CUSTOM_INFO_TYPE = {
    "display_name": "C_MRN",
    "regex": {"pattern": "[a-z]{3}-[a-z]{1}-[a-z]{5}"},
}

# Example DAG: create a stored infoType, update its config, then delete it.
with models.DAG(
    "example_gcp_dlp_info_types",
    schedule_interval=None,
    start_date=days_ago(1),
    tags=["example", "dlp", "info-types"],
) as dag:
    # [START howto_operator_dlp_create_info_type]
    create_info_type = CloudDLPCreateStoredInfoTypeOperator(
        project_id=GCP_PROJECT,
        config=CUSTOM_INFO_TYPES,
        stored_info_type_id=CUSTOM_INFO_TYPE_ID,
        dag=dag,
        task_id="create_info_type",
    )
    # [END howto_operator_dlp_create_info_type]
    # [START howto_operator_dlp_update_info_type]
    update_info_type = CloudDLPUpdateStoredInfoTypeOperator(
        project_id=GCP_PROJECT,
        stored_info_type_id=CUSTOM_INFO_TYPE_ID,
        config=UPDATE_CUSTOM_INFO_TYPE,
        dag=dag,
        task_id="update_info_type",
    )
    # [END howto_operator_dlp_update_info_type]
    # [START howto_operator_dlp_delete_info_type]
    delete_info_type = CloudDLPDeleteStoredInfoTypeOperator(
        project_id=GCP_PROJECT,
        stored_info_type_id=CUSTOM_INFO_TYPE_ID,
        dag=dag,
        task_id="delete_info_type",
    )
    # [END howto_operator_dlp_delete_info_type]
    create_info_type >> update_info_type >> delete_info_type

# Recurrence period for the job trigger, in seconds (60 * 60 * 24 = one day).
SCHEDULE = {"recurrence_period_duration": {"seconds": 60 * 60 * 24}}

# Inspect-job definition embedded in the trigger.
# NOTE(review): a reviewer reported the create task failing with
# "400 `StorageConfig` must be set"; JOB presumably also needs a
# ``storage_config`` entry - TODO confirm and add a real storage target.
JOB = {"inspect_config": INSPECT_CONFIG}

# Full job-trigger payload handed to the create operator.
JOB_TRIGGER = {
    "inspect_job": JOB,
    "triggers": [{"schedule": SCHEDULE}],
    "status": "HEALTHY",
}

TRIGGER_ID = "example_trigger"

# Example DAG for the job-trigger operators; it has no schedule of its own.
with models.DAG(
    "example_gcp_dlp_job",
    schedule_interval=None,
    start_date=days_ago(1),
    tags=["example", "dlp_job"],
) as dag:
    # [START howto_operator_dlp_create_job_trigger]
    create_trigger = CloudDLPCreateJobTriggerOperator(
        task_id="create_trigger",
        project_id=GCP_PROJECT,
        job_trigger=JOB_TRIGGER,
        trigger_id=TRIGGER_ID,
        dag=dag,
    )
Comment on lines +146 to +152
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This task fails with this error:

  File "/airflow/airflow/providers/google/cloud/operators/dlp.py", line 433, in execute
    metadata=self.metadata,
  File "/airflow/airflow/providers/google/common/hooks/base_google.py", line 376, in inner_wrapper
    return func(self, *args, **kwargs)
  File "/airflow/airflow/providers/google/cloud/hooks/dlp.py", line 375, in create_job_trigger
    metadata=metadata,
  File "/.virtualenvs/airflow/lib/python3.6/site-packages/google/cloud/dlp_v2/gapic/dlp_service_client.py", line 2557, in create_job_trigger
    request, retry=retry, timeout=timeout, metadata=metadata
  File "/.virtualenvs/airflow/lib/python3.6/site-packages/google/api_core/gapic_v1/method.py", line 143, in __call__
    return wrapped_func(*args, **kwargs)
  File "/.virtualenvs/airflow/lib/python3.6/site-packages/google/api_core/grpc_helpers.py", line 59, in error_remapped_callable
    six.raise_from(exceptions.from_grpc_error(exc), exc)
  File "<string>", line 3, in raise_from
google.api_core.exceptions.InvalidArgument: 400 `StorageConfig` must be set.

# [END howto_operator_dlp_create_job_trigger]
# New recurrence period for the update step: two days, in seconds.
UPDATED_SCHEDULE = {"recurrence_period_duration": {"seconds": 2 * 60 * 60 * 24}}

# Build a separate payload for the update instead of assigning into JOB_TRIGGER.
# An in-place assignment runs at DAG-parse time, before any task executes, so
# create_trigger (which was handed the same dict object) would end up with the
# updated schedule as well, unless the operator copies its argument.
UPDATED_JOB_TRIGGER = {**JOB_TRIGGER, "triggers": [{"schedule": UPDATED_SCHEDULE}]}

# [START howto_operator_dlp_update_job_trigger]
update_trigger = CloudDLPUpdateJobTriggerOperator(
    project_id=GCP_PROJECT,
    job_trigger_id=TRIGGER_ID,
    job_trigger=UPDATED_JOB_TRIGGER,
    dag=dag,
    task_id="update_trigger",
)
# [END howto_operator_dlp_update_job_trigger]
# [START howto_operator_dlp_delete_job_trigger]
delete_trigger = CloudDLPDeleteJobTriggerOperator(
    project_id=GCP_PROJECT,
    job_trigger_id=TRIGGER_ID,
    dag=dag,
    task_id="delete_trigger",
)
# [END howto_operator_dlp_delete_job_trigger]
# Execution order: create the trigger, update its schedule, then delete it.
create_trigger >> update_trigger >> delete_trigger
Loading