Tighten sequence numbers recovery
This commit addresses issues related to recovery and sequence numbers:
 - A sequence number can be assigned and a Lucene commit created with a
   maximum sequence number at least as large as that sequence number,
   yet the operation corresponding to that sequence number can be
   missing from both the Lucene commit and the translog. This means that
   upon recovery the local checkpoint will be stuck at or below this
   missing sequence number. To address this, we force the local
   checkpoint to the maximum sequence number in the Lucene commit when
   opening the engine (see the sketch after this list). Note that there can
   still be gaps in the translog history, but we do not address those here.
 - The global checkpoint is transferred to the target shard at the end
   of peer recovery.
 - Additionally, we reenable the relocation integration tests.
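
For illustration, here is a minimal, self-contained sketch of the checkpoint-forcing loop described in the first item. The class and method names are hypothetical stand-ins for SequenceNumbersService (only markSeqNoAsCompleted and getLocalCheckpoint echo real names); the while loop at the bottom mirrors the one this commit adds to the InternalEngine constructor.

/*
 * Hedged sketch only: a hypothetical stand-in for SequenceNumbersService, not
 * the real API. It models a local checkpoint that advances one contiguous
 * sequence number at a time.
 */
final class LocalCheckpointSketch {

    private long localCheckpoint;

    LocalCheckpointSketch(final long localCheckpoint) {
        this.localCheckpoint = localCheckpoint;
    }

    long getLocalCheckpoint() {
        return localCheckpoint;
    }

    // Simplified: only the next contiguous sequence number advances the checkpoint.
    void markSeqNoAsCompleted(final long seqNo) {
        if (seqNo == localCheckpoint + 1) {
            localCheckpoint = seqNo;
        }
    }

    /*
     * The recovery-time fix: if the Lucene commit advertises a max sequence
     * number above the local checkpoint, mark every intermediate sequence
     * number as completed so the checkpoint cannot stay stuck below the commit.
     */
    void forceLocalCheckpointTo(final long maxSeqNoInCommit) {
        while (getLocalCheckpoint() < maxSeqNoInCommit) {
            markSeqNoAsCompleted(getLocalCheckpoint() + 1);
        }
    }

    public static void main(final String[] args) {
        final LocalCheckpointSketch tracker = new LocalCheckpointSketch(3); // operations 0..3 are durable
        tracker.forceLocalCheckpointTo(7);                                  // the commit says max seq no is 7
        System.out.println(tracker.getLocalCheckpoint());                   // prints 7
    }
}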

Lastly, this work uncovered some bugs in the assignment of sequence
numbers on replica operations:
 - the sequence number was never set on replica write requests, a
   regression very likely introduced while resolving merge conflicts
 - operations that arrive out of order on a replica and hit a version
   conflict with a previous operation were never marked as processed
   (see the sketch after this list)
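
The following is a small, hedged sketch of what these two replica-side fixes amount to. ReplicaSeqNoSketch, SeqNoTracker, and applyOnReplica are hypothetical stand-ins, not the real engine API; only the pattern mirrors the commit: the primary copies the sequence number it assigned onto the replicated request, and the replica marks that sequence number as completed in a finally block, so a version conflict on an out-of-order operation no longer leaves the local checkpoint stuck.

/*
 * Hedged sketch only: hypothetical types; the seq-no handling pattern mirrors
 * the changes in this commit (request.seqNo(result.getSeqNo()) on the primary,
 * and marking the sequence number as completed in a finally block on apply).
 */
final class ReplicaSeqNoSketch {

    interface SeqNoTracker {
        void markSeqNoAsCompleted(long seqNo);
    }

    // Primary side: analogous to indexRequest.seqNo(indexResult.getSeqNo()) in the diff below.
    static long seqNoForReplication(final long seqNoAssignedByEngine) {
        return seqNoAssignedByEngine;
    }

    // Replica side: apply the operation; always mark the sequence number as processed.
    static void applyOnReplica(
            final long seqNo, final long requestVersion, final long currentVersion, final SeqNoTracker tracker) {
        try {
            if (requestVersion <= currentVersion) {
                // Out-of-order arrival that loses the version conflict: skip the write,
                // but the sequence number must still count as processed (this was the bug).
                return;
            }
            // ... index or delete against Lucene and the translog here ...
        } finally {
            tracker.markSeqNoAsCompleted(seqNo);
        }
    }

    public static void main(final String[] args) {
        final long[] highestMarked = new long[] { -1 };
        final SeqNoTracker tracker = seqNo -> highestMarked[0] = Math.max(highestMarked[0], seqNo);
        // An operation with version 2 arrives after a newer one (current version 5) was already applied.
        applyOnReplica(seqNoForReplication(0), 2, 5, tracker);
        System.out.println(highestMarked[0]); // prints 0: the conflicting operation was still marked
    }
}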

Relates #22212
jasontedor authored Dec 17, 2016
1 parent 1f3eb06 commit 58d73ba
Showing 14 changed files with 368 additions and 92 deletions.
@@ -50,6 +50,7 @@
import org.elasticsearch.index.engine.EngineClosedException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
@@ -150,6 +151,7 @@ private Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexSh
final long version = indexResult.getVersion();
indexRequest.version(version);
indexRequest.versionType(indexRequest.versionType().versionTypeForReplicationAndRecovery());
indexRequest.seqNo(indexResult.getSeqNo());
assert indexRequest.versionType().validateVersionForWrites(indexRequest.version());
response = new IndexResponse(primary.shardId(), indexRequest.type(), indexRequest.id(), indexResult.getSeqNo(),
indexResult.getVersion(), indexResult.isCreated());
@@ -173,6 +175,7 @@ private Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexSh
// update the request with the version so it will go to the replicas
deleteRequest.versionType(deleteRequest.versionType().versionTypeForReplicationAndRecovery());
deleteRequest.version(deleteResult.getVersion());
deleteRequest.seqNo(deleteResult.getSeqNo());
assert deleteRequest.versionType().validateVersionForWrites(deleteRequest.version());
response = new DeleteResponse(request.shardId(), deleteRequest.type(), deleteRequest.id(), deleteResult.getSeqNo(),
deleteResult.getVersion(), deleteResult.isFound());
@@ -182,6 +185,7 @@ private Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexSh
break;
default: throw new IllegalStateException("unexpected opType [" + itemRequest.opType() + "] found");
}

// update the bulk item request because update request execution can mutate the bulk item request
request.items()[requestIndex] = replicaRequest;
if (operationResult == null) { // in case of noop update operation
@@ -282,6 +286,7 @@ private UpdateResultHolder executeUpdateRequest(UpdateRequest updateRequest, Ind
final long version = updateOperationResult.getVersion();
indexRequest.version(version);
indexRequest.versionType(indexRequest.versionType().versionTypeForReplicationAndRecovery());
indexRequest.seqNo(updateOperationResult.getSeqNo());
assert indexRequest.versionType().validateVersionForWrites(indexRequest.version());
}
break;
@@ -292,6 +297,7 @@ private UpdateResultHolder executeUpdateRequest(UpdateRequest updateRequest, Ind
// update the request with the version so it will go to the replicas
deleteRequest.versionType(deleteRequest.versionType().versionTypeForReplicationAndRecovery());
deleteRequest.version(updateOperationResult.getVersion());
deleteRequest.seqNo(updateOperationResult.getSeqNo());
assert deleteRequest.versionType().validateVersionForWrites(deleteRequest.version());
}
break;
Expand Down Expand Up @@ -342,6 +348,10 @@ private UpdateResultHolder executeUpdateRequest(UpdateRequest updateRequest, Ind
replicaRequest = new BulkItemRequest(request.items()[requestIndex].id(), updateDeleteRequest);
break;
}
assert (replicaRequest.request() instanceof IndexRequest
&& ((IndexRequest) replicaRequest.request()).seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) ||
(replicaRequest.request() instanceof DeleteRequest
&& ((DeleteRequest) replicaRequest.request()).seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO);
// successful operation
break; // out of retry loop
} else if (updateOperationResult.getFailure() instanceof VersionConflictEngineException == false) {
@@ -364,10 +374,10 @@ protected WriteReplicaResult shardOperationOnReplica(BulkShardRequest request, I
switch (docWriteRequest.opType()) {
case CREATE:
case INDEX:
operationResult = executeIndexRequestOnReplica(((IndexRequest) docWriteRequest), replica);
operationResult = executeIndexRequestOnReplica((IndexRequest) docWriteRequest, replica);
break;
case DELETE:
operationResult = executeDeleteRequestOnReplica(((DeleteRequest) docWriteRequest), replica);
operationResult = executeDeleteRequestOnReplica((DeleteRequest) docWriteRequest, replica);
break;
default:
throw new IllegalStateException("Unexpected request operation type on replica: "
@@ -129,6 +129,7 @@ protected WritePrimaryResult shardOperationOnPrimary(DeleteRequest request, Inde
// update the request with the version so it will go to the replicas
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
request.version(result.getVersion());
request.seqNo(result.getSeqNo());
assert request.versionType().validateVersionForWrites(request.version());
response = new DeleteResponse(
primary.shardId(),
143 changes: 96 additions & 47 deletions core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -78,6 +78,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -86,6 +87,7 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

public class InternalEngine extends Engine {

@@ -175,9 +177,18 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
throw new IllegalArgumentException(openMode.toString());
}
logger.trace("recovered [{}]", seqNoStats);
indexWriter = writer;
seqNoService = sequenceNumberService(shardId, engineConfig.getIndexSettings(), seqNoStats);
translog = openTranslog(engineConfig, writer, seqNoService::getGlobalCheckpoint);
// norelease
/*
* We have no guarantees that all operations above the local checkpoint are in the Lucene commit or the translog. This means
             * that there might be operations greater than the local checkpoint that will not be replayed. Here we force the local
* checkpoint to the maximum sequence number in the commit (at the potential expense of correctness).
*/
while (seqNoService().getLocalCheckpoint() < seqNoService().getMaxSeqNo()) {
seqNoService().markSeqNoAsCompleted(seqNoService().getLocalCheckpoint() + 1);
}
indexWriter = writer;
translog = openTranslog(engineConfig, writer, () -> seqNoService().getLocalCheckpoint());
assert translog.getGeneration() != null;
} catch (IOException | TranslogCorruptedException e) {
throw new EngineCreationFailureException(shardId, "failed to create engine", e);
@@ -412,7 +423,7 @@ private SearcherManager createSearcherManager() throws EngineException {

@Override
public GetResult get(Get get, Function<String, Searcher> searcherFactory) throws EngineException {
try (ReleasableLock lock = readLock.acquire()) {
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
if (get.realtime()) {
VersionValue versionValue = versionMap.getUnderLock(get.uid());
@@ -434,11 +445,28 @@ public GetResult get(Get get, Function<String, Searcher> searcherFactory) throws
}
}

private boolean checkVersionConflict(
final Operation op,
final long currentVersion,
final long expectedVersion,
final boolean deleted) {
/**
* Checks for version conflicts. If a version conflict exists, the optional return value represents the operation result. Otherwise, if
* no conflicts are found, the optional return value is not present.
*
* @param <T> the result type
* @param op the operation
* @param currentVersion the current version
* @param expectedVersion the expected version
* @param deleted {@code true} if the current version is not found or represents a delete
* @param onSuccess if there is a version conflict that can be ignored, the result of the operation
* @param onFailure if there is a version conflict that can not be ignored, the result of the operation
* @return if there is a version conflict, the optional value is present and represents the operation result, otherwise the return value
* is not present
*/
private <T extends Result> Optional<T> checkVersionConflict(
final Operation op,
final long currentVersion,
final long expectedVersion,
final boolean deleted,
final Supplier<T> onSuccess,
final Function<VersionConflictEngineException, T> onFailure) {
final T result;
if (op.versionType() == VersionType.FORCE) {
if (engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
// If index was created in 5.0 or later, 'force' is not allowed at all
@@ -452,14 +480,22 @@ private boolean checkVersionConflict(
if (op.versionType().isVersionConflictForWrites(currentVersion, expectedVersion, deleted)) {
if (op.origin().isRecovery()) {
// version conflict, but okay
return true;
result = onSuccess.get();
} else {
// fatal version conflict
throw new VersionConflictEngineException(shardId, op.type(), op.id(),
final VersionConflictEngineException e =
new VersionConflictEngineException(
shardId,
op.type(),
op.id(),
op.versionType().explainConflictForWrites(currentVersion, expectedVersion, deleted));
result = onFailure.apply(e);
}

return Optional.of(result);
} else {
return Optional.empty();
}
return false;
}

private long checkDeletedAndGCed(VersionValue versionValue) {
@@ -475,7 +511,7 @@ private long checkDeletedAndGCed(VersionValue versionValue) {
@Override
public IndexResult index(Index index) {
IndexResult result;
try (ReleasableLock lock = readLock.acquire()) {
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
if (index.origin().isRecovery()) {
// Don't throttle recovery operations
@@ -573,7 +609,7 @@ private IndexResult innerIndex(Index index) throws IOException {
assert assertSequenceNumber(index.origin(), index.seqNo());
final Translog.Location location;
final long updatedVersion;
IndexResult indexResult = null;
long seqNo = index.seqNo();
try (Releasable ignored = acquireLock(index.uid())) {
lastWriteNanos = index.startTime();
/* if we have an autoGeneratedID that comes into the engine we can potentially optimize
@@ -638,27 +674,32 @@ private IndexResult innerIndex(Index index) throws IOException {
}
}
final long expectedVersion = index.version();
if (checkVersionConflict(index, currentVersion, expectedVersion, deleted)) {
// skip index operation because of version conflict on recovery
indexResult = new IndexResult(expectedVersion, SequenceNumbersService.UNASSIGNED_SEQ_NO, false);
final Optional<IndexResult> checkVersionConflictResult =
checkVersionConflict(
index,
currentVersion,
expectedVersion,
deleted,
() -> new IndexResult(currentVersion, index.seqNo(), false),
e -> new IndexResult(e, currentVersion, index.seqNo()));

final IndexResult indexResult;
if (checkVersionConflictResult.isPresent()) {
indexResult = checkVersionConflictResult.get();
} else {
final long seqNo;
// no version conflict
if (index.origin() == Operation.Origin.PRIMARY) {
seqNo = seqNoService.generateSeqNo();
} else {
seqNo = index.seqNo();
seqNo = seqNoService().generateSeqNo();
}
updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);
index.parsedDoc().version().setLongValue(updatedVersion);

// Update the document's sequence number and primary term, the
// sequence number here is derived here from either the sequence
// number service if this is on the primary, or the existing
// document's sequence number if this is on the replica. The
// primary term here has already been set, see
// IndexShard.prepareIndex where the Engine.Index operation is
// created
/**
* Update the document's sequence number and primary term; the sequence number here is derived here from either the sequence
* number service if this is on the primary, or the existing document's sequence number if this is on the replica. The
* primary term here has already been set, see IndexShard#prepareIndex where the Engine$Index operation is created.
*/
index.parsedDoc().updateSeqID(seqNo, index.primaryTerm());
updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);
index.parsedDoc().version().setLongValue(updatedVersion);

if (currentVersion == Versions.NOT_FOUND && forceUpdateDocument == false) {
// document does not exists, we can optimize for create, but double check if assertions are running
@@ -669,17 +710,17 @@ private IndexResult innerIndex(Index index) throws IOException {
}
indexResult = new IndexResult(updatedVersion, seqNo, deleted);
location = index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY
? translog.add(new Translog.Index(index, indexResult))
: null;
? translog.add(new Translog.Index(index, indexResult))
: null;
versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion));
indexResult.setTranslogLocation(location);
}
indexResult.setTook(System.nanoTime() - index.startTime());
indexResult.freeze();
return indexResult;
} finally {
if (indexResult != null && indexResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
seqNoService.markSeqNoAsCompleted(indexResult.getSeqNo());
if (seqNo != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
seqNoService().markSeqNoAsCompleted(seqNo);
}
}

@@ -724,7 +765,7 @@ private static void update(final Term uid, final List<ParseContext.Document> doc
@Override
public DeleteResult delete(Delete delete) {
DeleteResult result;
try (ReleasableLock lock = readLock.acquire()) {
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
// NOTE: we don't throttle this when merges fall behind because delete-by-id does not create new segments:
result = innerDelete(delete);
@@ -748,7 +789,7 @@ private DeleteResult innerDelete(Delete delete) throws IOException {
final Translog.Location location;
final long updatedVersion;
final boolean found;
DeleteResult deleteResult = null;
long seqNo = delete.seqNo();
try (Releasable ignored = acquireLock(delete.uid())) {
lastWriteNanos = delete.startTime();
final long currentVersion;
@@ -764,32 +805,40 @@ private DeleteResult innerDelete(Delete delete) throws IOException {
}

final long expectedVersion = delete.version();
if (checkVersionConflict(delete, currentVersion, expectedVersion, deleted)) {
// skip executing delete because of version conflict on recovery
deleteResult = new DeleteResult(expectedVersion, SequenceNumbersService.UNASSIGNED_SEQ_NO, true);

final Optional<DeleteResult> result =
checkVersionConflict(
delete,
currentVersion,
expectedVersion,
deleted,
() -> new DeleteResult(expectedVersion, delete.seqNo(), true),
e -> new DeleteResult(e, expectedVersion, delete.seqNo()));

final DeleteResult deleteResult;
if (result.isPresent()) {
deleteResult = result.get();
} else {
final long seqNo;
if (delete.origin() == Operation.Origin.PRIMARY) {
seqNo = seqNoService.generateSeqNo();
} else {
seqNo = delete.seqNo();
seqNo = seqNoService().generateSeqNo();
}

updatedVersion = delete.versionType().updateVersion(currentVersion, expectedVersion);
found = deleteIfFound(delete.uid(), currentVersion, deleted, versionValue);
deleteResult = new DeleteResult(updatedVersion, seqNo, found);
location = delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY
? translog.add(new Translog.Delete(delete, deleteResult))
: null;
? translog.add(new Translog.Delete(delete, deleteResult))
: null;
versionMap.putUnderLock(delete.uid().bytes(),
new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis()));
new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis()));
deleteResult.setTranslogLocation(location);
}
deleteResult.setTook(System.nanoTime() - delete.startTime());
deleteResult.freeze();
return deleteResult;
} finally {
if (deleteResult != null && deleteResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
seqNoService.markSeqNoAsCompleted(deleteResult.getSeqNo());
if (seqNo != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
seqNoService().markSeqNoAsCompleted(seqNo);
}
}
}