-
Notifications
You must be signed in to change notification settings - Fork 25k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Reset replica engine before primary-replica resync #32867
Changes from all commits
1105179
2e10680
216b890
0ebb7e3
95f3e9f
ce10e16
92021f7
b60d0e8
ca7e57f
f0a1496
8f47769
a224c6b
7e08bd3
eacd9b2
aa01fbd
b3ac7e1
ef98712
021f94b
607535d
06da55a
b0ac93a
72ef838
379509e
0f19a28
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,356 @@ | ||
/* | ||
* Licensed to Elasticsearch under one or more contributor | ||
* license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright | ||
* ownership. Elasticsearch licenses this file to you under | ||
* the Apache License, Version 2.0 (the "License"); you may | ||
* not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.elasticsearch.index.engine; | ||
|
||
import org.apache.lucene.index.DirectoryReader; | ||
import org.apache.lucene.index.SegmentInfos; | ||
import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper; | ||
import org.apache.lucene.search.SearcherFactory; | ||
import org.apache.lucene.search.SearcherManager; | ||
import org.apache.lucene.store.AlreadyClosedException; | ||
import org.elasticsearch.common.lease.Releasable; | ||
import org.elasticsearch.common.lease.Releasables; | ||
import org.elasticsearch.common.lucene.Lucene; | ||
import org.elasticsearch.common.util.concurrent.ReleasableLock; | ||
import org.elasticsearch.core.internal.io.IOUtils; | ||
import org.elasticsearch.index.mapper.MapperService; | ||
import org.elasticsearch.index.seqno.SeqNoStats; | ||
import org.elasticsearch.index.translog.Translog; | ||
import org.elasticsearch.index.translog.TranslogStats; | ||
|
||
import java.io.Closeable; | ||
import java.io.IOException; | ||
import java.io.UncheckedIOException; | ||
import java.util.Arrays; | ||
import java.util.List; | ||
import java.util.concurrent.CountDownLatch; | ||
import java.util.function.BiFunction; | ||
import java.util.stream.Stream; | ||
|
||
/** | ||
* A minimal engine that does not accept writes, and always points stats, searcher to the last commit. | ||
*/ | ||
public final class SearchOnlyEngine extends Engine { | ||
private final SegmentInfos lastCommittedSegmentInfos; | ||
private final SeqNoStats seqNoStats; | ||
private final TranslogStats translogStats; | ||
private final SearcherManager searcherManager; | ||
|
||
public SearchOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats translogStats) { | ||
super(config); | ||
this.seqNoStats = seqNoStats; | ||
this.translogStats = translogStats; | ||
try { | ||
store.incRef(); | ||
DirectoryReader reader = null; | ||
boolean success = false; | ||
try { | ||
this.lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo(); | ||
reader = DirectoryReader.open(store.directory()); | ||
if (config.getIndexSettings().isSoftDeleteEnabled()) { | ||
reader = new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETES_FIELD); | ||
} | ||
this.searcherManager = new SearcherManager(reader, new SearcherFactory()); | ||
success = true; | ||
} finally { | ||
if (success == false) { | ||
IOUtils.close(reader, store::decRef); | ||
} | ||
} | ||
} catch (IOException e) { | ||
throw new UncheckedIOException(e); | ||
} | ||
} | ||
|
||
@Override | ||
protected void closeNoLock(String reason, CountDownLatch closedLatch) { | ||
if (isClosed.compareAndSet(false, true)) { | ||
try { | ||
IOUtils.close(searcherManager, store::decRef); | ||
} catch (Exception ex) { | ||
logger.warn("failed to close engine", ex); | ||
} finally { | ||
closedLatch.countDown(); | ||
} | ||
} | ||
} | ||
|
||
@Override | ||
public void flushAndClose() throws IOException { | ||
// make a flush as a noop so that callers can close (and flush) this engine without worrying about the engine type. | ||
close(); | ||
} | ||
|
||
@Override | ||
public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> searcherFactory) throws EngineException { | ||
return getFromSearcher(get, this::acquireSearcher, SearcherScope.INTERNAL); | ||
} | ||
|
||
@Override | ||
public Searcher acquireSearcher(String source, SearcherScope scope) throws EngineException { | ||
ensureOpen(); | ||
Releasable releasable = null; | ||
try (ReleasableLock ignored = readLock.acquire()) { | ||
store.incRef(); | ||
releasable = store::decRef; | ||
final EngineSearcher searcher = new EngineSearcher(source, searcherManager, store, logger); | ||
releasable = null; // hand over the reference to the engine searcher | ||
return searcher; | ||
} catch (AlreadyClosedException ex) { | ||
throw ex; | ||
} catch (Exception ex) { | ||
ensureOpen(ex); // throw AlreadyClosedException if it's closed | ||
throw new EngineException(shardId, "failed to acquire searcher, source " + source, ex); | ||
} finally { | ||
Releasables.close(releasable); | ||
} | ||
} | ||
|
||
@Override | ||
protected SegmentInfos getLastCommittedSegmentInfos() { | ||
return lastCommittedSegmentInfos; | ||
} | ||
|
||
@Override | ||
public String getHistoryUUID() { | ||
return lastCommittedSegmentInfos.userData.get(Engine.HISTORY_UUID_KEY); | ||
} | ||
|
||
@Override | ||
public long getWritingBytes() { | ||
return 0; | ||
} | ||
|
||
@Override | ||
public long getIndexThrottleTimeInMillis() { | ||
return 0; | ||
} | ||
|
||
@Override | ||
public boolean isThrottled() { | ||
return false; | ||
} | ||
|
||
@Override | ||
public IndexResult index(Index index) { | ||
return ensureUnsupportedMethodNeverCalled(); | ||
} | ||
|
||
@Override | ||
public DeleteResult delete(Delete delete) { | ||
return ensureUnsupportedMethodNeverCalled(); | ||
} | ||
|
||
@Override | ||
public NoOpResult noOp(NoOp noOp) { | ||
return ensureUnsupportedMethodNeverCalled(); | ||
} | ||
|
||
@Override | ||
public boolean isTranslogSyncNeeded() { | ||
return false; | ||
} | ||
|
||
@Override | ||
public boolean ensureTranslogSynced(Stream<Translog.Location> locations) { | ||
return false; | ||
} | ||
|
||
@Override | ||
public void syncTranslog() { | ||
// noop | ||
} | ||
|
||
@Override | ||
public Closeable acquireRetentionLockForPeerRecovery() { | ||
return ensureUnsupportedMethodNeverCalled(); | ||
} | ||
|
||
@Override | ||
public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperService, | ||
long fromSeqNo, long toSeqNo, boolean requiredFullRange) { | ||
return ensureUnsupportedMethodNeverCalled(); | ||
} | ||
|
||
@Override | ||
public Translog.Snapshot readHistoryOperations(String source, MapperService mapperService, long startingSeqNo) { | ||
return ensureUnsupportedMethodNeverCalled(); | ||
} | ||
|
||
@Override | ||
public int estimateNumberOfHistoryOperations(String source, MapperService mapperService, long startingSeqNo) { | ||
return ensureUnsupportedMethodNeverCalled(); | ||
} | ||
|
||
@Override | ||
public boolean hasCompleteOperationHistory(String source, MapperService mapperService, long startingSeqNo) { | ||
return ensureUnsupportedMethodNeverCalled(); | ||
} | ||
|
||
@Override | ||
public TranslogStats getTranslogStats() { | ||
return translogStats; | ||
} | ||
|
||
@Override | ||
public Translog.Location getTranslogLastWriteLocation() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this is called by |
||
// noop - returns null as the caller treats null as noop. | ||
return null; | ||
} | ||
|
||
@Override | ||
public long getLocalCheckpoint() { | ||
return seqNoStats.getLocalCheckpoint(); | ||
} | ||
|
||
@Override | ||
public void waitForOpsToComplete(long seqNo) { | ||
ensureUnsupportedMethodNeverCalled(); | ||
} | ||
|
||
@Override | ||
public void resetLocalCheckpoint(long newCheckpoint) { | ||
ensureUnsupportedMethodNeverCalled(); | ||
} | ||
|
||
@Override | ||
public SeqNoStats getSeqNoStats(long globalCheckpoint) { | ||
return new SeqNoStats(seqNoStats.getMaxSeqNo(), seqNoStats.getLocalCheckpoint(), globalCheckpoint); | ||
} | ||
|
||
@Override | ||
public long getLastSyncedGlobalCheckpoint() { | ||
return seqNoStats.getGlobalCheckpoint(); | ||
} | ||
|
||
@Override | ||
public long getIndexBufferRAMBytesUsed() { | ||
return 0; | ||
} | ||
|
||
@Override | ||
public List<Segment> segments(boolean verbose) { | ||
return Arrays.asList(getSegmentInfo(lastCommittedSegmentInfos, verbose)); | ||
} | ||
|
||
@Override | ||
public void refresh(String source) throws EngineException { | ||
// noop | ||
} | ||
|
||
@Override | ||
public void writeIndexingBuffer() throws EngineException { | ||
|
||
} | ||
|
||
@Override | ||
public boolean shouldPeriodicallyFlush() { | ||
return false; | ||
} | ||
|
||
@Override | ||
public SyncedFlushResult syncFlush(String syncId, CommitId expectedCommitId) throws EngineException { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this could possibly called I think |
||
return SyncedFlushResult.PENDING_OPERATIONS; | ||
} | ||
|
||
@Override | ||
public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException { | ||
return new CommitId(lastCommittedSegmentInfos.getId()); | ||
} | ||
|
||
@Override | ||
public CommitId flush() throws EngineException { | ||
return flush(false, false); | ||
} | ||
|
||
@Override | ||
public void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, | ||
boolean upgrade, boolean upgradeOnlyAncientSegments) { | ||
ensureUnsupportedMethodNeverCalled(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same here, this might be called by a user-invoked force-merge |
||
} | ||
|
||
@Override | ||
public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) { | ||
return ensureUnsupportedMethodNeverCalled(); | ||
} | ||
|
||
@Override | ||
public IndexCommitRef acquireSafeIndexCommit() { | ||
return ensureUnsupportedMethodNeverCalled(); | ||
} | ||
|
||
@Override | ||
public void activateThrottling() { | ||
} | ||
|
||
@Override | ||
public void deactivateThrottling() { | ||
} | ||
|
||
@Override | ||
public void trimUnreferencedTranslogFiles() { | ||
ensureUnsupportedMethodNeverCalled(); | ||
} | ||
|
||
@Override | ||
public boolean shouldRollTranslogGeneration() { | ||
return false; | ||
} | ||
|
||
@Override | ||
public void rollTranslogGeneration() throws EngineException { | ||
// noop | ||
} | ||
|
||
@Override | ||
public void restoreLocalCheckpointFromTranslog() { | ||
ensureUnsupportedMethodNeverCalled(); | ||
} | ||
|
||
@Override | ||
public int fillSeqNoGaps(long primaryTerm) { | ||
return ensureUnsupportedMethodNeverCalled(); | ||
} | ||
|
||
@Override | ||
public Engine recoverFromTranslog(long recoverUpToSeqNo) { | ||
return ensureUnsupportedMethodNeverCalled(); | ||
} | ||
|
||
@Override | ||
public void skipTranslogRecovery() { | ||
ensureUnsupportedMethodNeverCalled(); | ||
} | ||
|
||
@Override | ||
public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws EngineException { | ||
ensureUnsupportedMethodNeverCalled(); | ||
} | ||
|
||
@Override | ||
public void maybePruneDeletes() { | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit extra newline |
||
} | ||
|
||
private <T> T ensureUnsupportedMethodNeverCalled() { | ||
assert false : "invoking an unsupported method in a search-only engine"; | ||
throw new UnsupportedOperationException(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
afaics this might be called by
maybeFSyncTranslogs
because engine might be swapped betweenisTranslogSyncNeeded
and this call