Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(entity-service): fallback logic for aspect version #11304

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ public AspectsBatchImplBuilder mcps(
mcp, auditStamp, retrieverContext.getAspectRetriever());
}
} catch (IllegalArgumentException e) {
log.error("Invalid proposal, skipping and proceeding with batch: " + mcp, e);
log.error("Invalid proposal, skipping and proceeding with batch: {}", mcp, e);
return null;
}
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,11 @@
import com.linkedin.metadata.entity.ebean.PartitionedStream;
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import io.ebean.Transaction;
import java.sql.Timestamp;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand All @@ -27,10 +25,10 @@
* aspect is set to 0 for efficient retrieval. In most cases only the latest state of an aspect will
* be fetched. See {@link EntityServiceImpl} for more details.
*
* <p>TODO: This interface exposes {@link #runInTransactionWithRetry(Supplier, int)} because {@link
* EntityServiceImpl} concerns itself with batching multiple commands into a single transaction. It
* exposes storage concerns somewhat and it'd be worth looking into ways to move this responsibility
* inside {@link AspectDao} implementations.
* <p>TODO: This interface exposes {@link #runInTransactionWithRetry(Function, int)}
* (TransactionContext)} because {@link EntityServiceImpl} concerns itself with batching multiple
* commands into a single transaction. It exposes storage concerns somewhat and it'd be worth
* looking into ways to move this responsibility inside {@link AspectDao} implementations.
*/
public interface AspectDao {
String ASPECT_WRITE_COUNT_METRIC_NAME = "aspectWriteCount";
Expand Down Expand Up @@ -77,7 +75,7 @@ Map<String, Map<String, EntityAspect>> getLatestAspects(
Map<String, Set<String>> urnAspects, boolean forUpdate);

void saveAspect(
@Nullable Transaction tx,
@Nullable TransactionContext txContext,
@Nonnull final String urn,
@Nonnull final String aspectName,
@Nonnull final String aspectMetadata,
Expand All @@ -89,10 +87,12 @@ void saveAspect(
final boolean insert);

void saveAspect(
@Nullable Transaction tx, @Nonnull final EntityAspect aspect, final boolean insert);
@Nullable TransactionContext txContext,
@Nonnull final EntityAspect aspect,
final boolean insert);

long saveLatestAspect(
@Nullable Transaction tx,
@Nullable TransactionContext txContext,
@Nonnull final String urn,
@Nonnull final String aspectName,
@Nullable final String oldAspectMetadata,
Expand All @@ -107,7 +107,7 @@ long saveLatestAspect(
@Nullable final String newSystemMetadata,
final Long nextVersion);

void deleteAspect(@Nullable Transaction tx, @Nonnull final EntityAspect aspect);
void deleteAspect(@Nullable TransactionContext txContext, @Nonnull final EntityAspect aspect);

@Nonnull
ListResult<String> listUrns(
Expand All @@ -125,7 +125,7 @@ ListResult<String> listUrns(
@Nonnull
Stream<EntityAspect> streamAspects(String entityName, String aspectName);

int deleteUrn(@Nullable Transaction tx, @Nonnull final String urn);
int deleteUrn(@Nullable TransactionContext txContext, @Nonnull final String urn);

@Nonnull
ListResult<String> listLatestAspectMetadata(
Expand Down Expand Up @@ -159,11 +159,11 @@ default Map<String, Long> getNextVersions(

@Nonnull
<T> T runInTransactionWithRetry(
@Nonnull final Function<Transaction, T> block, final int maxTransactionRetry);
@Nonnull final Function<TransactionContext, T> block, final int maxTransactionRetry);

@Nonnull
default <T> List<T> runInTransactionWithRetry(
@Nonnull final Function<Transaction, T> block,
@Nonnull final Function<TransactionContext, T> block,
AspectsBatch batch,
final int maxTransactionRetry) {
return List.of(runInTransactionWithRetry(block, maxTransactionRetry));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import static com.linkedin.metadata.Constants.STATUS_ASPECT_NAME;
import static com.linkedin.metadata.Constants.SYSTEM_ACTOR;
import static com.linkedin.metadata.Constants.UI_SOURCE;
import static com.linkedin.metadata.entity.TransactionContext.DEFAULT_MAX_TRANSACTION_RETRY;
import static com.linkedin.metadata.utils.PegasusUtils.constructMCL;
import static com.linkedin.metadata.utils.PegasusUtils.getDataTemplateClassFromSchema;
import static com.linkedin.metadata.utils.PegasusUtils.urnToEntityName;
Expand Down Expand Up @@ -79,7 +80,6 @@
import com.linkedin.r2.RemoteInvocationException;
import com.linkedin.util.Pair;
import io.datahubproject.metadata.context.OperationContext;
import io.ebean.Transaction;
import io.opentelemetry.extension.annotations.WithSpan;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -146,8 +146,6 @@ public class EntityServiceImpl implements EntityService<ChangeItemImpl> {
* As described above, the latest version of an aspect should <b>always</b> take the value 0, with
* monotonically increasing version incrementing as usual once the latest version is replaced.
*/
private static final int DEFAULT_MAX_TRANSACTION_RETRY = 3;

protected final AspectDao aspectDao;

@VisibleForTesting @Getter private final EventProducer producer;
Expand Down Expand Up @@ -837,7 +835,7 @@ private List<UpdateAspectResult> ingestAspectsToLocalDB(

return aspectDao
.runInTransactionWithRetry(
(tx) -> {
(txContext) -> {
// Generate default aspects within the transaction (they are re-calculated on retry)
AspectsBatch batchWithDefaults =
DefaultAspectsUtil.withAdditionalChanges(
Expand All @@ -852,7 +850,8 @@ private List<UpdateAspectResult> ingestAspectsToLocalDB(
aspectDao.getLatestAspects(urnAspects, true));
// read #2 (potentially)
final Map<String, Map<String, Long>> nextVersions =
EntityUtils.calculateNextVersions(aspectDao, latestAspects, urnAspects);
EntityUtils.calculateNextVersions(
txContext, aspectDao, latestAspects, urnAspects);

// 1. Convert patches to full upserts
// 2. Run any entity/aspect level hooks
Expand All @@ -872,7 +871,7 @@ private List<UpdateAspectResult> ingestAspectsToLocalDB(

Map<String, Map<String, Long>> newNextVersions =
EntityUtils.calculateNextVersions(
aspectDao, updatedLatestAspects, updatedItems.getFirst());
txContext, aspectDao, updatedLatestAspects, updatedItems.getFirst());
// merge
updatedNextVersions = AspectsBatch.merge(nextVersions, newNextVersions);
} else {
Expand Down Expand Up @@ -939,7 +938,7 @@ private List<UpdateAspectResult> ingestAspectsToLocalDB(
if (overwrite || latest == null) {
result =
ingestAspectToLocalDB(
tx,
txContext,
item.getUrn(),
item.getAspectName(),
item.getRecordTemplate(),
Expand Down Expand Up @@ -973,8 +972,8 @@ private List<UpdateAspectResult> ingestAspectsToLocalDB(
.collect(Collectors.toList());

// commit upserts prior to retention or kafka send, if supported by impl
if (tx != null) {
tx.commitAndContinue();
if (txContext != null) {
txContext.commitAndContinue();
}
long took = ingestToLocalDBTimer.stop();
log.info(
Expand Down Expand Up @@ -2209,7 +2208,7 @@ private RollbackResult deleteAspectWithoutMCL(

final RollbackResult result =
aspectDao.runInTransactionWithRetry(
(tx) -> {
(txContext) -> {
Integer additionalRowsDeleted = 0;

// 1. Fetch the latest existing version of the aspect.
Expand Down Expand Up @@ -2282,7 +2281,7 @@ private RollbackResult deleteAspectWithoutMCL(
}

// 5. Apply deletes and fix up latest row
aspectsToDelete.forEach(aspect -> aspectDao.deleteAspect(tx, aspect));
aspectsToDelete.forEach(aspect -> aspectDao.deleteAspect(txContext, aspect));

if (survivingAspect != null) {
// if there was a surviving aspect, copy its information into the latest row
Expand All @@ -2300,16 +2299,16 @@ private RollbackResult deleteAspectWithoutMCL(
latest
.getEntityAspect()
.setCreatedFor(survivingAspect.getEntityAspect().getCreatedFor());
aspectDao.saveAspect(tx, latest.getEntityAspect(), false);
aspectDao.saveAspect(txContext, latest.getEntityAspect(), false);
// metrics
aspectDao.incrementWriteMetrics(
aspectName, 1, latest.getMetadataRaw().getBytes(StandardCharsets.UTF_8).length);
aspectDao.deleteAspect(tx, survivingAspect.getEntityAspect());
aspectDao.deleteAspect(txContext, survivingAspect.getEntityAspect());
} else {
if (isKeyAspect) {
if (hardDelete) {
// If this is the key aspect, delete the entity entirely.
additionalRowsDeleted = aspectDao.deleteUrn(tx, urn);
additionalRowsDeleted = aspectDao.deleteUrn(txContext, urn);
} else if (deleteItem.getEntitySpec().hasAspect(Constants.STATUS_ASPECT_NAME)) {
// soft delete by setting status.removed=true (if applicable)
final Status statusAspect = new Status();
Expand All @@ -2326,7 +2325,7 @@ private RollbackResult deleteAspectWithoutMCL(
}
} else {
// Else, only delete the specific aspect.
aspectDao.deleteAspect(tx, latest.getEntityAspect());
aspectDao.deleteAspect(txContext, latest.getEntityAspect());
}
}

Expand Down Expand Up @@ -2466,7 +2465,7 @@ private Map<EntityAspectIdentifier, EnvelopedAspect> getEnvelopedAspects(

@Nonnull
private UpdateAspectResult ingestAspectToLocalDB(
@Nullable Transaction tx,
@Nullable TransactionContext txContext,
@Nonnull final Urn urn,
@Nonnull final String aspectName,
@Nonnull final RecordTemplate newValue,
Expand Down Expand Up @@ -2495,7 +2494,7 @@ private UpdateAspectResult ingestAspectToLocalDB(
latest.getEntityAspect().setSystemMetadata(RecordUtils.toJsonString(latestSystemMetadata));

log.info("Ingesting aspect with name {}, urn {}", aspectName, urn);
aspectDao.saveAspect(tx, latest.getEntityAspect(), false);
aspectDao.saveAspect(txContext, latest.getEntityAspect(), false);

// metrics
aspectDao.incrementWriteMetrics(
Expand All @@ -2518,7 +2517,7 @@ private UpdateAspectResult ingestAspectToLocalDB(
String newValueStr = EntityApiUtils.toJsonAspect(newValue);
long versionOfOld =
aspectDao.saveLatestAspect(
tx,
txContext,
urn.toString(),
aspectName,
latest == null ? null : EntityApiUtils.toJsonAspect(oldValue),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,38 +285,51 @@ public static List<SystemAspect> toSystemAspects(
* Use the precalculated next version from system metadata if it exists, otherwise lookup the next
* version the normal way from the database
*
* @param txContext
* @param aspectDao database access
* @param latestAspects aspect version 0 with system metadata
* @param urnAspects urn/aspects which we need next version information for
* @return map of the urn/aspect to the next aspect version
*/
public static Map<String, Map<String, Long>> calculateNextVersions(
TransactionContext txContext,
AspectDao aspectDao,
Map<String, Map<String, SystemAspect>> latestAspects,
Map<String, Set<String>> urnAspects) {
Map<String, Map<String, Long>> precalculatedVersions =
latestAspects.entrySet().stream()
.map(
entry ->
Map.entry(
entry.getKey(), convertSystemAspectToNextVersionMap(entry.getValue())))
.filter(entry -> !entry.getValue().isEmpty())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

Map<String, Set<String>> missingAspectVersions =
urnAspects.entrySet().stream()
.flatMap(
entry ->
entry.getValue().stream()
.map(aspectName -> Pair.of(entry.getKey(), aspectName)))
.filter(
urnAspectName ->
!precalculatedVersions
.getOrDefault(urnAspectName.getKey(), Map.of())
.containsKey(urnAspectName.getValue()))
.collect(
Collectors.groupingBy(
Pair::getKey, Collectors.mapping(Pair::getValue, Collectors.toSet())));

final Map<String, Map<String, Long>> precalculatedVersions;
final Map<String, Set<String>> missingAspectVersions;
if (txContext.getFailedAttempts() > 2 && txContext.lastExceptionIsDuplicateKey()) {
log.warn(
"Multiple exceptions detected, last exception detected as DuplicateKey, fallback to database max(version)+1");
precalculatedVersions = Map.of();
missingAspectVersions = urnAspects;
} else {
precalculatedVersions =
latestAspects.entrySet().stream()
.map(
entry ->
Map.entry(
entry.getKey(), convertSystemAspectToNextVersionMap(entry.getValue())))
.filter(entry -> !entry.getValue().isEmpty())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

missingAspectVersions =
urnAspects.entrySet().stream()
.flatMap(
entry ->
entry.getValue().stream()
.map(aspectName -> Pair.of(entry.getKey(), aspectName)))
.filter(
urnAspectName ->
!precalculatedVersions
.getOrDefault(urnAspectName.getKey(), Map.of())
.containsKey(urnAspectName.getValue()))
.collect(
Collectors.groupingBy(
Pair::getKey, Collectors.mapping(Pair::getValue, Collectors.toSet())));
}

Map<String, Map<String, Long>> databaseVersions =
missingAspectVersions.isEmpty()
? Map.of()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package com.linkedin.metadata.entity;

import io.ebean.DuplicateKeyException;
import io.ebean.Transaction;
import java.util.ArrayList;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NonNull;
import lombok.experimental.Accessors;
import org.springframework.lang.Nullable;

/** Wrap the transaction with additional information about the exceptions during retry. */
@Data
@AllArgsConstructor
@Accessors(fluent = true)
public class TransactionContext {
public static final int DEFAULT_MAX_TRANSACTION_RETRY = 3;

public static TransactionContext empty() {
return empty(DEFAULT_MAX_TRANSACTION_RETRY);
}

public static TransactionContext empty(@Nullable Integer maxRetries) {
return empty(null, maxRetries == null ? DEFAULT_MAX_TRANSACTION_RETRY : maxRetries);
}

public static TransactionContext empty(Transaction tx, int maxRetries) {
return new TransactionContext(tx, maxRetries, new ArrayList<>());
}

@Nullable private Transaction tx;
private int maxRetries;
@NonNull private List<RuntimeException> exceptions;

public TransactionContext success() {
exceptions.clear();
return this;
}

public TransactionContext addException(RuntimeException e) {
exceptions.add(e);
return this;
}

public int getFailedAttempts() {
return exceptions.size();
}

@Nullable
public RuntimeException lastException() {
return exceptions.isEmpty() ? null : exceptions.get(exceptions.size() - 1);
}

public boolean lastExceptionIsDuplicateKey() {
return lastException() instanceof DuplicateKeyException;
}

public boolean shouldAttemptRetry() {
return exceptions.size() <= maxRetries;
}

public void commitAndContinue() {
if (tx != null) {
tx.commitAndContinue();
}
success();
}
}
Loading
Loading