Skip to content

Commit

Permalink
feat: allow task ownership as group
Browse files Browse the repository at this point in the history
  • Loading branch information
Francesco Macagno committed Jun 18, 2024
1 parent b8214b0 commit e4949dc
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 17 deletions.
30 changes: 16 additions & 14 deletions docs/lineage/airflow.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,12 @@ enabled = True # default
```

| Name | Default value | Description |
| -------------------------- | -------------------- | ---------------------------------------------------------------------------------------- |
|----------------------------|----------------------|------------------------------------------------------------------------------------------|
| enabled | true | If the plugin should be enabled. |
| conn_id | datahub_rest_default | The name of the datahub rest connection. |
| cluster | prod | name of the airflow cluster, this is equivalent to the `env` of the instance |
| cluster | prod | name of the airflow cluster, this is equivalent to the `env` of the instance |
| capture_ownership_info | true | Extract DAG ownership. |
| capture_ownership_as_group | false | When extracting DAG ownership, treat DAG owner as a group rather than a user |
| capture_tags_info | true | Extract DAG tags. |
| capture_executions | true | Extract task runs and success/failure statuses. This will show up in DataHub "Runs" tab. |
| materialize_iolets | true | Create or un-soft-delete all entities referenced in lineage. |
Expand Down Expand Up @@ -130,18 +131,19 @@ conn_id = datahub_rest_default # or datahub_kafka_default
# etc.
```

| Name | Default value | Description |
| ---------------------- | -------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| enabled | true | If the plugin should be enabled. |
| conn_id | datahub_rest_default | The name of the datahub connection you set in step 1. |
| cluster | prod | name of the airflow cluster |
| capture_ownership_info | true | If true, the owners field of the DAG will be capture as a DataHub corpuser. |
| capture_tags_info | true | If true, the tags field of the DAG will be captured as DataHub tags. |
| capture_executions | true | If true, we'll capture task runs in DataHub in addition to DAG definitions. |
| materialize_iolets | true | Create or un-soft-delete all entities referenced in lineage. |
| datajob_url_link | taskinstance | If taskinstance, the datajob url will be taskinstance link on airflow. It can also be grid. |
| |
| graceful_exceptions | true | If set to true, most runtime errors in the lineage backend will be suppressed and will not cause the overall task to fail. Note that configuration issues will still throw exceptions. |
| Name | Default value | Description |
|----------------------------|----------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| enabled | true | If the plugin should be enabled. |
| conn_id | datahub_rest_default | The name of the datahub connection you set in step 1. |
| cluster | prod | name of the airflow cluster |
| capture_ownership_info | true | If true, the owners field of the DAG will be capture as a DataHub corpuser. |
| capture_ownership_as_group | false | When extracting DAG ownership, treat DAG owner as a group rather than a user. |
| capture_tags_info | true | If true, the tags field of the DAG will be captured as DataHub tags. |
| capture_executions | true | If true, we'll capture task runs in DataHub in addition to DAG definitions. |
| materialize_iolets | true | Create or un-soft-delete all entities referenced in lineage. |
| datajob_url_link | taskinstance | If taskinstance, the datajob url will be taskinstance link on airflow. It can also be grid. |
| |
| graceful_exceptions | true | If set to true, most runtime errors in the lineage backend will be suppressed and will not cause the overall task to fail. Note that configuration issues will still throw exceptions. |

#### Validate that the plugin is working

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@ class DatahubLineageConfig(ConfigModel):
# Cluster to associate with the pipelines and tasks. Defaults to "prod".
cluster: str = builder.DEFAULT_FLOW_CLUSTER

# If true, the owners field of the DAG will be capture as a DataHub corpuser.
# If true, the owners field of the DAG will be captured as a DataHub corpuser.
capture_ownership_info: bool = True

# If true, the owners field of the DAG will instead be captured as a DataHub corpgroup.
capture_ownership_as_group: bool = False

# If true, the tags field of the DAG will be captured as DataHub tags.
capture_tags_info: bool = True

Expand Down Expand Up @@ -70,6 +73,9 @@ def get_lineage_config() -> DatahubLineageConfig:
capture_ownership_info = conf.get(
"datahub", "capture_ownership_info", fallback=True
)
capture_ownership_as_group = conf.get(
"datahub", "capture_ownership_as_group", fallback=False
)
capture_executions = conf.get("datahub", "capture_executions", fallback=True)
materialize_iolets = conf.get("datahub", "materialize_iolets", fallback=True)
enable_extractors = conf.get("datahub", "enable_extractors", fallback=True)
Expand All @@ -87,6 +93,7 @@ def get_lineage_config() -> DatahubLineageConfig:
datahub_conn_id=datahub_conn_id,
cluster=cluster,
capture_ownership_info=capture_ownership_info,
capture_ownership_as_group=capture_ownership_as_group,
capture_tags_info=capture_tags_info,
capture_executions=capture_executions,
materialize_iolets=materialize_iolets,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,11 @@ def generate_dataflow(
data_flow.url = f"{base_url}/tree?dag_id={dag.dag_id}"

if config.capture_ownership_info and dag.owner:
data_flow.owners.update(owner.strip() for owner in dag.owner.split(","))
owners = [owner.strip() for owner in dag.owner.split(",")]
if config.capture_ownership_as_group:
data_flow.group_owners.update(owners)
else:
data_flow.owners.update(owners)

if config.capture_tags_info and dag.tags:
data_flow.tags.update(dag.tags)
Expand Down Expand Up @@ -278,7 +282,10 @@ def generate_datajob(
datajob.url = f"{base_url}/taskinstance/list/?flt1_dag_id_equals={datajob.flow_urn.flow_id}&_flt_3_task_id={task.task_id}"

if capture_owner and dag.owner:
datajob.owners.add(dag.owner)
if config and config.capture_ownership_as_group:
datajob.group_owners.add(dag.owner)
else:
datajob.owners.add(dag.owner)

if capture_tags and dag.tags:
datajob.tags.update(dag.tags)
Expand Down

0 comments on commit e4949dc

Please sign in to comment.