Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(Siblings): Have sibling hook use entity client #5279

Merged
Merged
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.linkedin.metadata.kafka.hook.siblings;

import com.datahub.authentication.Authentication;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.Siblings;
import com.linkedin.common.SubTypes;
Expand All @@ -10,21 +12,25 @@
import com.linkedin.common.urn.Urn;
import com.linkedin.dataset.UpstreamArray;
import com.linkedin.dataset.UpstreamLineage;
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.client.RestliEntityClient;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.gms.factory.entity.EntityServiceFactory;
import com.linkedin.gms.factory.auth.SystemAuthenticationFactory;
import com.linkedin.gms.factory.entity.RestliEntityClientFactory;
import com.linkedin.gms.factory.entityregistry.EntityRegistryFactory;
import com.linkedin.gms.factory.search.SearchServiceFactory;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.gms.factory.search.EntitySearchServiceFactory;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.kafka.hook.MetadataChangeLogHook;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.search.EntitySearchService;
import com.linkedin.metadata.search.SearchResult;
import com.linkedin.metadata.search.SearchService;
import com.linkedin.metadata.utils.EntityKeyUtils;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.mxe.GenericAspect;
import com.linkedin.mxe.MetadataChangeLog;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.r2.RemoteInvocationException;
import java.net.URISyntaxException;
import java.util.List;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -53,26 +59,29 @@
@Slf4j
@Component
@Singleton
@Import({EntityRegistryFactory.class, EntityServiceFactory.class, SearchServiceFactory.class})
@Import({EntityRegistryFactory.class, RestliEntityClientFactory.class, EntitySearchServiceFactory.class, SystemAuthenticationFactory.class})
public class SiblingAssociationHook implements MetadataChangeLogHook {

public static final String SIBLING_ASSOCIATION_SYSTEM_ACTOR = "urn:li:corpuser:__datahub_system_sibling_hook";
public static final String DBT_PLATFORM_NAME = "dbt";
public static final String SOURCE_SUBTYPE = "source";

private final EntityRegistry _entityRegistry;
private final EntityService _entityService;
private final SearchService _searchService;
private final RestliEntityClient _entityClient;
private final EntitySearchService _searchService;
private final Authentication _systemAuthentication;

@Autowired
public SiblingAssociationHook(
@Nonnull final EntityRegistry entityRegistry,
@Nonnull final EntityService entityService,
@Nonnull final SearchService searchService
@Nonnull final RestliEntityClient entityClient,
@Nonnull final EntitySearchService searchService,
@Nonnull final Authentication systemAuthentication
) {
_entityRegistry = entityRegistry;
_entityService = entityService;
_entityClient = entityClient;
_searchService = searchService;
_systemAuthentication = systemAuthentication;
}

@Value("${siblings.enabled:false}")
Expand Down Expand Up @@ -123,8 +132,7 @@ private void handleEntityKeyEvent(DatasetUrn datasetUrn) {
entitiesWithYouAsSiblingFilter,
null,
0,
10,
null);
10);

// we have a match of an entity with you as a sibling, associate yourself back
searchResult.getEntities().forEach(entity -> {
Expand All @@ -146,21 +154,12 @@ private void handleDbtDatasetEvent(MetadataChangeLog event, DatasetUrn datasetUr

if (event.getAspectName().equals(UPSTREAM_LINEAGE_ASPECT_NAME)) {
upstreamLineage = getUpstreamLineageFromEvent(event);
subTypesAspectOfEntity =
(SubTypes) _entityService.getLatestAspect(
datasetUrn,
SUB_TYPES_ASPECT_NAME
);

subTypesAspectOfEntity = getSubtypesFromEntityClient(datasetUrn);
}

if (event.getAspectName().equals(SUB_TYPES_ASPECT_NAME)) {
subTypesAspectOfEntity = getSubtypesFromEvent(event);
upstreamLineage =
(UpstreamLineage) _entityService.getLatestAspect(
datasetUrn,
UPSTREAM_LINEAGE_ASPECT_NAME
);
upstreamLineage = getUpstreamLineageFromEntityClient(datasetUrn);
}

if (
Expand Down Expand Up @@ -195,10 +194,8 @@ private void handleSourceDatasetEvent(MetadataChangeLog event, DatasetUrn source
}

private void setSiblingsAndSoftDeleteSibling(Urn dbtUrn, Urn sourceUrn) {
Siblings existingDbtSiblingAspect =
(Siblings) _entityService.getLatestAspect(dbtUrn, SIBLINGS_ASPECT_NAME);
Siblings existingSourceSiblingAspect =
(Siblings) _entityService.getLatestAspect(sourceUrn, SIBLINGS_ASPECT_NAME);
Siblings existingDbtSiblingAspect = getSiblingsFromEntityClient(dbtUrn);
Siblings existingSourceSiblingAspect = getSiblingsFromEntityClient(sourceUrn);

log.info("Associating {} and {} as siblings.", dbtUrn.toString(), sourceUrn.toString());

Expand Down Expand Up @@ -228,7 +225,12 @@ private void setSiblingsAndSoftDeleteSibling(Urn dbtUrn, Urn sourceUrn) {
dbtSiblingProposal.setChangeType(ChangeType.UPSERT);
dbtSiblingProposal.setEntityUrn(dbtUrn);

_entityService.ingestProposal(dbtSiblingProposal, auditStamp);
try {
_entityClient.ingestProposal(dbtSiblingProposal, _systemAuthentication);
} catch (RemoteInvocationException e) {
log.error("Error while associating {} with {}: {}", dbtUrn.toString(), sourceUrn.toString(), e.toString());
throw new RuntimeException("Error ingesting sibling proposal. Skipping processing.", e);
}

// set dbt as a sibling of source

Expand All @@ -245,7 +247,14 @@ private void setSiblingsAndSoftDeleteSibling(Urn dbtUrn, Urn sourceUrn) {

// clean up any references to stale siblings that have been deleted
List<Urn> filteredNewSiblingsArray =
newSiblingsUrnArray.stream().filter(urn -> _entityService.exists(urn)).collect(Collectors.toList());
newSiblingsUrnArray.stream().filter(urn -> {
try {
return _entityClient.exists(urn, _systemAuthentication);
} catch (RemoteInvocationException e) {
log.error("Error while checking existence of {}: {}", urn.toString(), e.toString());
throw new RuntimeException("Error checking existence. Skipping processing.", e);
}
}).collect(Collectors.toList());

sourceSiblingAspect.setSiblings(new UrnArray(filteredNewSiblingsArray));
sourceSiblingAspect.setPrimary(false);
Expand All @@ -259,7 +268,12 @@ private void setSiblingsAndSoftDeleteSibling(Urn dbtUrn, Urn sourceUrn) {
sourceSiblingProposal.setChangeType(ChangeType.UPSERT);
sourceSiblingProposal.setEntityUrn(sourceUrn);

_entityService.ingestProposal(sourceSiblingProposal, auditStamp);
try {
_entityClient.ingestProposal(sourceSiblingProposal, _systemAuthentication);
} catch (RemoteInvocationException e) {
log.error("Error while associating {} with {}: {}", dbtUrn.toString(), sourceUrn.toString(), e.toString());
throw new RuntimeException("Error ingesting sibling proposal. Skipping processing.", e);
}
}


Expand Down Expand Up @@ -362,4 +376,67 @@ private Filter createFilterForEntitiesWithYouAsSibling(
return filter;
}

private SubTypes getSubtypesFromEntityClient(
final Urn urn
) {
try {
EntityResponse entityResponse = _entityClient.getV2(
DATASET_ENTITY_NAME,
urn,
ImmutableSet.of(SUB_TYPES_ASPECT_NAME),
_systemAuthentication
);

if (entityResponse != null && entityResponse.hasAspects() && entityResponse.getAspects().containsKey(Constants.SUB_TYPES_ASPECT_NAME)) {
return new SubTypes(entityResponse.getAspects().get(Constants.SUB_TYPES_ASPECT_NAME).getValue().data());
} else {
return null;
}
} catch (RemoteInvocationException | URISyntaxException e) {
throw new RuntimeException("Failed to retrieve Subtypes", e);
}
}

private UpstreamLineage getUpstreamLineageFromEntityClient(
final Urn urn
) {
try {
EntityResponse entityResponse = _entityClient.getV2(
DATASET_ENTITY_NAME,
urn,
ImmutableSet.of(UPSTREAM_LINEAGE_ASPECT_NAME),
_systemAuthentication
);

if (entityResponse != null && entityResponse.hasAspects() && entityResponse.getAspects().containsKey(Constants.UPSTREAM_LINEAGE_ASPECT_NAME)) {
return new UpstreamLineage(entityResponse.getAspects().get(Constants.UPSTREAM_LINEAGE_ASPECT_NAME).getValue().data());
} else {
return null;
}
} catch (RemoteInvocationException | URISyntaxException e) {
throw new RuntimeException("Failed to retrieve UpstreamLineage", e);
}
}

private Siblings getSiblingsFromEntityClient(
final Urn urn
) {
try {
EntityResponse entityResponse = _entityClient.getV2(
DATASET_ENTITY_NAME,
urn,
ImmutableSet.of(SIBLINGS_ASPECT_NAME),
_systemAuthentication
);

if (entityResponse != null && entityResponse.hasAspects() && entityResponse.getAspects().containsKey(Constants.SIBLINGS_ASPECT_NAME)) {
return new Siblings(entityResponse.getAspects().get(Constants.SIBLINGS_ASPECT_NAME).getValue().data());
} else {
return null;
}
} catch (RemoteInvocationException | URISyntaxException e) {
throw new RuntimeException("Failed to retrieve UpstreamLineage", e);
}
}

}
Loading