Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reset replica engine before primary-replica resync #32867

Closed
wants to merge 24 commits into from

Conversation

dnhatn
Copy link
Member

@dnhatn dnhatn commented Aug 15, 2018

When a replica starts following a newly promoted primary, it may have
some operations which don't exist on the new primary. We need to reset
replicas to the global checkpoint before executing primary-replica
resync. These two steps will align replicas to the primary.

This change resets an engine of a replica to the safe commit when
detecting a new primary term, then reindex operations from the local
translog up to the global checkpoint.

When a replica starts following a newly promoted primary, it may have
some operations which don't exist on the new primary. We need to reset
replicas to the global checkpoint before executing primary-replica
resync. These two steps will align replicas to the primary.

This change resets an engine of a replica to the safe commit when
detecting a new primary term, then reindex operations from the local
translog up to the global checkpoint.
@dnhatn dnhatn added >enhancement :Distributed Indexing/Recovery Anything around constructing a new shard, either from a local or a remote source. v7.0.0 v6.5.0 labels Aug 15, 2018
@elasticmachine
Copy link
Collaborator

Pinging @elastic/es-distributed

@dnhatn
Copy link
Member Author

dnhatn commented Aug 15, 2018

/cc @not-napoleon

Copy link
Contributor

@s1monw s1monw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did an initial pass and left some comments

@@ -388,17 +388,23 @@ public InternalEngine recoverFromTranslog() throws IOException {
return this;
}

// for testing
final Engine recoverFromTranslog() throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this necessary? can we just use the recoverFromTranslog(Long.MAX_VALUE) instead in the tests?

@@ -196,6 +196,11 @@
protected volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm
protected volatile long operationPrimaryTerm;
protected final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();

private final AtomicReference<Engine> resettingEngineReference = new AtomicReference<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To me having 2 AtomicReference in flight is very very confusing. I think we can simplify this by introducing an EngineReference class that we make final here and add some the reset logic internally. Or, alterantively keep the AtomicReference<EngineReference>.

it could look like this:

class EngineReference {
   private volatile Engine activeEngine;
   private volatile Engine pendingEngine;

  synchronized boolean hasPendingEngine() {
    return pendingEngine != null;
  } 

  synchronized void makeActiveReadOnly() {
     // do the lockdown thing...
  }

  synchronized void swapPendingEngine() {
    // do the swap... and close the current etc.
  }
}

this looks more contained and we can maybe test it in isolation?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

return applyIndexOperation(seqNo, operationPrimaryTerm, version, null, autoGeneratedTimeStamp, isRetry,
Engine.Operation.Origin.REPLICA, sourceToParse);
boolean isRetry, SourceToParse sourceToParse) throws IOException {
return applyIndexOperation(getEngine(), seqNo, operationPrimaryTerm, version, null, autoGeneratedTimeStamp,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering why you did that and I think I understand what you are trying to do. You try to make sure we always get the latest engine ie. the locked down one if we swap it. but there is still a race imo. inside applyIndexOperation you might have an engine that is already closed unless you put a lock around it. The swap might be atomic but the reference might still receive writes after you locked it down, is this ok?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before swapping engines, we drain all IndexShardOperationPermits (backed by Semaphore) and a write operation requires an IndexShardPermit. I think we are okay here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

correct.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++ maybe we add this as a comment somewhere or even as an assertion?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we need to use getEngineForResync here? Assume that there are documents already replicated by the new primary before this replica has received all resync operations. Also wondering why there have been no test failures, maybe test coverage is not good? Also note that getEngineForResync is probably not the best name for this. I think there's a bigger issue here, let's sync about this tomorrow.

@@ -2234,6 +2240,22 @@ public SeqNoStats getSeqNoStats(long globalCheckpoint) {
return localCheckpointTracker.getStats(globalCheckpoint);
}

@Override
public Engine lockDownEngine() throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ideally we would do this entirely outside of the engine and maybe just pass and engine to the ctor of ReadOnlyEngine? do we need to make sure we don't receive writes after we did this or why do we acquire a write lock?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will move this to ctor of ReadOnlyEngine.

Copy link
Contributor

@ywelsch ywelsch left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've just given this an initial look.


final boolean canResetEngine() {
// TODO: do not reset the following shard
return indexSettings.getIndexVersionCreated().onOrAfter(Version.V_6_4_0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAICS (correct me if I'm wrong) you had to it this way because we don't know on what node version the primary is (i.e. if it is going to send maxSeqNo or not), and the shard is reset when we acquire the replica operation permit (i.e. possibly before we receive the first resync request). It's a shame because it means we can't ensure consistency for older indices. The only other solution I can think of right now would be to always send the maximum sequence number with the replication request (same as we do for the global checkpoint). We could then pass this to acquireReplicaOperationPermit (same as the global checkpoint).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ywelsch Yeah, you understood it correctly. I had the same thought but did not go with that option as I wasn't sure if it's a right trade-off. I am glad that you suggest it. Should we make that change into this PR or a separate prerequisite PR to reduce noise in this PR?

}

private void resetEngineUpToLocalCheckpoint(long recoverUpToSeqNo) throws IOException {
synchronized (mutex) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I see this correctly, you're doing recoverFromTranslog under the mutex here? This can potentially block the cluster state update thread for minutes.

// the resetting engine will be activated only if its local_checkpoint at least this guard.
minRequiredCheckpointForResettingEngine.set(currentMaxSeqNo);
resettingEngineReference.set(resettingEngine);
changeState(IndexShardState.RECOVERING, "reset engine from=" + currentMaxSeqNo + " to=" + globalCheckpoint);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why move the state back to RECOVERING?

}

private void completeResettingEngineWithLocalHistory() throws IOException {
synchronized (mutex) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment as above. You can't do stuff that possibly blocks the mutex for minutes


@Override
public void refresh(String source) throws EngineException {
// noop
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

refreshes should not be happening? If so, should we throw an UnsupportedOperationException here?


@Override
public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException {
throw new UnsupportedOperationException();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we want to assert that all of these methods are never called?

/**
* An engine that does not accept writes, and always points stats, searcher to the last commit.
*/
final class ReadOnlyEngine extends Engine {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should give this a different name, in particular because we might have something similar for frozen indices. There it might be a more complete version of readonly, with possibility to take a Translog.Snapshot. Maybe we could call this SearchOnlyEngine.

return applyIndexOperation(seqNo, operationPrimaryTerm, version, null, autoGeneratedTimeStamp, isRetry,
Engine.Operation.Origin.REPLICA, sourceToParse);
boolean isRetry, SourceToParse sourceToParse) throws IOException {
return applyIndexOperation(getEngine(), seqNo, operationPrimaryTerm, version, null, autoGeneratedTimeStamp,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

correct.

@dnhatn
Copy link
Member Author

dnhatn commented Aug 17, 2018

@s1monw and @ywelsch It's ready for another round. Can you please take a look? Thank you!

@dnhatn dnhatn requested review from s1monw and ywelsch August 17, 2018 01:43
Copy link
Contributor

@s1monw s1monw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left some comments. looks good!


@Override
protected void closeNoLock(String reason, CountDownLatch closedLatch) {
try {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you should protect this against double counting down the closeLatch by wrapping this entire try block in

if (isClosed.compareAndSet(false, true)) {

}

this.seqNoStats = engine.getSeqNoStats(engine.getLastSyncedGlobalCheckpoint());
this.translogStats = engine.getTranslogStats();
this.lastCommittedSegmentInfos = engine.getLastCommittedSegmentInfos();
Searcher searcher = engine.acquireSearcher("lockdown", SearcherScope.INTERNAL);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you leave a comment here that we keep a reference to the store implicitly through the searcher? I do wonder if we should make it explicit

this.lastCommittedSegmentInfos = engine.getLastCommittedSegmentInfos();
Searcher searcher = engine.acquireSearcher("lockdown", SearcherScope.INTERNAL);
try {
this.searcherManager = new SearcherManager(searcher.getDirectoryReader(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this searcher manager seems to be unclosed. I think you should close it as well in the closeNoLock method?

store.incRef();
Releasable releasable = store::decRef;
try (ReleasableLock ignored = readLock.acquire()) {
final EngineSearcher searcher = new EngineSearcher(source, searcherManager, store, logger);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you try to exercise this method to make sure we open a new searcher and close / release everything

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, getDocIds method in SearchOnlyEngineTests#testSearchOnlyEngine acquires searchers.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

private final SearcherManager searcherManager;
private final Searcher lastCommitSearcher;

public SearchOnlyEngine(Engine engine) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do wonder if it would make more sense to open this entire thing off a store directly and maybe just pass and EngineConfig to this. it would make it more generic and less bound to an engine. WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

something like this:

 public SearchOnlyEngine(EngineConfig config) {
        super(config);
        try {
            Store store = config.getStore();
            store.incRef();
            DirectoryReader reader = null;
            boolean success = false;
            try {
                this.lastCommittedSegmentInfos = Lucene.readSegmentInfos(store.directory());
                this.translogStats = new TranslogStats(0, 0, 0, 0, 0);
                final SequenceNumbers.CommitInfo seqNoStats =
                    SequenceNumbers.loadSeqNoInfoFromLuceneCommit(lastCommittedSegmentInfos.userData.entrySet());
                long maxSeqNo = seqNoStats.maxSeqNo;
                long localCheckpoint = seqNoStats.localCheckpoint;
                this.seqNoStats = new SeqNoStats(maxSeqNo, localCheckpoint, localCheckpoint);
                reader = SeqIdGeneratingDirectoryReader.wrap(ElasticsearchDirectoryReader.wrap(DirectoryReader
                .open(store.directory()), config.getShardId()), config.getPrimaryTermSupplier().getAsLong());
                this.indexCommit = reader.getIndexCommit();
                this.searcherManager = new SearcherManager(reader, new SearcherFactory());
                success = true;
            } finally {
                if (success == false) {
                    IOUtils.close(reader, store::decRef);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e); // this is stupid
        }
    }

I did something similar a while back so I had it ready... I am not sure it safe to use 💯

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I adopted this, but I have to pass SeqNoStats from outside because we use a "reset" local checkpoint which may not equal to the value from an index commit.


@Override
public void maybePruneDeletes() {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit extra newline

@@ -1266,14 +1269,16 @@ public void trimOperationOfPreviousPrimaryTerms(long aboveSeqNo) {

// package-private for testing
int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot) throws IOException {
recoveryState.getTranslog().totalOperations(snapshot.totalOperations());
recoveryState.getTranslog().totalOperationsOnStart(snapshot.totalOperations());
if (isEngineResetting() == false) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any reason we can't just run this the same way we do if we are not resetting?

private Engine createNewEngine(EngineConfig config) throws IOException {
assert Thread.holdsLock(mutex);
if (state == IndexShardState.CLOSED) {
throw new AlreadyClosedException(shardId + " can't create engine - shard is closed");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure, should we through IndexShardNotClosedException instead?

Copy link
Member Author

@dnhatn dnhatn Aug 21, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we should throw IndexShardClosedException. AlreadyClosedException was a left-over when we folded Engine to IndexShard.

}

@Override
public void close() throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make this synchronized too. it's safer since you modify both references

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch!

.map(BaseTranslogReader::newSnapshot).toArray(TranslogSnapshot[]::new);
return newMultiSnapshot(snapshots);
Snapshot snapshot = newMultiSnapshot(snapshots);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe you just return snapshot if upToSeqNo is == Long.MAX_VALUE?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

synchronized void closePendingEngine() throws IOException {
final Engine closing = this.pendingEngine;
this.pendingEngine = null;
IOUtils.close(closing);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you restrict the mutex to the first two lines and call close outside the mutex?

}

@Override
public void close() throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be synchronized so we get a consistent snapshot of the two engines?
Also, again, please do the closing outside the lock.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

@dnhatn
Copy link
Member Author

dnhatn commented Aug 21, 2018

@s1monw and @ywelsch I've addressed your comments. Can you please give it another go? I will beef up integration tests as Yannick suggested.

Copy link
Contributor

@ywelsch ywelsch left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR is too much to review in one sitting. Can you open a PR just for the recoverFromTranslog change where we can now specify an upper bound?

getEngine().flush(true, true); // force=true to make sure that we roll a translog generation
getEngine().resetLocalCheckpoint(localCheckpoint);
}
logger.info("detected new primary with primary term [{}], resetting local checkpoint from [{}] to [{}]",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this log message does not contain the right "before" local checkpoint as you moved it to after the local checkpoint reset

getEngine().resetLocalCheckpoint(localCheckpoint);
getEngine().rollTranslogGeneration();
if (isEngineResetting()) {
engineHolder.closePendingEngine();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this is insufficient in the presence of cascading primary failures. Assume that you have a primary failover, which wants to index sequence number range 2 to 5 (because global checkpoint on new primary was 2, and resync trim-off is 5). Now, while resyncing, the global checkpoint moves from 2 to 3, and the new primary fails. Another primary is selected, which, for our purposes, has the global checkpoint 3. In that case the doc with sequence number 3 will only be in the translog and the pending Lucene index. By throwing the pending Lucene index away here, we now have to reset the local checkpoint and replay from sequence number 2 (to seq number 3).
What the implementation does here though is to not reset the local checkpoint to number 2, but leave it at 3, which, if this new IndexCommit is flushed, will lead to the situation where the local checkpoint info in the index commit is wrong (i.e. it might not contain the operation number 3).

Copy link
Member Author

@dnhatn dnhatn Aug 22, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is okay because we start another engine after that in this case. Moreover, we stick with the "reset" local checkpoint (expose the local checkpoint of the active engine) while resetting the engine; thus the global checkpoint won't advance.

return null;
} else {
engineHolder.makeActiveEngineSearchOnly();
final Engine pendingEngine = createNewEngine(newEngineConfig());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this trims unsafe commits, possibly cleaning up segments that are referenced by the active search only engine?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We open a directory reader in the constructor of a search-only engine and keeps that reader until we manually close the search-only engine. Holding that reader would prevent the segment files of the last commit from deleting during trimming unsafe commits.


private void completePendingEngineWithLocalHistory() throws IOException {
final Engine pendingEngine;
synchronized (mutex) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you need to do this under the mutex?

this.pendingEngine = null;
}
}
IOUtils.close(closing);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do we ensure that searches are not accessing acquireSearcher on the closed engine and switching to the new engine? Also, is there a test that checks that searches (with preference set to this node) continue to work during this transition.

Copy link
Member Author

@dnhatn dnhatn Aug 21, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a small interval in that callers might acquire searchers from the closed engine and hit AlreadClosedException. We can void this entirely by retrying on "AlreadClosedException" if the accessing engine is different than the current active engine. However, I am not sure if we should do it.

There is a test which continuously acquires searchers and makes sure that all acknowledged writes are maintained during the transition. https://github.com/elastic/elasticsearch/pull/32867/files#diff-b268f5fefa5837ece96b957e46f628cbR674 (getShardDocUIDs acquires a searcher and uses that searcher to collect document Ids).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

However, I am not sure if we should do it.

why is that? We're building all this machinery to have search availability during the transition, except for this very short moment?

I had the same idea about retrying. An alternative would be to do refcounting for closing the engine, to ensure that we only actually close once all in-flight acquireSearcher calls have been completed.

Copy link
Member Author

@dnhatn dnhatn Aug 23, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I did not think this carefully. I thought we have to implement the retry on all methods that we support in the search-only, but I was wrong. We only need to implement the retry for "get" and "acquire searcher". These two methods should be simple. Thanks for this great question.

dnhatn added a commit to dnhatn/elasticsearch that referenced this pull request Aug 21, 2018
This change allows an engine to recover from its local translog up to
the given seqno. The extended API can be used in these use cases:

1. When a replica starts following a new primary, it resets its index to
the safe commit, then replays its local  translog up to the current
global checkpoint (see elastic#32867).

2. When a replica starts a peer-recovery, it can initialize the
start_sequence_number to the persisted global checkpoint instead of the
local checkpoint of the safe commit. A replica  will then replay its
local translog up to that global checkpoint before accepting remote
translog from the primary. This change will increase the chance of
operation-based recovery. I will make this in a follow-up.

Relates elastic#32867
@dnhatn
Copy link
Member Author

dnhatn commented Aug 21, 2018

@ywelsch I opened #33032 for the translog change.

dnhatn added a commit to dnhatn/elasticsearch that referenced this pull request Sep 5, 2018
This commit allows us to use different TranslogRecoveryRunner when recovering an engine from its local translog. This change is a prerequisite for the commit-based rollback PR (elastic#32867).

See elastic#32867 (comment)
dnhatn added a commit that referenced this pull request Sep 6, 2018
This commit allows us to use different TranslogRecoveryRunner when
recovering an engine from its local translog. This change is a
prerequisite for the commit-based rollback PR.

Relates #32867
@dnhatn
Copy link
Member Author

dnhatn commented Sep 6, 2018

Discussed this with @bleskes on another channel. We are going to split this PR into 3 smaller PRs so we can review.

dnhatn added a commit that referenced this pull request Sep 6, 2018
This commit allows us to use different TranslogRecoveryRunner when
recovering an engine from its local translog. This change is a
prerequisite for the commit-based rollback PR.

Relates #32867
s1monw added a commit to s1monw/elasticsearch that referenced this pull request Sep 10, 2018
This change adds an engine implementation that opens a reader on an
existing index but doesn't permit any refreshes or modifications
to the index.

Relates to elastic#32867
Relates to elastic#32844
@s1monw s1monw mentioned this pull request Sep 10, 2018
s1monw added a commit that referenced this pull request Sep 11, 2018
This change adds an engine implementation that opens a reader on an
existing index but doesn't permit any refreshes or modifications
to the index.

Relates to #32867
Relates to #32844
s1monw added a commit that referenced this pull request Sep 11, 2018
This change adds an engine implementation that opens a reader on an
existing index but doesn't permit any refreshes or modifications
to the index.

Relates to #32867
Relates to #32844
dnhatn added a commit that referenced this pull request Sep 12, 2018
When a replica starts following a newly promoted primary, it may have
some operations which don't exist on the new primary. Thus we need to
throw those operations to align a replica with the new primary. This can
be done by first resetting an engine from the safe commit, then replaying
the local translog up to the global checkpoint.

Relates #32867
dnhatn added a commit to dnhatn/elasticsearch that referenced this pull request Sep 12, 2018
If a shard was serving as a replica when another shard was promoted to
primary, then its Lucene index was reset to the global checkpoint.
However, if the new  primary fails before the primary/replica resync
completes and we are now being promoted, we have to restore the reverted
operations by replaying the translog to avoid losing acknowledged writes.

Relates elastic#32867
@dnhatn dnhatn added the WIP label Sep 12, 2018
dnhatn added a commit that referenced this pull request Sep 12, 2018
When a replica starts following a newly promoted primary, it may have
some operations which don't exist on the new primary. Thus we need to
throw those operations to align a replica with the new primary. This can
be done by first resetting an engine from the safe commit, then replaying
the local translog up to the global checkpoint.

Relates #32867
gwbrown pushed a commit to gwbrown/elasticsearch that referenced this pull request Sep 14, 2018
dnhatn added a commit that referenced this pull request Sep 20, 2018
If a shard was serving as a replica when another shard was promoted to
primary, then its Lucene index was reset to the global checkpoint.
However, if the new primary fails before the primary/replica resync
completes and we are now being promoted, we have to restore the reverted
operations by replaying the translog to avoid losing acknowledged writes.

Relates #33473
Relates #32867
dnhatn added a commit that referenced this pull request Sep 20, 2018
If a shard was serving as a replica when another shard was promoted to
primary, then its Lucene index was reset to the global checkpoint.
However, if the new primary fails before the primary/replica resync
completes and we are now being promoted, we have to restore the reverted
operations by replaying the translog to avoid losing acknowledged writes.

Relates #33473
Relates #32867
@colings86 colings86 added v6.6.0 and removed v6.5.0 labels Oct 25, 2018
kcm pushed a commit that referenced this pull request Oct 30, 2018
If a shard was serving as a replica when another shard was promoted to
primary, then its Lucene index was reset to the global checkpoint.
However, if the new primary fails before the primary/replica resync
completes and we are now being promoted, we have to restore the reverted
operations by replaying the translog to avoid losing acknowledged writes.

Relates #33473
Relates #32867
dnhatn added a commit that referenced this pull request Dec 9, 2018
Today we expose a new engine immediately during Lucene rollback. The new
engine is started with a safe commit which might not include all
acknowledged operation. With this change, we won't expose the new engine
until it has recovered from the local translog.

Note that this solution is not complete since it's able to reserve only
acknowledged operations before the global checkpoint. This is because we
replay translog up to the global checkpoint during rollback. A per-doc
Lucene rollback would solve this issue entirely.

Relates #32867
@dnhatn
Copy link
Member Author

dnhatn commented Dec 9, 2018

All subtasks of this PR were done and merged. I am closing this.

@dnhatn dnhatn closed this Dec 9, 2018
@dnhatn dnhatn deleted the rollback-on-resync branch December 9, 2018 02:29
dnhatn added a commit that referenced this pull request Dec 9, 2018
Today we expose a new engine immediately during Lucene rollback. The new
engine is started with a safe commit which might not include all
acknowledged operation. With this change, we won't expose the new engine
until it has recovered from the local translog.

Note that this solution is not complete since it's able to reserve only
acknowledged operations before the global checkpoint. This is because we
replay translog up to the global checkpoint during rollback. A per-doc
Lucene rollback would solve this issue entirely.

Relates #32867
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
:Distributed Indexing/Recovery Anything around constructing a new shard, either from a local or a remote source. >enhancement WIP
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants