Skip to content

Commit

Permalink
[Zen2] Change MetaDataStateFormat write semantics (#34709)
Browse files Browse the repository at this point in the history
Currently, MetaDataStateFormat.write throws an IOException if there was some problem with persisting state to disk. If an exception is thrown, loadLatestState may read either the old state or the new state. This is not enough for the Zen2 algorithm. In case of failure, we need to distinguish between 2 cases: storage is left in a clean state or storage is left in a dirty state.
If storage is left in the clean state, loadLatestState may read only old state. If storage is left in a dirty state, loadLatestState may read either old or new state.
If an exception occurs when writing the manifest file to disk, this distinction is important for Zen2. If storage is clean, the node can continue to be a part of the cluster and may try to accept further cluster state updates (if it fails to accept cluster state updates, it will be kicked out of the cluster by a different mechanism). But if storage is dirty, the node should be restarted, and it will be able to start up successfully only once it successfully re-writes the manifest file to disk.
This commit changes the MetaDataStateFormat.write signature, replacing IOException with WriteStateException, whose “isDirty” method can be used to distinguish between the 2 failure cases.
We need to minimise the number of failures that leave storage in a dirty state. That’s why this PR changes the algorithm that is used to store state to disk. It has the following layout:

1. For the first state location, create and fsync tmp file with state content.
2. For each extra location, copy and fsync tmp file with state content.
3. Atomically rename tmp file in the first location.
4. For each extra location, atomically rename tmp file.
5. For each location, fsync state directory.
6. Perform cleanup of old files, ignoring exceptions.
If an exception occurs in steps 1-3, storage is clearly in the clean state. If an exception occurs in step 5, storage is clearly in a dirty state. An exception in step 4 is questionable; there are 2 options:
1. Consider it as a failure. If the first disk fails, state disappears. So this is a failure and storage is in a dirty state.
2. Do not consider it as failure at all, ignore disk failures.
This commit prefers the 1st approach, and MetaDataTestFormatTests.testFailRandomlyAndReadAnyState tests for disk failures.
  • Loading branch information
andrershov authored Oct 24, 2018
1 parent 6d6ac74 commit 7a3cd10
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.SimpleFSDirectory;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.lucene.store.IndexOutputOutputStream;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
Expand Down Expand Up @@ -91,51 +92,87 @@ private static void deleteFileIfExists(Path stateLocation, Directory directory,
logger.trace("cleaned up {}", stateLocation.resolve(fileName));
}

private void writeStateToFirstLocation(final T state, Path stateLocation, Directory stateDir, String fileName, String tmpFileName)
throws IOException {
/**
 * Best-effort deletion: delegates to {@code deleteFileIfExists} and, when that call fails with an
 * {@link IOException}, records the affected path at trace level instead of propagating the failure.
 *
 * @param stateLocation path against which {@code fileName} is resolved for the log message
 * @param directory     directory passed through to {@code deleteFileIfExists}
 * @param fileName      name of the file to delete
 */
private static void deleteFileIgnoreExceptions(Path stateLocation, Directory directory, String fileName) {
    try {
        deleteFileIfExists(stateLocation, directory, fileName);
    } catch (IOException ignored) {
        // the failure is deliberately not rethrown; only the path that could not be cleaned up is traced
        final Path undeleted = stateLocation.resolve(fileName);
        logger.trace("clean up failed {}", undeleted);
    }
}

private void writeStateToFirstLocation(final T state, Path stateLocation, Directory stateDir, String tmpFileName)
throws WriteStateException {
try {
deleteFileIfExists(stateLocation, stateDir, tmpFileName);
try (IndexOutput out = stateDir.createOutput(tmpFileName, IOContext.DEFAULT)) {
CodecUtil.writeHeader(out, STATE_FILE_CODEC, STATE_FILE_VERSION);
out.writeInt(FORMAT.index());
try (XContentBuilder builder = newXContentBuilder(FORMAT, new IndexOutputOutputStream(out) {
@Override
public void close() throws IOException {
public void close() {
// this is important since some of the XContentBuilders write bytes on close.
// in order to write the footer we need to prevent closing the actual index input.
}
})) {

builder.startObject();
{
toXContent(builder, state);
}
toXContent(builder, state);
builder.endObject();
}
CodecUtil.writeFooter(out);
}

stateDir.sync(Collections.singleton(tmpFileName));
stateDir.rename(tmpFileName, fileName);
stateDir.syncMetaData();
logger.trace("written state to {}", stateLocation.resolve(fileName));
} finally {
deleteFileIfExists(stateLocation, stateDir, tmpFileName);
} catch (Exception e) {
throw new WriteStateException(false, "failed to write state to the first location tmp file " +
stateLocation.resolve(tmpFileName), e);
}
}

private void copyStateToExtraLocation(Directory srcStateDir, Path extraStateLocation, String fileName, String tmpFileName)
throws IOException {
try (Directory extraStateDir = newDirectory(extraStateLocation)) {
private static void copyStateToExtraLocations(List<Tuple<Path, Directory>> stateDirs, String tmpFileName)
throws WriteStateException {
Directory srcStateDir = stateDirs.get(0).v2();
for (int i = 1; i < stateDirs.size(); i++) {
Tuple<Path, Directory> extraStatePathAndDir = stateDirs.get(i);
Path extraStateLocation = extraStatePathAndDir.v1();
Directory extraStateDir = extraStatePathAndDir.v2();
try {
deleteFileIfExists(extraStateLocation, extraStateDir, tmpFileName);
extraStateDir.copyFrom(srcStateDir, fileName, tmpFileName, IOContext.DEFAULT);
extraStateDir.copyFrom(srcStateDir, tmpFileName, tmpFileName, IOContext.DEFAULT);
extraStateDir.sync(Collections.singleton(tmpFileName));
extraStateDir.rename(tmpFileName, fileName);
extraStateDir.syncMetaData();
logger.trace("copied state to {}", extraStateLocation.resolve(fileName));
} finally {
deleteFileIfExists(extraStateLocation, extraStateDir, tmpFileName);
} catch (Exception e) {
throw new WriteStateException(false, "failed to copy tmp state file to extra location " + extraStateLocation, e);
}
}
}

/**
 * Renames the temporary state file to its final name in every state directory, starting with the first one.
 *
 * @param tmpFileName      current (temporary) name of the state file in each directory
 * @param fileName         final name the file is renamed to
 * @param stateDirectories (path, directory) pairs; element 0 is the first state location, the rest are extra locations
 * @throws WriteStateException with {@code dirty == false} if the rename in the first location fails, and with
 *                             {@code dirty == true} if a rename in any extra location fails
 */
private static void performRenames(String tmpFileName, String fileName, final List<Tuple<Path, Directory>> stateDirectories) throws
        WriteStateException {
    final Tuple<Path, Directory> first = stateDirectories.get(0);
    final Directory firstDirectory = first.v2();
    try {
        firstDirectory.rename(tmpFileName, fileName);
    } catch (IOException e) {
        // nothing has been renamed yet, so the failure is reported as non-dirty
        final String message = "failed to rename tmp file to final name in the first state location "
                + first.v1().resolve(tmpFileName);
        throw new WriteStateException(false, message, e);
    }

    // the first location already carries the final file name, so any later failure is reported as dirty
    for (Tuple<Path, Directory> extra : stateDirectories.subList(1, stateDirectories.size())) {
        final Directory extraDirectory = extra.v2();
        try {
            extraDirectory.rename(tmpFileName, fileName);
        } catch (IOException e) {
            final String message = "failed to rename tmp file to final name in extra state location "
                    + extra.v1().resolve(tmpFileName);
            throw new WriteStateException(true, message, e);
        }
    }
}

/**
 * Calls {@code syncMetaData()} on every state directory, in list order.
 *
 * @param stateDirectories (path, directory) pairs to sync
 * @throws WriteStateException with {@code dirty == true} as soon as one directory fails to sync; the message
 *                             names the path of the failing entry
 */
private static void performStateDirectoriesFsync(List<Tuple<Path, Directory>> stateDirectories) throws WriteStateException {
    for (Tuple<Path, Directory> pathAndDirectory : stateDirectories) {
        final Directory directory = pathAndDirectory.v2();
        try {
            directory.syncMetaData();
        } catch (IOException e) {
            final String message = "meta data directory fsync has failed " + pathAndDirectory.v1();
            throw new WriteStateException(true, message, e);
        }
    }
}
Expand All @@ -145,32 +182,52 @@ private void copyStateToExtraLocation(Directory srcStateDir, Path extraStateLoca
* state directory ({@value #STATE_DIR_NAME}) underneath each of the given file locations and is created if it
* doesn't exist. The state is serialized to a temporary file in that directory and is then atomically moved to
* it's target filename of the pattern {@code {prefix}{version}.st}.
* If this method returns without exception there is a guarantee that state is persisted to the disk and loadLatestState will return it.
* But if this method throws an exception, loadLatestState could return this state or some previous state.
* If this method returns without exception there is a guarantee that state is persisted to the disk and loadLatestState will return
* it.
*
* @param state the state object to write
* @param locations the locations where the state should be written to.
* @throws IOException if an IOException occurs
* @throws WriteStateException if some exception during writing state occurs. See also {@link WriteStateException#isDirty()}.
*/
public final void write(final T state, final Path... locations) throws IOException {

public final void write(final T state, final Path... locations) throws WriteStateException {
if (locations == null) {
throw new IllegalArgumentException("Locations must not be null");
}
if (locations.length <= 0) {
throw new IllegalArgumentException("One or more locations required");
}
final long maxStateId = findMaxStateId(prefix, locations) + 1;

long maxStateId;
try {
maxStateId = findMaxStateId(prefix, locations) + 1;
} catch (Exception e) {
throw new WriteStateException(false, "exception during looking up max state id", e);
}
assert maxStateId >= 0 : "maxStateId must be positive but was: [" + maxStateId + "]";

final String fileName = prefix + maxStateId + STATE_FILE_EXTENSION;
final String tmpFileName = fileName + ".tmp";
final Path firstStateLocation = locations[0].resolve(STATE_DIR_NAME);
try (Directory stateDir = newDirectory(firstStateLocation)) {
writeStateToFirstLocation(state, firstStateLocation, stateDir, fileName, tmpFileName);
List<Tuple<Path, Directory>> directories = new ArrayList<>();

try {
for (Path location : locations) {
Path stateLocation = location.resolve(STATE_DIR_NAME);
try {
directories.add(new Tuple<>(location, newDirectory(stateLocation)));
} catch (IOException e) {
throw new WriteStateException(false, "failed to open state directory " + stateLocation, e);
}
}

for (int i = 1; i < locations.length; i++) {
final Path extraStateLocation = locations[i].resolve(STATE_DIR_NAME);
copyStateToExtraLocation(stateDir, extraStateLocation, fileName, tmpFileName);
writeStateToFirstLocation(state, directories.get(0).v1(), directories.get(0).v2(), tmpFileName);
copyStateToExtraLocations(directories, tmpFileName);
performRenames(tmpFileName, fileName, directories);
performStateDirectoriesFsync(directories);
} finally {
for (Tuple<Path, Directory> pathAndDirectory : directories) {
deleteFileIgnoreExceptions(pathAndDirectory.v1(), pathAndDirectory.v2(), tmpFileName);
IOUtils.closeWhileHandlingException(pathAndDirectory.v2());
}
}

Expand Down Expand Up @@ -227,16 +284,18 @@ protected Directory newDirectory(Path dir) throws IOException {
return new SimpleFSDirectory(dir);
}

private void cleanupOldFiles(final String currentStateFile, Path[] locations) throws IOException {
private void cleanupOldFiles(final String currentStateFile, Path[] locations) {
for (Path location : locations) {
logger.trace("cleanupOldFiles: cleaning up {}", location);
Path stateLocation = location.resolve(STATE_DIR_NAME);
try (Directory stateDir = newDirectory(stateLocation)) {
for (String file : stateDir.listAll()) {
if (file.startsWith(prefix) && file.equals(currentStateFile) == false) {
deleteFileIfExists(stateLocation, stateDir, file);
deleteFileIgnoreExceptions(stateLocation, stateDir, file);
}
}
} catch (Exception e) {
logger.trace("clean up failed for state location {}", stateLocation);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.gateway;

import java.io.IOException;

/**
* This exception is thrown when there is a problem of writing state to disk.
*/
public class WriteStateException extends IOException {
private boolean dirty;

public WriteStateException(boolean dirty, String message, Exception cause) {
super(message, cause);
this.dirty = dirty;
}

/**
* If this method returns false, state is guaranteed to be not written to disk.
* If this method returns true, we don't know if state is written to disk.
*/
public boolean isDirty() {
return dirty;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,9 @@ public void testFailWriteAndReadPreviousState() throws IOException {
Format.FAIL_FSYNC_TMP_FILE, Format.FAIL_RENAME_TMP_FILE);
DummyState newState = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 100), randomInt(), randomLong(),
randomDouble(), randomBoolean());
expectThrows(IOException.class, () -> format.write(newState, path));
WriteStateException ex = expectThrows(WriteStateException.class, () -> format.write(newState, path));
assertFalse(ex.isDirty());

format.noFailures();
assertEquals(initialState, format.loadLatestState(logger, NamedXContentRegistry.EMPTY, path));
}
Expand All @@ -338,30 +340,34 @@ public void testFailWriteAndReadAnyState() throws IOException {
DummyState newState = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 100), randomInt(), randomLong(),
randomDouble(), randomBoolean());
possibleStates.add(newState);
expectThrows(IOException.class, () -> format.write(newState, path));
WriteStateException ex = expectThrows(WriteStateException.class, () -> format.write(newState, path));
assertTrue(ex.isDirty());

format.noFailures();
assertTrue(possibleStates.contains(format.loadLatestState(logger, NamedXContentRegistry.EMPTY, path)));
}

writeAndReadStateSuccessfully(format, path);
}

public void testFailCopyStateToExtraLocation() throws IOException {
public void testFailCopyTmpFileToExtraLocation() throws IOException {
Path paths[] = new Path[randomIntBetween(2, 5)];
for (int i = 0; i < paths.length; i++) {
paths[i] = createTempDir();
}
Format format = new Format("foo-");

writeAndReadStateSuccessfully(format, paths);
DummyState initialState = writeAndReadStateSuccessfully(format, paths);

for (int i = 0; i < randomIntBetween(1, 5); i++) {
format.failOnMethods(Format.FAIL_OPEN_STATE_FILE_WHEN_COPYING);
DummyState newState = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 100), randomInt(), randomLong(),
randomDouble(), randomBoolean());
expectThrows(IOException.class, () -> format.write(newState, paths));
WriteStateException ex = expectThrows(WriteStateException.class, () -> format.write(newState, paths));
assertFalse(ex.isDirty());

format.noFailures();
assertEquals(newState, format.loadLatestState(logger, NamedXContentRegistry.EMPTY, paths));
assertEquals(initialState, format.loadLatestState(logger, NamedXContentRegistry.EMPTY, paths));
}

writeAndReadStateSuccessfully(format, paths);
Expand All @@ -382,14 +388,21 @@ public void testFailRandomlyAndReadAnyState() throws IOException {
format.failRandomly();
DummyState newState = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 100), randomInt(), randomLong(),
randomDouble(), randomBoolean());
possibleStates.add(newState);
try {
format.write(newState, paths);
} catch (Exception e) {
// since we're injecting failures at random it's ok if exception is thrown, it's also ok if exception is not thrown
possibleStates.clear();
possibleStates.add(newState);
} catch (WriteStateException e) {
if (e.isDirty()) {
possibleStates.add(newState);
}
}

format.noFailures();
assertTrue(possibleStates.contains(format.loadLatestState(logger, NamedXContentRegistry.EMPTY, paths)));
//we call loadLatestState not on full path set, but only on random paths from this set. This is to emulate disk failures.
Path[] randomPaths = randomSubsetOf(randomIntBetween(1, paths.length), paths).toArray(new Path[0]);
DummyState stateOnDisk = format.loadLatestState(logger, NamedXContentRegistry.EMPTY, randomPaths);
assertTrue(possibleStates.contains(stateOnDisk));
}

writeAndReadStateSuccessfully(format, paths);
Expand Down

0 comments on commit 7a3cd10

Please sign in to comment.