Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tighten sequence numbers recovery #22212

Merged
merged 7 commits into from
Dec 17, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.elasticsearch.index.engine.EngineClosedException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
Expand Down Expand Up @@ -150,6 +151,7 @@ private Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexSh
final long version = indexResult.getVersion();
indexRequest.version(version);
indexRequest.versionType(indexRequest.versionType().versionTypeForReplicationAndRecovery());
indexRequest.seqNo(indexResult.getSeqNo());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe assert at the end of this method that the seqNo is set on the replica request?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed 1c71393.

assert indexRequest.versionType().validateVersionForWrites(indexRequest.version());
response = new IndexResponse(primary.shardId(), indexRequest.type(), indexRequest.id(), indexResult.getSeqNo(),
indexResult.getVersion(), indexResult.isCreated());
Expand All @@ -173,6 +175,7 @@ private Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexSh
// update the request with the version so it will go to the replicas
deleteRequest.versionType(deleteRequest.versionType().versionTypeForReplicationAndRecovery());
deleteRequest.version(deleteResult.getVersion());
deleteRequest.seqNo(deleteResult.getSeqNo());
assert deleteRequest.versionType().validateVersionForWrites(deleteRequest.version());
response = new DeleteResponse(request.shardId(), deleteRequest.type(), deleteRequest.id(), deleteResult.getSeqNo(),
deleteResult.getVersion(), deleteResult.isFound());
Expand All @@ -182,6 +185,7 @@ private Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexSh
break;
default: throw new IllegalStateException("unexpected opType [" + itemRequest.opType() + "] found");
}

// update the bulk item request because update request execution can mutate the bulk item request
request.items()[requestIndex] = replicaRequest;
if (operationResult == null) { // in case of noop update operation
Expand Down Expand Up @@ -282,6 +286,7 @@ private UpdateResultHolder executeUpdateRequest(UpdateRequest updateRequest, Ind
final long version = updateOperationResult.getVersion();
indexRequest.version(version);
indexRequest.versionType(indexRequest.versionType().versionTypeForReplicationAndRecovery());
indexRequest.seqNo(updateOperationResult.getSeqNo());
assert indexRequest.versionType().validateVersionForWrites(indexRequest.version());
}
break;
Expand All @@ -292,6 +297,7 @@ private UpdateResultHolder executeUpdateRequest(UpdateRequest updateRequest, Ind
// update the request with the version so it will go to the replicas
deleteRequest.versionType(deleteRequest.versionType().versionTypeForReplicationAndRecovery());
deleteRequest.version(updateOperationResult.getVersion());
deleteRequest.seqNo(updateOperationResult.getSeqNo());
assert deleteRequest.versionType().validateVersionForWrites(deleteRequest.version());
}
break;
Expand Down Expand Up @@ -342,6 +348,10 @@ private UpdateResultHolder executeUpdateRequest(UpdateRequest updateRequest, Ind
replicaRequest = new BulkItemRequest(request.items()[requestIndex].id(), updateDeleteRequest);
break;
}
assert (replicaRequest.request() instanceof IndexRequest
&& ((IndexRequest) replicaRequest.request()).seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) ||
(replicaRequest.request() instanceof DeleteRequest
&& ((DeleteRequest) replicaRequest.request()).seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO);
// successful operation
break; // out of retry loop
} else if (updateOperationResult.getFailure() instanceof VersionConflictEngineException == false) {
Expand All @@ -364,10 +374,10 @@ protected WriteReplicaResult shardOperationOnReplica(BulkShardRequest request, I
switch (docWriteRequest.opType()) {
case CREATE:
case INDEX:
operationResult = executeIndexRequestOnReplica(((IndexRequest) docWriteRequest), replica);
operationResult = executeIndexRequestOnReplica((IndexRequest) docWriteRequest, replica);
break;
case DELETE:
operationResult = executeDeleteRequestOnReplica(((DeleteRequest) docWriteRequest), replica);
operationResult = executeDeleteRequestOnReplica((DeleteRequest) docWriteRequest, replica);
break;
default:
throw new IllegalStateException("Unexpected request operation type on replica: "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ protected WritePrimaryResult shardOperationOnPrimary(DeleteRequest request, Inde
// update the request with the version so it will go to the replicas
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
request.version(result.getVersion());
request.seqNo(result.getSeqNo());
assert request.versionType().validateVersionForWrites(request.version());
response = new DeleteResponse(
primary.shardId(),
Expand Down
143 changes: 96 additions & 47 deletions core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -86,6 +87,7 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

public class InternalEngine extends Engine {

Expand Down Expand Up @@ -175,9 +177,18 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
throw new IllegalArgumentException(openMode.toString());
}
logger.trace("recovered [{}]", seqNoStats);
indexWriter = writer;
seqNoService = sequenceNumberService(shardId, engineConfig.getIndexSettings(), seqNoStats);
translog = openTranslog(engineConfig, writer, seqNoService::getGlobalCheckpoint);
// norelease
/*
* We have no guarantees that all operations above the local checkpoint are in the Lucene commit or the translog. This means
* that we there might be operations greater than the local checkpoint that will not be replayed. Here we force the local
* checkpoint to the maximum sequence number in the commit (at the potential expense of correctness).
*/
while (seqNoService().getLocalCheckpoint() < seqNoService().getMaxSeqNo()) {
seqNoService().markSeqNoAsCompleted(seqNoService().getLocalCheckpoint() + 1);
}
indexWriter = writer;
translog = openTranslog(engineConfig, writer, () -> seqNoService().getLocalCheckpoint());
assert translog.getGeneration() != null;
} catch (IOException | TranslogCorruptedException e) {
throw new EngineCreationFailureException(shardId, "failed to create engine", e);
Expand Down Expand Up @@ -412,7 +423,7 @@ private SearcherManager createSearcherManager() throws EngineException {

@Override
public GetResult get(Get get, Function<String, Searcher> searcherFactory) throws EngineException {
try (ReleasableLock lock = readLock.acquire()) {
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
if (get.realtime()) {
VersionValue versionValue = versionMap.getUnderLock(get.uid());
Expand All @@ -434,11 +445,28 @@ public GetResult get(Get get, Function<String, Searcher> searcherFactory) throws
}
}

private boolean checkVersionConflict(
final Operation op,
final long currentVersion,
final long expectedVersion,
final boolean deleted) {
/**
* Checks for version conflicts. If a version conflict exists, the optional return value represents the operation result. Otherwise, if
* no conflicts are found, the optional return value is not present.
*
* @param <T> the result type
* @param op the operation
* @param currentVersion the current version
* @param expectedVersion the expected version
* @param deleted {@code true} if the current version is not found or represents a delete
* @param onSuccess if there is a version conflict that can be ignored, the result of the operation
* @param onFailure if there is a version conflict that can not be ignored, the result of the operation
* @return if there is a version conflict, the optional value is present and represents the operation result, otherwise the return value
* is not present
*/
private <T extends Result> Optional<T> checkVersionConflict(
final Operation op,
final long currentVersion,
final long expectedVersion,
final boolean deleted,
final Supplier<T> onSuccess,
final Function<VersionConflictEngineException, T> onFailure) {
final T result;
if (op.versionType() == VersionType.FORCE) {
if (engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
// If index was created in 5.0 or later, 'force' is not allowed at all
Expand All @@ -452,14 +480,22 @@ private boolean checkVersionConflict(
if (op.versionType().isVersionConflictForWrites(currentVersion, expectedVersion, deleted)) {
if (op.origin().isRecovery()) {
// version conflict, but okay
return true;
result = onSuccess.get();
} else {
// fatal version conflict
throw new VersionConflictEngineException(shardId, op.type(), op.id(),
final VersionConflictEngineException e =
new VersionConflictEngineException(
shardId,
op.type(),
op.id(),
op.versionType().explainConflictForWrites(currentVersion, expectedVersion, deleted));
result = onFailure.apply(e);
}

return Optional.of(result);
} else {
return Optional.empty();
}
return false;
}

private long checkDeletedAndGCed(VersionValue versionValue) {
Expand All @@ -475,7 +511,7 @@ private long checkDeletedAndGCed(VersionValue versionValue) {
@Override
public IndexResult index(Index index) {
IndexResult result;
try (ReleasableLock lock = readLock.acquire()) {
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
if (index.origin().isRecovery()) {
// Don't throttle recovery operations
Expand Down Expand Up @@ -573,7 +609,7 @@ private IndexResult innerIndex(Index index) throws IOException {
assert assertSequenceNumber(index.origin(), index.seqNo());
final Translog.Location location;
final long updatedVersion;
IndexResult indexResult = null;
long seqNo = index.seqNo();
try (Releasable ignored = acquireLock(index.uid())) {
lastWriteNanos = index.startTime();
/* if we have an autoGeneratedID that comes into the engine we can potentially optimize
Expand Down Expand Up @@ -638,27 +674,32 @@ private IndexResult innerIndex(Index index) throws IOException {
}
}
final long expectedVersion = index.version();
if (checkVersionConflict(index, currentVersion, expectedVersion, deleted)) {
// skip index operation because of version conflict on recovery
indexResult = new IndexResult(expectedVersion, SequenceNumbersService.UNASSIGNED_SEQ_NO, false);
final Optional<IndexResult> checkVersionConflictResult =
checkVersionConflict(
index,
currentVersion,
expectedVersion,
deleted,
() -> new IndexResult(currentVersion, index.seqNo(), false),
e -> new IndexResult(e, currentVersion, index.seqNo()));

final IndexResult indexResult;
if (checkVersionConflictResult.isPresent()) {
indexResult = checkVersionConflictResult.get();
} else {
final long seqNo;
// no version conflict
if (index.origin() == Operation.Origin.PRIMARY) {
seqNo = seqNoService.generateSeqNo();
} else {
seqNo = index.seqNo();
seqNo = seqNoService().generateSeqNo();
}
updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);
index.parsedDoc().version().setLongValue(updatedVersion);

// Update the document's sequence number and primary term, the
// sequence number here is derived here from either the sequence
// number service if this is on the primary, or the existing
// document's sequence number if this is on the replica. The
// primary term here has already been set, see
// IndexShard.prepareIndex where the Engine.Index operation is
// created
/**
* Update the document's sequence number and primary term; the sequence number here is derived here from either the sequence
* number service if this is on the primary, or the existing document's sequence number if this is on the replica. The
* primary term here has already been set, see IndexShard#prepareIndex where the Engine$Index operation is created.
*/
index.parsedDoc().updateSeqID(seqNo, index.primaryTerm());
updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);
index.parsedDoc().version().setLongValue(updatedVersion);

if (currentVersion == Versions.NOT_FOUND && forceUpdateDocument == false) {
// document does not exists, we can optimize for create, but double check if assertions are running
Expand All @@ -669,17 +710,17 @@ private IndexResult innerIndex(Index index) throws IOException {
}
indexResult = new IndexResult(updatedVersion, seqNo, deleted);
location = index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY
? translog.add(new Translog.Index(index, indexResult))
: null;
? translog.add(new Translog.Index(index, indexResult))
: null;
versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion));
indexResult.setTranslogLocation(location);
}
indexResult.setTook(System.nanoTime() - index.startTime());
indexResult.freeze();
return indexResult;
} finally {
if (indexResult != null && indexResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
seqNoService.markSeqNoAsCompleted(indexResult.getSeqNo());
if (seqNo != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
seqNoService().markSeqNoAsCompleted(seqNo);
}
}

Expand Down Expand Up @@ -724,7 +765,7 @@ private static void update(final Term uid, final List<ParseContext.Document> doc
@Override
public DeleteResult delete(Delete delete) {
DeleteResult result;
try (ReleasableLock lock = readLock.acquire()) {
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
// NOTE: we don't throttle this when merges fall behind because delete-by-id does not create new segments:
result = innerDelete(delete);
Expand All @@ -748,7 +789,7 @@ private DeleteResult innerDelete(Delete delete) throws IOException {
final Translog.Location location;
final long updatedVersion;
final boolean found;
DeleteResult deleteResult = null;
long seqNo = delete.seqNo();
try (Releasable ignored = acquireLock(delete.uid())) {
lastWriteNanos = delete.startTime();
final long currentVersion;
Expand All @@ -764,32 +805,40 @@ private DeleteResult innerDelete(Delete delete) throws IOException {
}

final long expectedVersion = delete.version();
if (checkVersionConflict(delete, currentVersion, expectedVersion, deleted)) {
// skip executing delete because of version conflict on recovery
deleteResult = new DeleteResult(expectedVersion, SequenceNumbersService.UNASSIGNED_SEQ_NO, true);

final Optional<DeleteResult> result =
checkVersionConflict(
delete,
currentVersion,
expectedVersion,
deleted,
() -> new DeleteResult(expectedVersion, delete.seqNo(), true),
e -> new DeleteResult(e, expectedVersion, delete.seqNo()));

final DeleteResult deleteResult;
if (result.isPresent()) {
deleteResult = result.get();
} else {
final long seqNo;
if (delete.origin() == Operation.Origin.PRIMARY) {
seqNo = seqNoService.generateSeqNo();
} else {
seqNo = delete.seqNo();
seqNo = seqNoService().generateSeqNo();
}

updatedVersion = delete.versionType().updateVersion(currentVersion, expectedVersion);
found = deleteIfFound(delete.uid(), currentVersion, deleted, versionValue);
deleteResult = new DeleteResult(updatedVersion, seqNo, found);
location = delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY
? translog.add(new Translog.Delete(delete, deleteResult))
: null;
? translog.add(new Translog.Delete(delete, deleteResult))
: null;
versionMap.putUnderLock(delete.uid().bytes(),
new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis()));
new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis()));
deleteResult.setTranslogLocation(location);
}
deleteResult.setTook(System.nanoTime() - delete.startTime());
deleteResult.freeze();
return deleteResult;
} finally {
if (deleteResult != null && deleteResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
seqNoService.markSeqNoAsCompleted(deleteResult.getSeqNo());
if (seqNo != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
seqNoService().markSeqNoAsCompleted(seqNo);
}
}
}
Expand Down
Loading