Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest): add CorpUser and CorpGroup to the Python SDK #5930

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import time
import uuid

from datahub.api.entities.corpgroup.corpgroup import CorpGroup
from datahub.api.entities.corpuser.corpuser import CorpUser
from datahub.api.entities.datajob.dataflow import DataFlow
from datahub.api.entities.datajob.datajob import DataJob
from datahub.api.entities.dataprocess.dataprocess_instance import (
Expand All @@ -22,7 +24,7 @@
dataJob2 = DataJob(flow_urn=jobFlow.urn, id="job2", name="My Job 2")
dataJob2.upstream_urns.append(dataJob.urn)
dataJob2.tags.add("TestTag")
dataJob2.owners.add("[email protected]")
dataJob2.owners.add("testUser")
dataJob2.emit(emitter)

dataJob3 = DataJob(flow_urn=jobFlow.urn, id="job3", name="My Job 3")
Expand All @@ -32,6 +34,7 @@
dataJob4 = DataJob(flow_urn=jobFlow.urn, id="job4", name="My Job 4")
dataJob4.upstream_urns.append(dataJob2.urn)
dataJob4.upstream_urns.append(dataJob3.urn)
dataJob4.group_owners.add("testGroup")
dataJob4.emit(emitter)

# Hello World
Expand Down Expand Up @@ -105,3 +108,20 @@
end_timestamp_millis=int(time.time() * 1000),
result=InstanceRunResult.SUCCESS,
)

# Register the user whose id ("testUser") was added as an owner of dataJob2
# above, and declare its membership in the "testGroup" group.
# NOTE(review): the email literals below read "[email protected]", which looks
# like a redaction placeholder rather than a usable address - confirm.
user1 = CorpUser(
id="testUser",
display_name="Test User",
email="[email protected]",
groups=["testGroup"],
)
# Send every metadata change proposal generated for the user through the emitter.
user1.emit(emitter)

# Register the group whose id ("testGroup") was added as a group owner of
# dataJob4 above.
group1 = CorpGroup(
id="testGroup",
display_name="Test Group",
email="[email protected]",
slack="#test-group",
# With overrideEditable=True the group also emits its corpGroupEditableInfo
# aspect (description, picture link, slack, email), not just corpGroupInfo.
overrideEditable=True,
)
# Send every metadata change proposal generated for the group through the emitter.
group1.emit(emitter)
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from datahub.api.entities.corpgroup.corpgroup import CorpGroup
97 changes: 97 additions & 0 deletions metadata-ingestion/src/datahub/api/entities/corpgroup/corpgroup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Callable, Iterable, Optional, Union, cast

import datahub.emitter.mce_builder as builder
from datahub.emitter.kafka_emitter import DatahubKafkaEmitter
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
ChangeTypeClass,
CorpGroupEditableInfoClass,
CorpGroupInfoClass,
)


@dataclass
class CorpGroup:
    """Describe a DataHub CorpGroup and emit it as metadata change proposals.

    The group urn is derived from ``id`` immediately after construction and is
    therefore not accepted as a constructor argument.

    Args:
        id (str): The id of the group
        display_name (Optional[str]): The name of the group
        email (Optional[str]): email of this group
        description (Optional[str]): A description of the group
        overrideEditable (bool): If True, group information that is editable in the UI will be overridden
        picture_link (Optional[str]): A URL which points to a picture which user wants to set as the photo for the group
        slack (Optional[str]): Slack channel for the group
    """

    id: str
    urn: str = field(init=False)

    # Written to the corpGroupInfo aspect (email and description are also
    # copied into corpGroupEditableInfo when overrideEditable is set).
    display_name: Optional[str] = None
    email: Optional[str] = None
    description: Optional[str] = None

    # Written only to the corpGroupEditableInfo aspect.
    overrideEditable: bool = False
    picture_link: Optional[str] = None
    slack: Optional[str] = None

    def __post_init__(self):
        # Build the urn once the id is known.
        self.urn = builder.make_group_urn(self.id)

    def generate_mcp(self) -> Iterable[MetadataChangeProposalWrapper]:
        """Lazily yield the metadata change proposals describing this group.

        When ``overrideEditable`` is set, a ``corpGroupEditableInfo`` proposal
        is yielded first; a ``corpGroupInfo`` proposal is always yielded last.
        """
        if self.overrideEditable:
            editable_info = CorpGroupEditableInfoClass(
                description=self.description,
                pictureLink=self.picture_link,
                slack=self.slack,
                email=self.email,
            )
            yield MetadataChangeProposalWrapper(
                entityType="corpgroup",
                entityUrn=str(self.urn),
                aspectName="corpGroupEditableInfo",
                aspect=editable_info,
                changeType=ChangeTypeClass.UPSERT,
            )

        group_info = CorpGroupInfoClass(
            admins=[],  # Deprecated, replaced by Ownership aspect
            members=[],  # Deprecated, replaced by GroupMembership aspect
            groups=[],  # Deprecated, this field is unused
            displayName=self.display_name,
            email=self.email,
            description=self.description,
        )
        yield MetadataChangeProposalWrapper(
            entityType="corpgroup",
            entityUrn=str(self.urn),
            aspectName="corpGroupInfo",
            aspect=group_info,
            changeType=ChangeTypeClass.UPSERT,
        )

    def emit(
        self,
        emitter: Union[DatahubRestEmitter, DatahubKafkaEmitter],
        callback: Optional[Callable[[Exception, str], None]] = None,
    ) -> None:
        """
        Emit the CorpGroup entity to Datahub

        :param emitter: Datahub Emitter to emit the CorpGroup proposals
        :param callback: The callback method for KafkaEmitter if it is used
        """
        # The emitter flavour is decided by class name; anything that is not
        # named DatahubKafkaEmitter is driven through the REST-style call.
        uses_kafka = type(emitter).__name__ == "DatahubKafkaEmitter"
        for proposal in self.generate_mcp():
            if not uses_kafka:
                cast("DatahubRestEmitter", emitter).emit(proposal)
                continue
            # The Kafka path needs a delivery callback for every proposal.
            assert callback is not None
            cast("DatahubKafkaEmitter", emitter).emit(proposal, callback)
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from datahub.api.entities.corpuser.corpuser import CorpUser
109 changes: 109 additions & 0 deletions metadata-ingestion/src/datahub/api/entities/corpuser/corpuser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Callable, Iterable, List, Optional, Union, cast

import datahub.emitter.mce_builder as builder
from datahub.emitter.kafka_emitter import DatahubKafkaEmitter
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
ChangeTypeClass,
CorpUserInfoClass,
GroupMembershipClass,
)


@dataclass
class CorpUser:
    """Describe a DataHub CorpUser and emit it as metadata change proposals.

    The user urn is derived from ``id`` immediately after construction and is
    therefore not accepted as a constructor argument.

    Args:
        id (str): The id of the user
        display_name (Optional[str]): The name of the user to display in the UI
        email (Optional[str]): email address of this user
        title (Optional[str]): title of this user
        manager_urn (Optional[str]): direct manager of this user
        department_id (Optional[int]): department id this user belongs to
        department_name (Optional[str]): department name this user belongs to
        first_name (Optional[str]): first name of this user
        last_name (Optional[str]): last name of this user
        full_name (Optional[str]): Common name of this user, format is firstName + lastName (split by a whitespace)
        country_code (Optional[str]): two uppercase letters country code. e.g. US
        groups (List[str]): List of group ids the user belongs to
    """

    id: str
    urn: str = field(init=False)
    display_name: Optional[str] = None
    email: Optional[str] = None
    title: Optional[str] = None
    manager_urn: Optional[str] = None
    department_id: Optional[int] = None
    department_name: Optional[str] = None
    first_name: Optional[str] = None
    last_name: Optional[str] = None
    full_name: Optional[str] = None
    country_code: Optional[str] = None
    groups: List[str] = field(default_factory=list)

    def __post_init__(self):
        # Build the urn once the id is known.
        self.urn = builder.make_user_urn(self.id)

    def generate_group_membership_aspect(self) -> Iterable[GroupMembershipClass]:
        """Return a one-element list holding the user's group membership aspect.

        Every id in ``groups`` is converted to a group urn; an empty ``groups``
        list still produces an aspect (with no groups in it).
        """
        group_urns = [builder.make_group_urn(group_id) for group_id in self.groups]
        return [GroupMembershipClass(groups=group_urns)]

    def generate_mcp(self) -> Iterable[MetadataChangeProposalWrapper]:
        """Lazily yield the metadata change proposals describing this user.

        A ``corpUserInfo`` proposal comes first, followed by one
        ``groupMembership`` proposal per generated membership aspect.
        """
        user_info = CorpUserInfoClass(
            active=True,  # Deprecated, use CorpUserStatus instead.
            displayName=self.display_name,
            email=self.email,
            title=self.title,
            managerUrn=self.manager_urn,
            departmentId=self.department_id,
            departmentName=self.department_name,
            firstName=self.first_name,
            lastName=self.last_name,
            fullName=self.full_name,
            countryCode=self.country_code,
        )
        yield MetadataChangeProposalWrapper(
            entityType="corpuser",
            entityUrn=str(self.urn),
            aspectName="corpUserInfo",
            aspect=user_info,
            changeType=ChangeTypeClass.UPSERT,
        )

        for membership in self.generate_group_membership_aspect():
            yield MetadataChangeProposalWrapper(
                entityType="corpuser",
                entityUrn=str(self.urn),
                aspectName="groupMembership",
                aspect=membership,
                changeType=ChangeTypeClass.UPSERT,
            )

    def emit(
        self,
        emitter: Union[DatahubRestEmitter, DatahubKafkaEmitter],
        callback: Optional[Callable[[Exception, str], None]] = None,
    ) -> None:
        """
        Emit the CorpUser entity to Datahub

        :param emitter: Datahub Emitter to emit the CorpUser proposals
        :param callback: The callback method for KafkaEmitter if it is used
        """
        # The emitter flavour is decided by class name; anything that is not
        # named DatahubKafkaEmitter is driven through the REST-style call.
        uses_kafka = type(emitter).__name__ == "DatahubKafkaEmitter"
        for proposal in self.generate_mcp():
            if not uses_kafka:
                cast("DatahubRestEmitter", emitter).emit(proposal)
                continue
            # The Kafka path needs a delivery callback for every proposal.
            assert callback is not None
            cast("DatahubKafkaEmitter", emitter).emit(proposal, callback)
10 changes: 8 additions & 2 deletions metadata-ingestion/src/datahub/api/entities/datajob/datajob.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ class DataJob:
parent_instance (Optional[DataProcessInstanceUrn]): The parent execution's urn if applicable
properties Dict[str, str]: Custom properties to set for the DataProcessInstance
url (Optional[str]): Url which points to the DataJob at the orchestrator
owners (Set[str]): A set of user ids that own this job.
group_owners (Set[str]): A set of group ids that own this job.
inlets (List[str]): List of urns the DataProcessInstance consumes
outlets (List[str]): List of urns the DataProcessInstance produces
input_datajob_urns: List[DataJobUrn] = field(default_factory=list)
Expand All @@ -65,6 +67,7 @@ class DataJob:
url: Optional[str] = None
tags: Set[str] = field(default_factory=set)
owners: Set[str] = field(default_factory=set)
group_owners: Set[str] = field(default_factory=set)
inlets: List[DatasetUrn] = field(default_factory=list)
outlets: List[DatasetUrn] = field(default_factory=list)
upstream_urns: List[DataJobUrn] = field(default_factory=list)
Expand All @@ -80,17 +83,20 @@ def __post_init__(self):
)

def generate_ownership_aspect(self) -> Iterable[OwnershipClass]:
owners = set([builder.make_user_urn(owner) for owner in self.owners]) | set(
[builder.make_group_urn(owner) for owner in self.group_owners]
)
ownership = OwnershipClass(
owners=[
OwnerClass(
owner=builder.make_user_urn(owner),
owner=urn,
type=OwnershipTypeClass.DEVELOPER,
source=OwnershipSourceClass(
type=OwnershipSourceTypeClass.SERVICE,
# url=dag.filepath,
),
)
for owner in (self.owners or [])
for urn in (owners or [])
],
lastModified=AuditStampClass(
time=0,
Expand Down