Skip to content

Commit

Permalink
fix(Siblings): Have sibling hook use entity client (datahub-project#5279
Browse files Browse the repository at this point in the history
)

* fixing dbt platform issues

* have sibling hook use entity client over entity service

* switching search service as well

* lint

* more lint

* more specific exceptions
  • Loading branch information
gabe-lyons authored and maggiehays committed Aug 1, 2022
1 parent 0f0610f commit e5ef8bb
Show file tree
Hide file tree
Showing 9 changed files with 215 additions and 63 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.linkedin.metadata.kafka.hook.siblings;

import com.datahub.authentication.Authentication;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.Siblings;
import com.linkedin.common.SubTypes;
Expand All @@ -10,21 +12,25 @@
import com.linkedin.common.urn.Urn;
import com.linkedin.dataset.UpstreamArray;
import com.linkedin.dataset.UpstreamLineage;
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.client.RestliEntityClient;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.gms.factory.entity.EntityServiceFactory;
import com.linkedin.gms.factory.auth.SystemAuthenticationFactory;
import com.linkedin.gms.factory.entity.RestliEntityClientFactory;
import com.linkedin.gms.factory.entityregistry.EntityRegistryFactory;
import com.linkedin.gms.factory.search.SearchServiceFactory;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.gms.factory.search.EntitySearchServiceFactory;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.kafka.hook.MetadataChangeLogHook;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.search.EntitySearchService;
import com.linkedin.metadata.search.SearchResult;
import com.linkedin.metadata.search.SearchService;
import com.linkedin.metadata.utils.EntityKeyUtils;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.mxe.GenericAspect;
import com.linkedin.mxe.MetadataChangeLog;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.r2.RemoteInvocationException;
import java.net.URISyntaxException;
import java.util.List;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -53,26 +59,29 @@
@Slf4j
@Component
@Singleton
@Import({EntityRegistryFactory.class, EntityServiceFactory.class, SearchServiceFactory.class})
@Import({EntityRegistryFactory.class, RestliEntityClientFactory.class, EntitySearchServiceFactory.class, SystemAuthenticationFactory.class})
public class SiblingAssociationHook implements MetadataChangeLogHook {

public static final String SIBLING_ASSOCIATION_SYSTEM_ACTOR = "urn:li:corpuser:__datahub_system_sibling_hook";
public static final String DBT_PLATFORM_NAME = "dbt";
public static final String SOURCE_SUBTYPE = "source";

private final EntityRegistry _entityRegistry;
private final EntityService _entityService;
private final SearchService _searchService;
private final RestliEntityClient _entityClient;
private final EntitySearchService _searchService;
private final Authentication _systemAuthentication;

@Autowired
public SiblingAssociationHook(
@Nonnull final EntityRegistry entityRegistry,
@Nonnull final EntityService entityService,
@Nonnull final SearchService searchService
@Nonnull final RestliEntityClient entityClient,
@Nonnull final EntitySearchService searchService,
@Nonnull final Authentication systemAuthentication
) {
_entityRegistry = entityRegistry;
_entityService = entityService;
_entityClient = entityClient;
_searchService = searchService;
_systemAuthentication = systemAuthentication;
}

@Value("${siblings.enabled:false}")
Expand Down Expand Up @@ -123,8 +132,7 @@ private void handleEntityKeyEvent(DatasetUrn datasetUrn) {
entitiesWithYouAsSiblingFilter,
null,
0,
10,
null);
10);

// we have a match of an entity with you as a sibling, associate yourself back
searchResult.getEntities().forEach(entity -> {
Expand All @@ -146,21 +154,12 @@ private void handleDbtDatasetEvent(MetadataChangeLog event, DatasetUrn datasetUr

if (event.getAspectName().equals(UPSTREAM_LINEAGE_ASPECT_NAME)) {
upstreamLineage = getUpstreamLineageFromEvent(event);
subTypesAspectOfEntity =
(SubTypes) _entityService.getLatestAspect(
datasetUrn,
SUB_TYPES_ASPECT_NAME
);

subTypesAspectOfEntity = getSubtypesFromEntityClient(datasetUrn);
}

if (event.getAspectName().equals(SUB_TYPES_ASPECT_NAME)) {
subTypesAspectOfEntity = getSubtypesFromEvent(event);
upstreamLineage =
(UpstreamLineage) _entityService.getLatestAspect(
datasetUrn,
UPSTREAM_LINEAGE_ASPECT_NAME
);
upstreamLineage = getUpstreamLineageFromEntityClient(datasetUrn);
}

if (
Expand Down Expand Up @@ -195,10 +194,8 @@ private void handleSourceDatasetEvent(MetadataChangeLog event, DatasetUrn source
}

private void setSiblingsAndSoftDeleteSibling(Urn dbtUrn, Urn sourceUrn) {
Siblings existingDbtSiblingAspect =
(Siblings) _entityService.getLatestAspect(dbtUrn, SIBLINGS_ASPECT_NAME);
Siblings existingSourceSiblingAspect =
(Siblings) _entityService.getLatestAspect(sourceUrn, SIBLINGS_ASPECT_NAME);
Siblings existingDbtSiblingAspect = getSiblingsFromEntityClient(dbtUrn);
Siblings existingSourceSiblingAspect = getSiblingsFromEntityClient(sourceUrn);

log.info("Associating {} and {} as siblings.", dbtUrn.toString(), sourceUrn.toString());

Expand Down Expand Up @@ -228,7 +225,12 @@ private void setSiblingsAndSoftDeleteSibling(Urn dbtUrn, Urn sourceUrn) {
dbtSiblingProposal.setChangeType(ChangeType.UPSERT);
dbtSiblingProposal.setEntityUrn(dbtUrn);

_entityService.ingestProposal(dbtSiblingProposal, auditStamp);
try {
_entityClient.ingestProposal(dbtSiblingProposal, _systemAuthentication);
} catch (RemoteInvocationException e) {
log.error("Error while associating {} with {}: {}", dbtUrn.toString(), sourceUrn.toString(), e.toString());
throw new RuntimeException("Error ingesting sibling proposal. Skipping processing.", e);
}

// set dbt as a sibling of source

Expand All @@ -245,7 +247,14 @@ private void setSiblingsAndSoftDeleteSibling(Urn dbtUrn, Urn sourceUrn) {

// clean up any references to stale siblings that have been deleted
List<Urn> filteredNewSiblingsArray =
newSiblingsUrnArray.stream().filter(urn -> _entityService.exists(urn)).collect(Collectors.toList());
newSiblingsUrnArray.stream().filter(urn -> {
try {
return _entityClient.exists(urn, _systemAuthentication);
} catch (RemoteInvocationException e) {
log.error("Error while checking existence of {}: {}", urn.toString(), e.toString());
throw new RuntimeException("Error checking existence. Skipping processing.", e);
}
}).collect(Collectors.toList());

sourceSiblingAspect.setSiblings(new UrnArray(filteredNewSiblingsArray));
sourceSiblingAspect.setPrimary(false);
Expand All @@ -259,7 +268,12 @@ private void setSiblingsAndSoftDeleteSibling(Urn dbtUrn, Urn sourceUrn) {
sourceSiblingProposal.setChangeType(ChangeType.UPSERT);
sourceSiblingProposal.setEntityUrn(sourceUrn);

_entityService.ingestProposal(sourceSiblingProposal, auditStamp);
try {
_entityClient.ingestProposal(sourceSiblingProposal, _systemAuthentication);
} catch (RemoteInvocationException e) {
log.error("Error while associating {} with {}: {}", dbtUrn.toString(), sourceUrn.toString(), e.toString());
throw new RuntimeException("Error ingesting sibling proposal. Skipping processing.", e);
}
}


Expand Down Expand Up @@ -362,4 +376,67 @@ private Filter createFilterForEntitiesWithYouAsSibling(
return filter;
}

private SubTypes getSubtypesFromEntityClient(
final Urn urn
) {
try {
EntityResponse entityResponse = _entityClient.getV2(
DATASET_ENTITY_NAME,
urn,
ImmutableSet.of(SUB_TYPES_ASPECT_NAME),
_systemAuthentication
);

if (entityResponse != null && entityResponse.hasAspects() && entityResponse.getAspects().containsKey(Constants.SUB_TYPES_ASPECT_NAME)) {
return new SubTypes(entityResponse.getAspects().get(Constants.SUB_TYPES_ASPECT_NAME).getValue().data());
} else {
return null;
}
} catch (RemoteInvocationException | URISyntaxException e) {
throw new RuntimeException("Failed to retrieve Subtypes", e);
}
}

private UpstreamLineage getUpstreamLineageFromEntityClient(
final Urn urn
) {
try {
EntityResponse entityResponse = _entityClient.getV2(
DATASET_ENTITY_NAME,
urn,
ImmutableSet.of(UPSTREAM_LINEAGE_ASPECT_NAME),
_systemAuthentication
);

if (entityResponse != null && entityResponse.hasAspects() && entityResponse.getAspects().containsKey(Constants.UPSTREAM_LINEAGE_ASPECT_NAME)) {
return new UpstreamLineage(entityResponse.getAspects().get(Constants.UPSTREAM_LINEAGE_ASPECT_NAME).getValue().data());
} else {
return null;
}
} catch (RemoteInvocationException | URISyntaxException e) {
throw new RuntimeException("Failed to retrieve UpstreamLineage", e);
}
}

private Siblings getSiblingsFromEntityClient(
final Urn urn
) {
try {
EntityResponse entityResponse = _entityClient.getV2(
DATASET_ENTITY_NAME,
urn,
ImmutableSet.of(SIBLINGS_ASPECT_NAME),
_systemAuthentication
);

if (entityResponse != null && entityResponse.hasAspects() && entityResponse.getAspects().containsKey(Constants.SIBLINGS_ASPECT_NAME)) {
return new Siblings(entityResponse.getAspects().get(Constants.SIBLINGS_ASPECT_NAME).getValue().data());
} else {
return null;
}
} catch (RemoteInvocationException | URISyntaxException e) {
throw new RuntimeException("Failed to retrieve UpstreamLineage", e);
}
}

}
Loading

0 comments on commit e5ef8bb

Please sign in to comment.