-
Notifications
You must be signed in to change notification settings - Fork 25k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce a History UUID as a requirement for ops based recovery #26577
Changes from all commits
ca9b913
6c7b399
3f4991c
30921fc
152b87e
5d3693b
d7a39f1
2e4dc13
5f98e48
8b68522
853322b
27b51b7
af3ae0f
fa4430f
da69f42
73dc78e
1b40bba
06951f6
63983ba
0a86367
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -48,6 +48,7 @@ | |
import org.elasticsearch.Version; | ||
import org.elasticsearch.action.index.IndexRequest; | ||
import org.elasticsearch.common.Nullable; | ||
import org.elasticsearch.common.UUIDs; | ||
import org.elasticsearch.common.lease.Releasable; | ||
import org.elasticsearch.common.lucene.LoggerInfoStream; | ||
import org.elasticsearch.common.lucene.Lucene; | ||
|
@@ -142,6 +143,8 @@ public class InternalEngine extends Engine { | |
private final CounterMetric numVersionLookups = new CounterMetric(); | ||
private final CounterMetric numIndexVersionsLookups = new CounterMetric(); | ||
|
||
@Nullable | ||
private final String historyUUID; | ||
|
||
public InternalEngine(EngineConfig engineConfig) throws EngineException { | ||
super(engineConfig); | ||
|
@@ -174,15 +177,23 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException { | |
switch (openMode) { | ||
case OPEN_INDEX_AND_TRANSLOG: | ||
writer = createWriter(false); | ||
String existingHistoryUUID = loadHistoryUUIDFromCommit(writer); | ||
if (existingHistoryUUID == null) { | ||
historyUUID = UUIDs.randomBase64UUID(); | ||
} else { | ||
historyUUID = existingHistoryUUID; | ||
} | ||
final long globalCheckpoint = Translog.readGlobalCheckpoint(engineConfig.getTranslogConfig().getTranslogPath()); | ||
seqNoStats = store.loadSeqNoStats(globalCheckpoint); | ||
break; | ||
case OPEN_INDEX_CREATE_TRANSLOG: | ||
writer = createWriter(false); | ||
historyUUID = loadHistoryUUIDFromCommit(writer); | ||
seqNoStats = store.loadSeqNoStats(SequenceNumbers.UNASSIGNED_SEQ_NO); | ||
break; | ||
case CREATE_INDEX_AND_TRANSLOG: | ||
writer = createWriter(true); | ||
historyUUID = UUIDs.randomBase64UUID(); | ||
seqNoStats = new SeqNoStats( | ||
SequenceNumbers.NO_OPS_PERFORMED, | ||
SequenceNumbers.NO_OPS_PERFORMED, | ||
|
@@ -342,6 +353,12 @@ private void recoverFromTranslogInternal() throws IOException { | |
flush(true, true); | ||
} else if (translog.isCurrent(translogGeneration) == false) { | ||
commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID)); | ||
refreshLastCommittedSegmentInfos(); | ||
} else if (lastCommittedSegmentInfos.getUserData().containsKey(HISTORY_UUID_KEY) == false) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Do we also have to put it in the commit if the existing UUID is different from the one we have in the local var? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. With the current logic that would be a bug - the history should stay the same for the index lifetime and only go from null -> a value. With Yannick's suggestion it might be needed, but I prefer to do it as a follow-up. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. ok |
||
assert historyUUID != null; | ||
// put the history uuid into the index | ||
commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID)); | ||
refreshLastCommittedSegmentInfos(); | ||
} | ||
// clean up what's not needed | ||
translog.trimUnreferencedReaders(); | ||
|
@@ -382,6 +399,11 @@ public Translog getTranslog() { | |
return translog; | ||
} | ||
|
||
// Returns the history UUID held by this engine (the value of the historyUUID field). | ||
// The field is declared @Nullable: in OPEN_INDEX_CREATE_TRANSLOG mode it is taken as-is from | ||
// loadHistoryUUIDFromCommit, which returns null when the commit carries no history UUID. | ||
@Override | ||
public String getHistoryUUID() { | ||
return historyUUID; | ||
} | ||
|
||
/** | ||
* Reads the current stored translog ID from the IW commit data. If the id is not found, recommits the current | ||
* translog id into lucene and returns null. | ||
|
@@ -401,6 +423,19 @@ private String loadTranslogUUIDFromCommit(IndexWriter writer) throws IOException | |
} | ||
} | ||
|
||
/** | ||
* Reads the current stored history ID from the IW commit data. If the id is not found, returns null. | ||
*/ | ||
@Nullable | ||
private String loadHistoryUUIDFromCommit(final IndexWriter writer) throws IOException { | ||
String uuid = commitDataAsMap(writer).get(HISTORY_UUID_KEY); | ||
if (uuid == null) { | ||
assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_rc1) : | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Should we make this a hard fail? I wonder: if we return a null uuid here, will it just fail later?! There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yeah, can do - it shouldn't happen anyway. I tend to communicate "if this happens it's a bug" with assertions, but I'm happy to convert to an IllegalState or something. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. yeah ++ There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. On second thought - assertions cause nodes to die, which means our integration tests will be stopped on the spot. With an illegal state, the nodes will recover by doing another recovery and we'll never see the failure. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. fair enough |
||
"index was created after 6_0_0_rc1 but has no history uuid"; | ||
} | ||
return uuid; | ||
} | ||
|
||
private SearcherManager createSearcherManager() throws EngineException { | ||
boolean success = false; | ||
SearcherManager searcherManager = null; | ||
|
@@ -1312,30 +1347,8 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti | |
} catch (Exception e) { | ||
throw new FlushFailedEngineException(shardId, e); | ||
} | ||
/* | ||
* we have to inc-ref the store here since if the engine is closed by a tragic event | ||
* we don't acquire the write lock and wait until we have exclusive access. This might also | ||
* dec the store reference which can essentially close the store and unless we can inc the reference | ||
* we can't use it. | ||
*/ | ||
store.incRef(); | ||
try { | ||
// reread the last committed segment infos | ||
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo(); | ||
} catch (Exception e) { | ||
if (isClosed.get() == false) { | ||
try { | ||
logger.warn("failed to read latest segment infos on flush", e); | ||
} catch (Exception inner) { | ||
e.addSuppressed(inner); | ||
} | ||
if (Lucene.isCorruptionException(e)) { | ||
throw new FlushFailedEngineException(shardId, e); | ||
} | ||
} | ||
} finally { | ||
store.decRef(); | ||
} | ||
refreshLastCommittedSegmentInfos(); | ||
|
||
} | ||
newCommitId = lastCommittedSegmentInfos.getId(); | ||
} catch (FlushFailedEngineException ex) { | ||
|
@@ -1353,6 +1366,33 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti | |
return new CommitId(newCommitId); | ||
} | ||
|
||
// Re-reads the last committed segment infos from the store and caches them in | ||
// lastCommittedSegmentInfos. The read is best-effort: if it fails, the previously cached value | ||
// is kept. While the engine is still open the failure is logged, and a corruption failure is | ||
// rethrown as FlushFailedEngineException; once the engine is closed every failure is ignored. | ||
private void refreshLastCommittedSegmentInfos() { | ||
/* | ||
* we have to inc-ref the store here since if the engine is closed by a tragic event | ||
* we don't acquire the write lock and wait until we have exclusive access. This might also | ||
* dec the store reference which can essentially close the store and unless we can inc the reference | ||
* we can't use it. | ||
*/ | ||
store.incRef(); | ||
try { | ||
// reread the last committed segment infos | ||
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo(); | ||
} catch (Exception e) { | ||
// failures are only reported while the engine is open; after close they are dropped silently | ||
if (isClosed.get() == false) { | ||
try { | ||
// NOTE(review): the message says "on flush", but this helper is also called from | ||
// recoverFromTranslogInternal - consider a more general wording | ||
logger.warn("failed to read latest segment infos on flush", e); | ||
} catch (Exception inner) { | ||
// logging itself failed; keep that failure attached to the original exception | ||
e.addSuppressed(inner); | ||
} | ||
// only corruption is escalated to the caller; any other read failure is swallowed | ||
if (Lucene.isCorruptionException(e)) { | ||
throw new FlushFailedEngineException(shardId, e); | ||
} | ||
} | ||
} finally { | ||
// always release the reference taken above, whether or not the read succeeded | ||
store.decRef(); | ||
} | ||
} | ||
|
||
@Override | ||
public void rollTranslogGeneration() throws EngineException { | ||
try (ReleasableLock ignored = readLock.acquire()) { | ||
|
@@ -1874,7 +1914,7 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl | |
* {@link IndexWriter#commit()} call flushes all documents, we defer computation of the maximum sequence number to the time | ||
* of invocation of the commit data iterator (which occurs after all documents have been flushed to Lucene). | ||
*/ | ||
final Map<String, String> commitData = new HashMap<>(5); | ||
final Map<String, String> commitData = new HashMap<>(6); | ||
commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGeneration); | ||
commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID); | ||
commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, localCheckpointValue); | ||
|
@@ -1883,6 +1923,9 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl | |
} | ||
commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo())); | ||
commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get())); | ||
if (historyUUID != null) { | ||
commitData.put(HISTORY_UUID_KEY, historyUUID); | ||
} | ||
logger.trace("committing writer with commit data [{}]", commitData); | ||
return commitData.entrySet().iterator(); | ||
}); | ||
|
@@ -1992,7 +2035,7 @@ public boolean isRecovering() { | |
* Gets the commit data from {@link IndexWriter} as a map. | ||
*/ | ||
private static Map<String, String> commitDataAsMap(final IndexWriter indexWriter) { | ||
Map<String, String> commitData = new HashMap<>(5); | ||
Map<String, String> commitData = new HashMap<>(6); | ||
for (Map.Entry<String, String> entry : indexWriter.getLiveCommitData()) { | ||
commitData.put(entry.getKey(), entry.getValue()); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The decision whether to generate a fresh historyUUID or reuse the existing one should be based on ShardRouting.recoverySource(), and the result can be set in the engine config:
EMPTY_STORE, SNAPSHOT, LOCAL_SHARDS => create fresh history ID
EXISTING_STORE, PEER => use existing history id (if available, otherwise create fresh)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a great suggestion. I'd prefer to do it in a follow-up PR, when I actually fix the snapshot issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am OK with a follow-up. Good catch, @ywelsch.