Skip to content

Commit

Permalink
feat(entity-service): fallback logic for aspect version (#11304)
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker authored Sep 10, 2024
1 parent 852a23b commit fc92d23
Show file tree
Hide file tree
Showing 9 changed files with 325 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ public AspectsBatchImplBuilder mcps(
mcp, auditStamp, retrieverContext.getAspectRetriever());
}
} catch (IllegalArgumentException e) {
log.error("Invalid proposal, skipping and proceeding with batch: " + mcp, e);
log.error("Invalid proposal, skipping and proceeding with batch: {}", mcp, e);
return null;
}
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,11 @@
import com.linkedin.metadata.entity.ebean.PartitionedStream;
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import io.ebean.Transaction;
import java.sql.Timestamp;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand All @@ -27,10 +25,10 @@
* aspect is set to 0 for efficient retrieval. In most cases only the latest state of an aspect will
* be fetched. See {@link EntityServiceImpl} for more details.
*
* <p>TODO: This interface exposes {@link #runInTransactionWithRetry(Supplier, int)} because {@link
* EntityServiceImpl} concerns itself with batching multiple commands into a single transaction. It
* exposes storage concerns somewhat and it'd be worth looking into ways to move this responsibility
* inside {@link AspectDao} implementations.
* <p>TODO: This interface exposes {@link #runInTransactionWithRetry(Function, int)}
* (TransactionContext)} because {@link EntityServiceImpl} concerns itself with batching multiple
* commands into a single transaction. It exposes storage concerns somewhat and it'd be worth
* looking into ways to move this responsibility inside {@link AspectDao} implementations.
*/
public interface AspectDao {
String ASPECT_WRITE_COUNT_METRIC_NAME = "aspectWriteCount";
Expand Down Expand Up @@ -77,7 +75,7 @@ Map<String, Map<String, EntityAspect>> getLatestAspects(
Map<String, Set<String>> urnAspects, boolean forUpdate);

void saveAspect(
@Nullable Transaction tx,
@Nullable TransactionContext txContext,
@Nonnull final String urn,
@Nonnull final String aspectName,
@Nonnull final String aspectMetadata,
Expand All @@ -89,10 +87,12 @@ void saveAspect(
final boolean insert);

void saveAspect(
@Nullable Transaction tx, @Nonnull final EntityAspect aspect, final boolean insert);
@Nullable TransactionContext txContext,
@Nonnull final EntityAspect aspect,
final boolean insert);

long saveLatestAspect(
@Nullable Transaction tx,
@Nullable TransactionContext txContext,
@Nonnull final String urn,
@Nonnull final String aspectName,
@Nullable final String oldAspectMetadata,
Expand All @@ -107,7 +107,7 @@ long saveLatestAspect(
@Nullable final String newSystemMetadata,
final Long nextVersion);

void deleteAspect(@Nullable Transaction tx, @Nonnull final EntityAspect aspect);
void deleteAspect(@Nullable TransactionContext txContext, @Nonnull final EntityAspect aspect);

@Nonnull
ListResult<String> listUrns(
Expand All @@ -125,7 +125,7 @@ ListResult<String> listUrns(
@Nonnull
Stream<EntityAspect> streamAspects(String entityName, String aspectName);

int deleteUrn(@Nullable Transaction tx, @Nonnull final String urn);
int deleteUrn(@Nullable TransactionContext txContext, @Nonnull final String urn);

@Nonnull
ListResult<String> listLatestAspectMetadata(
Expand Down Expand Up @@ -159,11 +159,11 @@ default Map<String, Long> getNextVersions(

@Nonnull
<T> T runInTransactionWithRetry(
@Nonnull final Function<Transaction, T> block, final int maxTransactionRetry);
@Nonnull final Function<TransactionContext, T> block, final int maxTransactionRetry);

@Nonnull
default <T> List<T> runInTransactionWithRetry(
@Nonnull final Function<Transaction, T> block,
@Nonnull final Function<TransactionContext, T> block,
AspectsBatch batch,
final int maxTransactionRetry) {
return List.of(runInTransactionWithRetry(block, maxTransactionRetry));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import static com.linkedin.metadata.Constants.STATUS_ASPECT_NAME;
import static com.linkedin.metadata.Constants.SYSTEM_ACTOR;
import static com.linkedin.metadata.Constants.UI_SOURCE;
import static com.linkedin.metadata.entity.TransactionContext.DEFAULT_MAX_TRANSACTION_RETRY;
import static com.linkedin.metadata.utils.PegasusUtils.constructMCL;
import static com.linkedin.metadata.utils.PegasusUtils.getDataTemplateClassFromSchema;
import static com.linkedin.metadata.utils.PegasusUtils.urnToEntityName;
Expand Down Expand Up @@ -79,7 +80,6 @@
import com.linkedin.r2.RemoteInvocationException;
import com.linkedin.util.Pair;
import io.datahubproject.metadata.context.OperationContext;
import io.ebean.Transaction;
import io.opentelemetry.extension.annotations.WithSpan;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -146,8 +146,6 @@ public class EntityServiceImpl implements EntityService<ChangeItemImpl> {
* As described above, the latest version of an aspect should <b>always</b> take the value 0, with
* monotonically increasing version incrementing as usual once the latest version is replaced.
*/
private static final int DEFAULT_MAX_TRANSACTION_RETRY = 3;

protected final AspectDao aspectDao;

@VisibleForTesting @Getter private final EventProducer producer;
Expand Down Expand Up @@ -837,7 +835,7 @@ private List<UpdateAspectResult> ingestAspectsToLocalDB(

return aspectDao
.runInTransactionWithRetry(
(tx) -> {
(txContext) -> {
// Generate default aspects within the transaction (they are re-calculated on retry)
AspectsBatch batchWithDefaults =
DefaultAspectsUtil.withAdditionalChanges(
Expand All @@ -852,7 +850,8 @@ private List<UpdateAspectResult> ingestAspectsToLocalDB(
aspectDao.getLatestAspects(urnAspects, true));
// read #2 (potentially)
final Map<String, Map<String, Long>> nextVersions =
EntityUtils.calculateNextVersions(aspectDao, latestAspects, urnAspects);
EntityUtils.calculateNextVersions(
txContext, aspectDao, latestAspects, urnAspects);

// 1. Convert patches to full upserts
// 2. Run any entity/aspect level hooks
Expand All @@ -872,7 +871,7 @@ private List<UpdateAspectResult> ingestAspectsToLocalDB(

Map<String, Map<String, Long>> newNextVersions =
EntityUtils.calculateNextVersions(
aspectDao, updatedLatestAspects, updatedItems.getFirst());
txContext, aspectDao, updatedLatestAspects, updatedItems.getFirst());
// merge
updatedNextVersions = AspectsBatch.merge(nextVersions, newNextVersions);
} else {
Expand Down Expand Up @@ -939,7 +938,7 @@ private List<UpdateAspectResult> ingestAspectsToLocalDB(
if (overwrite || latest == null) {
result =
ingestAspectToLocalDB(
tx,
txContext,
item.getUrn(),
item.getAspectName(),
item.getRecordTemplate(),
Expand Down Expand Up @@ -973,8 +972,8 @@ private List<UpdateAspectResult> ingestAspectsToLocalDB(
.collect(Collectors.toList());

// commit upserts prior to retention or kafka send, if supported by impl
if (tx != null) {
tx.commitAndContinue();
if (txContext != null) {
txContext.commitAndContinue();
}
long took = ingestToLocalDBTimer.stop();
log.info(
Expand Down Expand Up @@ -2209,7 +2208,7 @@ private RollbackResult deleteAspectWithoutMCL(

final RollbackResult result =
aspectDao.runInTransactionWithRetry(
(tx) -> {
(txContext) -> {
Integer additionalRowsDeleted = 0;

// 1. Fetch the latest existing version of the aspect.
Expand Down Expand Up @@ -2282,7 +2281,7 @@ private RollbackResult deleteAspectWithoutMCL(
}

// 5. Apply deletes and fix up latest row
aspectsToDelete.forEach(aspect -> aspectDao.deleteAspect(tx, aspect));
aspectsToDelete.forEach(aspect -> aspectDao.deleteAspect(txContext, aspect));

if (survivingAspect != null) {
// if there was a surviving aspect, copy its information into the latest row
Expand All @@ -2300,16 +2299,16 @@ private RollbackResult deleteAspectWithoutMCL(
latest
.getEntityAspect()
.setCreatedFor(survivingAspect.getEntityAspect().getCreatedFor());
aspectDao.saveAspect(tx, latest.getEntityAspect(), false);
aspectDao.saveAspect(txContext, latest.getEntityAspect(), false);
// metrics
aspectDao.incrementWriteMetrics(
aspectName, 1, latest.getMetadataRaw().getBytes(StandardCharsets.UTF_8).length);
aspectDao.deleteAspect(tx, survivingAspect.getEntityAspect());
aspectDao.deleteAspect(txContext, survivingAspect.getEntityAspect());
} else {
if (isKeyAspect) {
if (hardDelete) {
// If this is the key aspect, delete the entity entirely.
additionalRowsDeleted = aspectDao.deleteUrn(tx, urn);
additionalRowsDeleted = aspectDao.deleteUrn(txContext, urn);
} else if (deleteItem.getEntitySpec().hasAspect(Constants.STATUS_ASPECT_NAME)) {
// soft delete by setting status.removed=true (if applicable)
final Status statusAspect = new Status();
Expand All @@ -2326,7 +2325,7 @@ private RollbackResult deleteAspectWithoutMCL(
}
} else {
// Else, only delete the specific aspect.
aspectDao.deleteAspect(tx, latest.getEntityAspect());
aspectDao.deleteAspect(txContext, latest.getEntityAspect());
}
}

Expand Down Expand Up @@ -2466,7 +2465,7 @@ private Map<EntityAspectIdentifier, EnvelopedAspect> getEnvelopedAspects(

@Nonnull
private UpdateAspectResult ingestAspectToLocalDB(
@Nullable Transaction tx,
@Nullable TransactionContext txContext,
@Nonnull final Urn urn,
@Nonnull final String aspectName,
@Nonnull final RecordTemplate newValue,
Expand Down Expand Up @@ -2495,7 +2494,7 @@ private UpdateAspectResult ingestAspectToLocalDB(
latest.getEntityAspect().setSystemMetadata(RecordUtils.toJsonString(latestSystemMetadata));

log.info("Ingesting aspect with name {}, urn {}", aspectName, urn);
aspectDao.saveAspect(tx, latest.getEntityAspect(), false);
aspectDao.saveAspect(txContext, latest.getEntityAspect(), false);

// metrics
aspectDao.incrementWriteMetrics(
Expand All @@ -2518,7 +2517,7 @@ private UpdateAspectResult ingestAspectToLocalDB(
String newValueStr = EntityApiUtils.toJsonAspect(newValue);
long versionOfOld =
aspectDao.saveLatestAspect(
tx,
txContext,
urn.toString(),
aspectName,
latest == null ? null : EntityApiUtils.toJsonAspect(oldValue),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,38 +285,51 @@ public static List<SystemAspect> toSystemAspects(
* Use the precalculated next version from system metadata if it exists, otherwise lookup the next
* version the normal way from the database
*
* @param txContext
* @param aspectDao database access
* @param latestAspects aspect version 0 with system metadata
* @param urnAspects urn/aspects which we need next version information for
* @return map of the urn/aspect to the next aspect version
*/
public static Map<String, Map<String, Long>> calculateNextVersions(
TransactionContext txContext,
AspectDao aspectDao,
Map<String, Map<String, SystemAspect>> latestAspects,
Map<String, Set<String>> urnAspects) {
Map<String, Map<String, Long>> precalculatedVersions =
latestAspects.entrySet().stream()
.map(
entry ->
Map.entry(
entry.getKey(), convertSystemAspectToNextVersionMap(entry.getValue())))
.filter(entry -> !entry.getValue().isEmpty())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

Map<String, Set<String>> missingAspectVersions =
urnAspects.entrySet().stream()
.flatMap(
entry ->
entry.getValue().stream()
.map(aspectName -> Pair.of(entry.getKey(), aspectName)))
.filter(
urnAspectName ->
!precalculatedVersions
.getOrDefault(urnAspectName.getKey(), Map.of())
.containsKey(urnAspectName.getValue()))
.collect(
Collectors.groupingBy(
Pair::getKey, Collectors.mapping(Pair::getValue, Collectors.toSet())));

final Map<String, Map<String, Long>> precalculatedVersions;
final Map<String, Set<String>> missingAspectVersions;
if (txContext.getFailedAttempts() > 2 && txContext.lastExceptionIsDuplicateKey()) {
log.warn(
"Multiple exceptions detected, last exception detected as DuplicateKey, fallback to database max(version)+1");
precalculatedVersions = Map.of();
missingAspectVersions = urnAspects;
} else {
precalculatedVersions =
latestAspects.entrySet().stream()
.map(
entry ->
Map.entry(
entry.getKey(), convertSystemAspectToNextVersionMap(entry.getValue())))
.filter(entry -> !entry.getValue().isEmpty())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

missingAspectVersions =
urnAspects.entrySet().stream()
.flatMap(
entry ->
entry.getValue().stream()
.map(aspectName -> Pair.of(entry.getKey(), aspectName)))
.filter(
urnAspectName ->
!precalculatedVersions
.getOrDefault(urnAspectName.getKey(), Map.of())
.containsKey(urnAspectName.getValue()))
.collect(
Collectors.groupingBy(
Pair::getKey, Collectors.mapping(Pair::getValue, Collectors.toSet())));
}

Map<String, Map<String, Long>> databaseVersions =
missingAspectVersions.isEmpty()
? Map.of()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package com.linkedin.metadata.entity;

import io.ebean.DuplicateKeyException;
import io.ebean.Transaction;
import java.util.ArrayList;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NonNull;
import lombok.experimental.Accessors;
import org.springframework.lang.Nullable;

/** Wrap the transaction with additional information about the exceptions during retry. */
@Data
@AllArgsConstructor
@Accessors(fluent = true)
public class TransactionContext {
public static final int DEFAULT_MAX_TRANSACTION_RETRY = 3;

public static TransactionContext empty() {
return empty(DEFAULT_MAX_TRANSACTION_RETRY);
}

public static TransactionContext empty(@Nullable Integer maxRetries) {
return empty(null, maxRetries == null ? DEFAULT_MAX_TRANSACTION_RETRY : maxRetries);
}

public static TransactionContext empty(Transaction tx, int maxRetries) {
return new TransactionContext(tx, maxRetries, new ArrayList<>());
}

@Nullable private Transaction tx;
private int maxRetries;
@NonNull private List<RuntimeException> exceptions;

public TransactionContext success() {
exceptions.clear();
return this;
}

public TransactionContext addException(RuntimeException e) {
exceptions.add(e);
return this;
}

public int getFailedAttempts() {
return exceptions.size();
}

@Nullable
public RuntimeException lastException() {
return exceptions.isEmpty() ? null : exceptions.get(exceptions.size() - 1);
}

public boolean lastExceptionIsDuplicateKey() {
return lastException() instanceof DuplicateKeyException;
}

public boolean shouldAttemptRetry() {
return exceptions.size() <= maxRetries;
}

public void commitAndContinue() {
if (tx != null) {
tx.commitAndContinue();
}
success();
}
}
Loading

0 comments on commit fc92d23

Please sign in to comment.