Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce a History UUID as a requirement for ops based recovery #26577

Merged
merged 20 commits into from
Sep 14, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ task verifyVersions {
* after the backport of the backcompat code is complete.
*/
allprojects {
ext.bwc_tests_enabled = true
ext.bwc_tests_enabled = false
}

task verifyBwcTestsEnabled {
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
public abstract class Engine implements Closeable {

public static final String SYNC_COMMIT_ID = "sync_id";
public static final String HISTORY_UUID_KEY = "history_uuid";

protected final ShardId shardId;
protected final String allocationId;
Expand Down Expand Up @@ -183,6 +184,9 @@ public MergeStats getMergeStats() {
return new MergeStats();
}

/** returns the history uuid for the engine */
public abstract String getHistoryUUID();

/**
* A throttling class that can be activated, causing the
* {@code acquireThrottle} method to block on a lock when throttling
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lucene.LoggerInfoStream;
import org.elasticsearch.common.lucene.Lucene;
Expand Down Expand Up @@ -142,6 +143,8 @@ public class InternalEngine extends Engine {
private final CounterMetric numVersionLookups = new CounterMetric();
private final CounterMetric numIndexVersionsLookups = new CounterMetric();

@Nullable
private final String historyUUID;

public InternalEngine(EngineConfig engineConfig) throws EngineException {
super(engineConfig);
Expand Down Expand Up @@ -174,15 +177,23 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
switch (openMode) {
case OPEN_INDEX_AND_TRANSLOG:
writer = createWriter(false);
String existingHistoryUUID = loadHistoryUUIDFromCommit(writer);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Determining whether to generate a fresh or reuse the existing historyUUID should be done based on the ShardRouting.recoverySource() and can be set in the engine config:

EMPTY_STORE, SNAPSHOT, LOCAL_SHARDS => create fresh history ID
EXISTING_STORE, PEER => use existing history id (if available, otherwise create fresh)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a great suggestion. I prefer to use it as a follow up PR when I actually fix the snapshot issue.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am ok with a followup. good catch @ywelsch

if (existingHistoryUUID == null) {
historyUUID = UUIDs.randomBase64UUID();
} else {
historyUUID = existingHistoryUUID;
}
final long globalCheckpoint = Translog.readGlobalCheckpoint(engineConfig.getTranslogConfig().getTranslogPath());
seqNoStats = store.loadSeqNoStats(globalCheckpoint);
break;
case OPEN_INDEX_CREATE_TRANSLOG:
writer = createWriter(false);
historyUUID = loadHistoryUUIDFromCommit(writer);
seqNoStats = store.loadSeqNoStats(SequenceNumbers.UNASSIGNED_SEQ_NO);
break;
case CREATE_INDEX_AND_TRANSLOG:
writer = createWriter(true);
historyUUID = UUIDs.randomBase64UUID();
seqNoStats = new SeqNoStats(
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.NO_OPS_PERFORMED,
Expand Down Expand Up @@ -342,6 +353,12 @@ private void recoverFromTranslogInternal() throws IOException {
flush(true, true);
} else if (translog.isCurrent(translogGeneration) == false) {
commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID));
refreshLastCommittedSegmentInfos();
} else if (lastCommittedSegmentInfos.getUserData().containsKey(HISTORY_UUID_KEY) == false) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we also have to put it in the commit if the existing UUID is different to the one we have in the local var?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with the current logic it will be a bug - the history should stay the same for the index life time and only go from null -> a value. With Yannick's suggestion it might be needed but I prefer to do it as a follow up.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

assert historyUUID != null;
// put the history uuid into the index
commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID));
refreshLastCommittedSegmentInfos();
}
// clean up what's not needed
translog.trimUnreferencedReaders();
Expand Down Expand Up @@ -382,6 +399,11 @@ public Translog getTranslog() {
return translog;
}

@Override
public String getHistoryUUID() {
return historyUUID;
}

/**
* Reads the current stored translog ID from the IW commit data. If the id is not found, recommits the current
* translog id into lucene and returns null.
Expand All @@ -401,6 +423,19 @@ private String loadTranslogUUIDFromCommit(IndexWriter writer) throws IOException
}
}

/**
* Reads the current stored history ID from the IW commit data. If the id is not found, returns null.
*/
@Nullable
private String loadHistoryUUIDFromCommit(final IndexWriter writer) throws IOException {
String uuid = commitDataAsMap(writer).get(HISTORY_UUID_KEY);
if (uuid == null) {
assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_rc1) :
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we make this a hard fail? I wonder if we should return a null uuid here it will just fail later?!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, can do - it shouldn't happen anyway. I tend to communicate this "if this happens it's a bug" with assertions but I'm happy to convert to an IllegalState or something.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah ++

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On a second thought - assertions cause nodes to die, which means our integration tests will be stopped on the spot - with an illegal state, the nodes we recover by doing another recovery and we'll never see the failure.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fair enough

"index was created after 6_0_0_rc1 but has no history uuid";
}
return uuid;
}

private SearcherManager createSearcherManager() throws EngineException {
boolean success = false;
SearcherManager searcherManager = null;
Expand Down Expand Up @@ -1312,30 +1347,8 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti
} catch (Exception e) {
throw new FlushFailedEngineException(shardId, e);
}
/*
* we have to inc-ref the store here since if the engine is closed by a tragic event
* we don't acquire the write lock and wait until we have exclusive access. This might also
* dec the store reference which can essentially close the store and unless we can inc the reference
* we can't use it.
*/
store.incRef();
try {
// reread the last committed segment infos
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
} catch (Exception e) {
if (isClosed.get() == false) {
try {
logger.warn("failed to read latest segment infos on flush", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
if (Lucene.isCorruptionException(e)) {
throw new FlushFailedEngineException(shardId, e);
}
}
} finally {
store.decRef();
}
refreshLastCommittedSegmentInfos();

}
newCommitId = lastCommittedSegmentInfos.getId();
} catch (FlushFailedEngineException ex) {
Expand All @@ -1353,6 +1366,33 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti
return new CommitId(newCommitId);
}

private void refreshLastCommittedSegmentInfos() {
/*
* we have to inc-ref the store here since if the engine is closed by a tragic event
* we don't acquire the write lock and wait until we have exclusive access. This might also
* dec the store reference which can essentially close the store and unless we can inc the reference
* we can't use it.
*/
store.incRef();
try {
// reread the last committed segment infos
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
} catch (Exception e) {
if (isClosed.get() == false) {
try {
logger.warn("failed to read latest segment infos on flush", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
if (Lucene.isCorruptionException(e)) {
throw new FlushFailedEngineException(shardId, e);
}
}
} finally {
store.decRef();
}
}

@Override
public void rollTranslogGeneration() throws EngineException {
try (ReleasableLock ignored = readLock.acquire()) {
Expand Down Expand Up @@ -1874,7 +1914,7 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
* {@link IndexWriter#commit()} call flushes all documents, we defer computation of the maximum sequence number to the time
* of invocation of the commit data iterator (which occurs after all documents have been flushed to Lucene).
*/
final Map<String, String> commitData = new HashMap<>(5);
final Map<String, String> commitData = new HashMap<>(6);
commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGeneration);
commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, localCheckpointValue);
Expand All @@ -1883,6 +1923,9 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
}
commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo()));
commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get()));
if (historyUUID != null) {
commitData.put(HISTORY_UUID_KEY, historyUUID);
}
logger.trace("committing writer with commit data [{}]", commitData);
return commitData.entrySet().iterator();
});
Expand Down Expand Up @@ -1992,7 +2035,7 @@ public boolean isRecovering() {
* Gets the commit data from {@link IndexWriter} as a map.
*/
private static Map<String, String> commitDataAsMap(final IndexWriter indexWriter) {
Map<String, String> commitData = new HashMap<>(5);
Map<String, String> commitData = new HashMap<>(6);
for (Map.Entry<String, String> entry : indexWriter.getLiveCommitData()) {
commitData.put(entry.getKey(), entry.getValue());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1585,6 +1585,10 @@ public Translog getTranslog() {
return getEngine().getTranslog();
}

public String getHistoryUUID() {
return getEngine().getHistoryUUID();
}

public IndexEventListener getIndexEventListener() {
return indexEventListener;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.mapper.MapperService;
Expand Down Expand Up @@ -162,10 +164,11 @@ void addIndices(
* document-level semantics.
*/
writer.setLiveCommitData(() -> {
final HashMap<String, String> liveCommitData = new HashMap<>(2);
final HashMap<String, String> liveCommitData = new HashMap<>(4);
liveCommitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
liveCommitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(maxSeqNo));
liveCommitData.put(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp));
liveCommitData.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID());
return liveCommitData.entrySet().iterator();
});
writer.commit();
Expand Down
15 changes: 15 additions & 0 deletions core/src/main/java/org/elasticsearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;

import java.io.Closeable;
import java.io.EOFException;
Expand Down Expand Up @@ -1027,6 +1028,20 @@ public Map<String, String> getCommitUserData() {
return commitUserData;
}

/**
* returns the history uuid the store points at, or null if not existant.
*/
public String getHistoryUUID() {
return commitUserData.get(Engine.HISTORY_UUID_KEY);
}

/**
* returns the translog uuid the store points at
*/
public String getTranslogUUID() {
return commitUserData.get(Translog.TRANSLOG_UUID_KEY);
}

/**
* Returns true iff this metadata contains the given file.
*/
Expand Down
Loading