Skip to content

Commit

Permalink
fix(elasticsearch): fixes out of order runId writes (#6845)
Browse files Browse the repository at this point in the history
Co-authored-by: leifker <[email protected]>
Co-authored-by: Pedro Silva <[email protected]>
  • Loading branch information
3 people authored Dec 27, 2022
1 parent ec8a4e0 commit cb6314c
Show file tree
Hide file tree
Showing 16 changed files with 85 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -883,7 +883,7 @@ public IngestProposalResult ingestProposal(@Nonnull MetadataChangeProposal mcp,
_producer.produceMetadataChangeProposal(mcp);
return new IngestProposalResult(mcp.getEntityUrn(), false);
}
} else {
} else {
// For timeseries aspects
newAspect = convertToRecordTemplate(mcp, aspectSpec);
newSystemMetadata = mcp.getSystemMetadata();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
Expand All @@ -37,13 +36,12 @@ public class ESGraphWriteDAO {
* @param docId the ID of the document
*/
public void upsertDocument(@Nonnull String docId, @Nonnull String document) {
final IndexRequest indexRequest =
new IndexRequest(indexConvention.getIndexName(INDEX_NAME)).id(docId).source(document, XContentType.JSON);
final UpdateRequest updateRequest =
new UpdateRequest(indexConvention.getIndexName(INDEX_NAME), docId).doc(document, XContentType.JSON)
.detectNoop(false)
.retryOnConflict(numRetries)
.upsert(indexRequest);
final UpdateRequest updateRequest = new UpdateRequest(
indexConvention.getIndexName(INDEX_NAME), docId)
.detectNoop(false)
.docAsUpsert(true)
.doc(document, XContentType.JSON)
.retryOnConflict(numRetries);
bulkProcessor.add(updateRequest);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
Expand Down Expand Up @@ -37,12 +36,13 @@ public class ESWriteDAO {
*/
public void upsertDocument(@Nonnull String entityName, @Nonnull String document, @Nonnull String docId) {
final String indexName = indexConvention.getIndexName(entityRegistry.getEntitySpec(entityName));
final IndexRequest indexRequest = new IndexRequest(indexName).id(docId).source(document, XContentType.JSON);
final UpdateRequest updateRequest =
new UpdateRequest(indexName, docId).doc(document, XContentType.JSON)
.detectNoop(false)
.retryOnConflict(numRetries)
.upsert(indexRequest);
final UpdateRequest updateRequest = new UpdateRequest(
indexName, docId)
.detectNoop(false)
.docAsUpsert(true)
.doc(document, XContentType.JSON)
.retryOnConflict(numRetries);

bulkProcessor.add(updateRequest);
}

Expand All @@ -62,7 +62,12 @@ public void deleteDocument(@Nonnull String entityName, @Nonnull String docId) {
*/
public void applyScriptUpdate(@Nonnull String entityName, @Nonnull String docId, @Nonnull String script) {
final String indexName = indexConvention.getIndexName(entityRegistry.getEntitySpec(entityName));
bulkProcessor.add(new UpdateRequest(indexName, docId).retryOnConflict(numRetries).script(new Script(script)));
UpdateRequest updateRequest = new UpdateRequest(indexName, docId)
.detectNoop(false)
.scriptedUpsert(true)
.retryOnConflict(numRetries)
.script(new Script(script));
bulkProcessor.add(updateRequest);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
Expand Down Expand Up @@ -49,13 +48,12 @@ public class ESSystemMetadataDAO {
* @param docId the ID of the document
*/
public void upsertDocument(@Nonnull String docId, @Nonnull String document) {
final IndexRequest indexRequest =
new IndexRequest(indexConvention.getIndexName(INDEX_NAME)).id(docId).source(document, XContentType.JSON);
final UpdateRequest updateRequest =
new UpdateRequest(indexConvention.getIndexName(INDEX_NAME), docId).doc(document, XContentType.JSON)
.detectNoop(false)
.retryOnConflict(numRetries)
.upsert(indexRequest);
final UpdateRequest updateRequest = new UpdateRequest(
indexConvention.getIndexName(INDEX_NAME), docId)
.detectNoop(false)
.docAsUpsert(true)
.doc(document, XContentType.JSON)
.retryOnConflict(numRetries);
bulkProcessor.add(updateRequest);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
Expand Down Expand Up @@ -119,12 +118,12 @@ public void configure() {
public void upsertDocument(@Nonnull String entityName, @Nonnull String aspectName, @Nonnull String docId,
@Nonnull JsonNode document) {
String indexName = _indexConvention.getTimeseriesAspectIndexName(entityName, aspectName);
final IndexRequest indexRequest =
new IndexRequest(indexName).id(docId).source(document.toString(), XContentType.JSON);
final UpdateRequest updateRequest = new UpdateRequest(indexName, docId).doc(document.toString(), XContentType.JSON)
final UpdateRequest updateRequest = new UpdateRequest(
indexName, docId)
.detectNoop(false)
.retryOnConflict(_numRetries)
.upsert(indexRequest);
.docAsUpsert(true)
.doc(document.toString(), XContentType.JSON)
.retryOnConflict(_numRetries);
_bulkProcessor.add(updateRequest);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.List;

import static com.linkedin.metadata.ElasticSearchTestConfiguration.syncAfterWrite;
import static com.linkedin.metadata.systemmetadata.ElasticSearchSystemMetadataService.INDEX_NAME;
import static org.testng.Assert.assertEquals;

@Import(ElasticSearchTestConfiguration.class)
Expand All @@ -34,7 +33,7 @@ public class ElasticSearchSystemMetadataServiceTest extends AbstractTestNGSpring
@Autowired
private ESIndexBuilder _esIndexBuilder;
private final IndexConvention _indexConvention = new IndexConventionImpl("es_system_metadata_service_test");
private final String _indexName = _indexConvention.getIndexName(INDEX_NAME);

private ElasticSearchSystemMetadataService _client;

@BeforeClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.xcontent.XContentType;


@Slf4j
Expand Down Expand Up @@ -46,11 +47,12 @@ private static DeleteRequest createDeleteRequest(@Nonnull ElasticEvent event) {

@Nonnull
private UpdateRequest createUpsertRequest(@Nonnull ElasticEvent event) {
final IndexRequest indexRequest = new IndexRequest(event.getIndex()).id(event.getId()).source(event.buildJson());
return new UpdateRequest(event.getIndex(), event.getId()).doc(event.buildJson())
.detectNoop(false)
.retryOnConflict(_numRetries)
.upsert(indexRequest);
return new UpdateRequest(
event.getIndex(), event.getId())
.detectNoop(false)
.docAsUpsert(true)
.doc(event.buildJson(), XContentType.JSON)
.retryOnConflict(_numRetries);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.restli.DefaultRestliClientFactory;
import com.linkedin.parseq.retry.backoff.ExponentialBackoff;
import com.linkedin.restli.client.Client;
import com.linkedin.usage.UsageClient;
import org.springframework.beans.factory.annotation.Value;
Expand All @@ -26,10 +27,16 @@ public class UsageClientFactory {
@Value("${DATAHUB_GMS_SSL_PROTOCOL:#{null}}")
private String gmsSslProtocol;

@Value("${usageClient.retryInterval:2}")
private int retryInterval;

@Value("${usageClient.numRetries:3}")
private int numRetries;

@Bean("usageClient")
public UsageClient getUsageClient() {
Client restClient = DefaultRestliClientFactory.getRestLiClient(gmsHost, gmsPort, gmsUseSSL, gmsSslProtocol);
return new UsageClient(restClient);
return new UsageClient(restClient, new ExponentialBackoff(retryInterval), numRetries);
}
}

4 changes: 4 additions & 0 deletions metadata-service/factories/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -232,3 +232,7 @@ views:
entityClient:
retryInterval: ${ENTITY_CLIENT_RETRY_INTERVAL:2}
numRetries: ${ENTITY_CLIENT_NUM_RETRIES:3}

usageClient:
retryInterval: ${USAGE_CLIENT_RETRY_INTERVAL:2}
numRetries: ${USAGE_CLIENT_NUM_RETRIES:3}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import com.linkedin.common.callback.FutureCallback;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import com.linkedin.parseq.retry.backoff.BackoffPolicy;
import com.linkedin.parseq.retry.backoff.ExponentialBackoff;
import com.linkedin.r2.RemoteInvocationException;

import java.util.Objects;
Expand All @@ -26,10 +25,6 @@ public abstract class BaseClient implements AutoCloseable {
protected final BackoffPolicy _backoffPolicy;
protected final int _retryCount;

protected BaseClient(@Nonnull Client restliClient) {
this(restliClient, new ExponentialBackoff(2), 3);
}

protected BaseClient(@Nonnull Client restliClient, BackoffPolicy backoffPolicy, int retryCount) {
_client = Objects.requireNonNull(restliClient);
_backoffPolicy = backoffPolicy;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import com.linkedin.common.WindowDuration;
import com.linkedin.common.client.BaseClient;
import com.linkedin.parseq.retry.backoff.BackoffPolicy;
import com.linkedin.r2.RemoteInvocationException;
import com.linkedin.restli.client.Client;
import java.net.URISyntaxException;
Expand All @@ -16,8 +17,8 @@ public class UsageClient extends BaseClient {
private static final UsageStatsRequestBuilders USAGE_STATS_REQUEST_BUILDERS =
new UsageStatsRequestBuilders();

public UsageClient(@Nonnull final Client restliClient) {
super(restliClient);
public UsageClient(@Nonnull final Client restliClient, @Nonnull final BackoffPolicy backoffPolicy, int retryCount) {
super(restliClient, backoffPolicy, retryCount);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.linkedin.metadata.search.EntitySearchService;
import com.linkedin.metadata.timeseries.TimeseriesAspectService;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.mxe.SystemMetadata;
import com.linkedin.parseq.Task;
import com.linkedin.restli.common.HttpStatus;
import com.linkedin.restli.internal.server.methods.AnyRecord;
Expand All @@ -32,16 +33,17 @@
import java.net.URISyntaxException;
import java.time.Clock;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.inject.Named;
import lombok.extern.slf4j.Slf4j;

import static com.linkedin.metadata.resources.entity.ResourceUtils.*;
import static com.linkedin.metadata.resources.restli.RestliConstants.*;
import static com.linkedin.metadata.resources.restli.RestliConstants.PARAM_FILTER;
import static com.linkedin.metadata.resources.restli.RestliConstants.PARAM_LIMIT;
import static com.linkedin.metadata.resources.restli.RestliConstants.PARAM_URN;
import static com.linkedin.metadata.resources.restli.RestliConstants.PARAM_URN_LIKE;


/**
Expand Down Expand Up @@ -153,15 +155,18 @@ public Task<String> ingestProposal(
String actorUrnStr = authentication.getActor().toUrnStr();
final AuditStamp auditStamp = new AuditStamp().setTime(_clock.millis()).setActor(Urn.createFromString(actorUrnStr));

final List<MetadataChangeProposal> additionalChanges =
AspectUtils.getAdditionalChanges(metadataChangeProposal, _entityService);

return RestliUtil.toTask(() -> {
log.debug("Proposal: {}", metadataChangeProposal);
try {
Urn urn = _entityService.ingestProposal(metadataChangeProposal, auditStamp, asyncBool).getUrn();
additionalChanges.forEach(proposal -> _entityService.ingestProposal(proposal, auditStamp, asyncBool));
tryIndexRunId(urn, metadataChangeProposal.getSystemMetadata(), _entitySearchService);
EntityService.IngestProposalResult result = _entityService.ingestProposal(metadataChangeProposal, auditStamp, asyncBool);
Urn urn = result.getUrn();

AspectUtils.getAdditionalChanges(metadataChangeProposal, _entityService)
.forEach(proposal -> _entityService.ingestProposal(proposal, auditStamp, asyncBool));

if (!result.isDidUpdate()) {
tryIndexRunId(urn, metadataChangeProposal.getSystemMetadata(), _entitySearchService);
}
return urn.toString();
} catch (ValidationException e) {
throw new RestLiServiceException(HttpStatus.S_422_UNPROCESSABLE_ENTITY, e.getMessage());
Expand Down Expand Up @@ -201,4 +206,11 @@ public Task<String> restoreIndices(@ActionParam(PARAM_ASPECT) @Optional @Nonnull
return result.toString();
}, MetricRegistry.name(this.getClass(), "restoreIndices"));
}

private static void tryIndexRunId(final Urn urn, final @Nullable SystemMetadata systemMetadata,
final EntitySearchService entitySearchService) {
if (systemMetadata != null && systemMetadata.hasRunId()) {
entitySearchService.appendRunId(urn.getEntityType(), urn, systemMetadata.getRunId());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.maven.artifact.versioning.ComparableVersion;

import static com.linkedin.metadata.resources.entity.ResourceUtils.*;
import static com.linkedin.metadata.entity.validation.ValidationUtils.*;
import static com.linkedin.metadata.resources.restli.RestliConstants.*;
import static com.linkedin.metadata.search.utils.SearchUtils.*;
Expand Down Expand Up @@ -223,8 +222,6 @@ public Task<Void> ingest(@ActionParam(PARAM_ENTITY) @Nonnull Entity entity,
final SystemMetadata finalSystemMetadata = systemMetadata;
return RestliUtil.toTask(() -> {
_entityService.ingestEntity(entity, auditStamp, finalSystemMetadata);
tryIndexRunId(com.datahub.util.ModelUtils.getUrnFromSnapshotUnion(entity.getValue()), systemMetadata,
_entitySearchService);
return null;
}, MetricRegistry.name(this.getClass(), "ingest"));
}
Expand Down Expand Up @@ -259,15 +256,8 @@ public Task<Void> batchIngest(@ActionParam(PARAM_ENTITIES) @Nonnull Entity[] ent
.map(systemMetadata -> populateDefaultFieldsIfEmpty(systemMetadata))
.collect(Collectors.toList());

SystemMetadata[] finalSystemMetadataList1 = systemMetadataList;
return RestliUtil.toTask(() -> {
_entityService.ingestEntities(Arrays.asList(entities), auditStamp, finalSystemMetadataList);
for (int i = 0; i < entities.length; i++) {
SystemMetadata systemMetadata = finalSystemMetadataList1[i];
Entity entity = entities[i];
tryIndexRunId(com.datahub.util.ModelUtils.getUrnFromSnapshotUnion(entity.getValue()), systemMetadata,
_entitySearchService);
}
return null;
}, MetricRegistry.name(this.getClass(), "batchIngest"));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
package com.linkedin.metadata.resources.entity;

import com.linkedin.common.urn.Urn;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.search.EntitySearchService;
import com.linkedin.mxe.SystemMetadata;
import java.util.Set;
import javax.annotation.Nullable;


public class ResourceUtils {
Expand All @@ -17,10 +13,4 @@ public static Set<String> getAllAspectNames(final EntityService entityService, f
return entityService.getEntityAspectNames(entityName);
}

public static void tryIndexRunId(final Urn urn, final @Nullable SystemMetadata systemMetadata,
final EntitySearchService entitySearchService) {
if (systemMetadata != null && systemMetadata.hasRunId()) {
entitySearchService.appendRunId(urn.getEntityType(), urn, systemMetadata.getRunId());
}
}
}
Loading

0 comments on commit cb6314c

Please sign in to comment.