Skip to content

Commit

Permalink
feat(ingest): add CorpUser and CorpGroup to the Python SDK (datahub-p…
Browse files Browse the repository at this point in the history
…roject#5930)

Co-authored-by: Harshal Sheth <[email protected]>
  • Loading branch information
2 people authored and david-leifker committed Oct 6, 2022
1 parent 5be9557 commit 5388808
Show file tree
Hide file tree
Showing 6 changed files with 237 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import time
import uuid

from datahub.api.entities.corpgroup.corpgroup import CorpGroup
from datahub.api.entities.corpuser.corpuser import CorpUser
from datahub.api.entities.datajob.dataflow import DataFlow
from datahub.api.entities.datajob.datajob import DataJob
from datahub.api.entities.dataprocess.dataprocess_instance import (
Expand All @@ -22,7 +24,7 @@
dataJob2 = DataJob(flow_urn=jobFlow.urn, id="job2", name="My Job 2")
dataJob2.upstream_urns.append(dataJob.urn)
dataJob2.tags.add("TestTag")
dataJob2.owners.add("[email protected]")
dataJob2.owners.add("testUser")
dataJob2.emit(emitter)

dataJob3 = DataJob(flow_urn=jobFlow.urn, id="job3", name="My Job 3")
Expand All @@ -32,6 +34,7 @@
dataJob4 = DataJob(flow_urn=jobFlow.urn, id="job4", name="My Job 4")
dataJob4.upstream_urns.append(dataJob2.urn)
dataJob4.upstream_urns.append(dataJob3.urn)
dataJob4.group_owners.add("testGroup")
dataJob4.emit(emitter)

# Hello World
Expand Down Expand Up @@ -105,3 +108,20 @@
end_timestamp_millis=int(time.time() * 1000),
result=InstanceRunResult.SUCCESS,
)

user1 = CorpUser(
id="testUser",
display_name="Test User",
email="[email protected]",
groups=["testGroup"],
)
user1.emit(emitter)

group1 = CorpGroup(
id="testGroup",
display_name="Test Group",
email="[email protected]",
slack="#test-group",
overrideEditable=True,
)
group1.emit(emitter)
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from datahub.api.entities.corpgroup.corpgroup import CorpGroup
97 changes: 97 additions & 0 deletions metadata-ingestion/src/datahub/api/entities/corpgroup/corpgroup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Callable, Iterable, Optional, Union, cast

import datahub.emitter.mce_builder as builder
from datahub.emitter.kafka_emitter import DatahubKafkaEmitter
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
ChangeTypeClass,
CorpGroupEditableInfoClass,
CorpGroupInfoClass,
)


@dataclass
class CorpGroup:
    """A corporate group (CorpGroup) entity that can be emitted to DataHub.

    The group urn is derived from ``id`` after construction and is not an
    ``__init__`` argument.

    Args:
        id (str): The id of the group
        display_name (Optional[str]): The name of the group
        email (Optional[str]): email of this group
        description (Optional[str]): A description of the group
        overrideEditable (bool): If True, group information that is editable in the UI will be overridden
        picture_link (Optional[str]): A URL which points to a picture which user wants to set as the photo for the group
        slack (Optional[str]): Slack channel for the group
    """

    id: str
    urn: str = field(init=False)

    # These are for CorpGroupInfo
    display_name: Optional[str] = None
    email: Optional[str] = None
    description: Optional[str] = None

    # These are for CorpGroupEditableInfo
    overrideEditable: bool = False
    picture_link: Optional[str] = None
    slack: Optional[str] = None

    def __post_init__(self):
        # Build the urn once from the id so every emitted aspect shares it.
        self.urn = builder.make_group_urn(self.id)

    def generate_mcp(self) -> Iterable[MetadataChangeProposalWrapper]:
        """Yield the metadata change proposals describing this group.

        When ``overrideEditable`` is True a ``corpGroupEditableInfo`` aspect
        is yielded first; a ``corpGroupInfo`` aspect is always yielded.
        """
        if self.overrideEditable:
            # Only written on request: this aspect holds the fields that are
            # editable in the UI, so emitting it overrides them.
            yield MetadataChangeProposalWrapper(
                entityType="corpgroup",
                entityUrn=str(self.urn),
                aspectName="corpGroupEditableInfo",
                aspect=CorpGroupEditableInfoClass(
                    description=self.description,
                    pictureLink=self.picture_link,
                    slack=self.slack,
                    email=self.email,
                ),
                changeType=ChangeTypeClass.UPSERT,
            )

        yield MetadataChangeProposalWrapper(
            entityType="corpgroup",
            entityUrn=str(self.urn),
            aspectName="corpGroupInfo",
            aspect=CorpGroupInfoClass(
                admins=[],  # Deprecated, replaced by Ownership aspect
                members=[],  # Deprecated, replaced by GroupMembership aspect
                groups=[],  # Deprecated, this field is unused
                displayName=self.display_name,
                email=self.email,
                description=self.description,
            ),
            changeType=ChangeTypeClass.UPSERT,
        )

    def emit(
        self,
        emitter: Union[DatahubRestEmitter, DatahubKafkaEmitter],
        callback: Optional[Callable[[Exception, str], None]] = None,
    ) -> None:
        """
        Emit the CorpGroup entity to Datahub

        :param emitter: Datahub Emitter used to emit the group's metadata change proposals
        :param callback: The callback method for KafkaEmitter if it is used
        :raises ValueError: if a DatahubKafkaEmitter is used without a callback
        """
        # The emitter kind is decided by class name, not isinstance.
        # NOTE(review): presumably deliberate; confirm before switching to isinstance.
        uses_kafka = type(emitter).__name__ == "DatahubKafkaEmitter"

        # Validate up front with a real exception: an `assert` is stripped
        # under `python -O`, and nothing should be emitted on invalid input.
        if uses_kafka and callback is None:
            raise ValueError(
                "A callback is required when emitting with DatahubKafkaEmitter"
            )

        for mcp in self.generate_mcp():
            if uses_kafka:
                kafka_emitter = cast("DatahubKafkaEmitter", emitter)
                kafka_emitter.emit(mcp, callback)
            else:
                rest_emitter = cast("DatahubRestEmitter", emitter)
                rest_emitter.emit(mcp)
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from datahub.api.entities.corpuser.corpuser import CorpUser
109 changes: 109 additions & 0 deletions metadata-ingestion/src/datahub/api/entities/corpuser/corpuser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Callable, Iterable, List, Optional, Union, cast

import datahub.emitter.mce_builder as builder
from datahub.emitter.kafka_emitter import DatahubKafkaEmitter
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
ChangeTypeClass,
CorpUserInfoClass,
GroupMembershipClass,
)


@dataclass
class CorpUser:
    """A corporate user (CorpUser) entity that can be emitted to DataHub.

    The user urn is derived from ``id`` after construction and is not an
    ``__init__`` argument.

    Args:
        id (str): The id of the user
        display_name (Optional[str]): The name of the user to display in the UI
        email (Optional[str]): email address of this user
        title (Optional[str]): title of this user
        manager_urn (Optional[str]): direct manager of this user
        department_id (Optional[int]): department id this user belongs to
        department_name (Optional[str]): department name this user belongs to
        first_name (Optional[str]): first name of this user
        last_name (Optional[str]): last name of this user
        full_name (Optional[str]): Common name of this user, format is firstName + lastName (split by a whitespace)
        country_code (Optional[str]): two uppercase letters country code. e.g. US
        groups (List[str]): List of group ids the user belongs to
    """

    id: str
    urn: str = field(init=False)
    display_name: Optional[str] = None
    email: Optional[str] = None
    title: Optional[str] = None
    manager_urn: Optional[str] = None
    department_id: Optional[int] = None
    department_name: Optional[str] = None
    first_name: Optional[str] = None
    last_name: Optional[str] = None
    full_name: Optional[str] = None
    country_code: Optional[str] = None
    groups: List[str] = field(default_factory=list)

    def __post_init__(self):
        # Build the urn once from the id so every emitted aspect shares it.
        self.urn = builder.make_user_urn(self.id)

    def generate_group_membership_aspect(self) -> Iterable[GroupMembershipClass]:
        """Return a one-element list holding the user's GroupMembership aspect.

        Each id in ``groups`` is converted to a group urn. The aspect is
        returned even when ``groups`` is empty.
        """
        group_membership = GroupMembershipClass(
            groups=[builder.make_group_urn(group) for group in self.groups]
        )
        return [group_membership]

    def generate_mcp(self) -> Iterable[MetadataChangeProposalWrapper]:
        """Yield the metadata change proposals describing this user.

        A ``corpUserInfo`` aspect is yielded first, followed by one
        ``groupMembership`` proposal per aspect returned by
        ``generate_group_membership_aspect``.
        """
        yield MetadataChangeProposalWrapper(
            entityType="corpuser",
            entityUrn=str(self.urn),
            aspectName="corpUserInfo",
            aspect=CorpUserInfoClass(
                active=True,  # Deprecated, use CorpUserStatus instead.
                displayName=self.display_name,
                email=self.email,
                title=self.title,
                managerUrn=self.manager_urn,
                departmentId=self.department_id,
                departmentName=self.department_name,
                firstName=self.first_name,
                lastName=self.last_name,
                fullName=self.full_name,
                countryCode=self.country_code,
            ),
            changeType=ChangeTypeClass.UPSERT,
        )

        for group_membership in self.generate_group_membership_aspect():
            yield MetadataChangeProposalWrapper(
                entityType="corpuser",
                entityUrn=str(self.urn),
                aspectName="groupMembership",
                aspect=group_membership,
                changeType=ChangeTypeClass.UPSERT,
            )

    def emit(
        self,
        emitter: Union[DatahubRestEmitter, DatahubKafkaEmitter],
        callback: Optional[Callable[[Exception, str], None]] = None,
    ) -> None:
        """
        Emit the CorpUser entity to Datahub

        :param emitter: Datahub Emitter used to emit the user's metadata change proposals
        :param callback: The callback method for KafkaEmitter if it is used
        :raises ValueError: if a DatahubKafkaEmitter is used without a callback
        """
        # The emitter kind is decided by class name, not isinstance.
        # NOTE(review): presumably deliberate; confirm before switching to isinstance.
        uses_kafka = type(emitter).__name__ == "DatahubKafkaEmitter"

        # Validate up front with a real exception: an `assert` is stripped
        # under `python -O`, and nothing should be emitted on invalid input.
        if uses_kafka and callback is None:
            raise ValueError(
                "A callback is required when emitting with DatahubKafkaEmitter"
            )

        for mcp in self.generate_mcp():
            if uses_kafka:
                kafka_emitter = cast("DatahubKafkaEmitter", emitter)
                kafka_emitter.emit(mcp, callback)
            else:
                rest_emitter = cast("DatahubRestEmitter", emitter)
                rest_emitter.emit(mcp)
10 changes: 8 additions & 2 deletions metadata-ingestion/src/datahub/api/entities/datajob/datajob.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ class DataJob:
parent_instance (Optional[DataProcessInstanceUrn]): The parent execution's urn if applicable
properties Dict[str, str]: Custom properties to set for the DataProcessInstance
url (Optional[str]): Url which points to the DataJob at the orchestrator
owners (Set[str]): A set of user ids that own this job.
group_owners (Set[str]): A set of group ids that own this job.
inlets (List[str]): List of urns the DataProcessInstance consumes
outlets (List[str]): List of urns the DataProcessInstance produces
input_datajob_urns: List[DataJobUrn] = field(default_factory=list)
Expand All @@ -65,6 +67,7 @@ class DataJob:
url: Optional[str] = None
tags: Set[str] = field(default_factory=set)
owners: Set[str] = field(default_factory=set)
group_owners: Set[str] = field(default_factory=set)
inlets: List[DatasetUrn] = field(default_factory=list)
outlets: List[DatasetUrn] = field(default_factory=list)
upstream_urns: List[DataJobUrn] = field(default_factory=list)
Expand All @@ -80,17 +83,20 @@ def __post_init__(self):
)

def generate_ownership_aspect(self) -> Iterable[OwnershipClass]:
owners = set([builder.make_user_urn(owner) for owner in self.owners]) | set(
[builder.make_group_urn(owner) for owner in self.group_owners]
)
ownership = OwnershipClass(
owners=[
OwnerClass(
owner=builder.make_user_urn(owner),
owner=urn,
type=OwnershipTypeClass.DEVELOPER,
source=OwnershipSourceClass(
type=OwnershipSourceTypeClass.SERVICE,
# url=dag.filepath,
),
)
for owner in (self.owners or [])
for urn in (owners or [])
],
lastModified=AuditStampClass(
time=0,
Expand Down

0 comments on commit 5388808

Please sign in to comment.