From 1ec0c736451d0ec45fec8ba52ea69b38a5091abf Mon Sep 17 00:00:00 2001 From: Andrey Ershov Date: Mon, 22 Oct 2018 15:38:38 +0300 Subject: [PATCH 1/6] Fix test failure. Convert IllegalStateException to IOException --- .../java/org/elasticsearch/gateway/MetaDataStateFormat.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java b/server/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java index fe84fb9e38c8d..787f51eb1018e 100644 --- a/server/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java +++ b/server/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java @@ -113,6 +113,8 @@ public void close() throws IOException { builder.endObject(); } CodecUtil.writeFooter(out); + } catch (IllegalStateException e) { + throw new IOException(e); } stateDir.sync(Collections.singleton(tmpFileName)); From 0cfffd5c0b0ef0242c521b41f9ec6436e8664484 Mon Sep 17 00:00:00 2001 From: Andrey Ershov Date: Mon, 22 Oct 2018 12:30:51 +0300 Subject: [PATCH 2/6] Change meta data write failure semantics Write should clearly report if storage is left in dirty state. 
--- .../elasticsearch/env/NodeEnvironment.java | 7 +- .../gateway/MetaDataStateFormat.java | 169 +++++++++++++----- .../gateway/WriteStateException.java | 37 ++++ .../elasticsearch/index/shard/IndexShard.java | 7 +- .../RemoveCorruptedShardDataCommand.java | 8 +- .../gateway/MetaDataStateFormatTests.java | 58 ++++-- .../index/shard/IndexShardTests.java | 7 +- .../RemoveCorruptedShardDataCommandTests.java | 7 +- .../index/shard/ShardPathTests.java | 25 ++- 9 files changed, 250 insertions(+), 75 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/gateway/WriteStateException.java diff --git a/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java b/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java index ff8baaabb443c..69ba50cd60291 100644 --- a/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java +++ b/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java @@ -51,6 +51,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.gateway.MetaDataStateFormat; +import org.elasticsearch.gateway.WriteStateException; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.ShardId; @@ -390,7 +391,11 @@ private static NodeMetaData loadOrCreateNodeMetaData(Settings settings, Logger l metaData = new NodeMetaData(generateNodeId(settings)); } // we write again to make sure all paths have the latest state file - NodeMetaData.FORMAT.write(metaData, paths); + try { + NodeMetaData.FORMAT.write(metaData, paths); + } catch (WriteStateException e) { + throw new IOException(e); + } return metaData; } diff --git a/server/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java b/server/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java index 787f51eb1018e..c65eaa7d3eca6 100644 --- a/server/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java +++ 
b/server/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java @@ -30,6 +30,7 @@ import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.SimpleFSDirectory; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.lucene.store.IndexOutputOutputStream; import org.elasticsearch.common.lucene.store.InputStreamIndexInput; @@ -91,88 +92,162 @@ private static void deleteFileIfExists(Path stateLocation, Directory directory, logger.trace("cleaned up {}", stateLocation.resolve(fileName)); } - private void writeStateToFirstLocation(final T state, Path stateLocation, Directory stateDir, String fileName, String tmpFileName) - throws IOException { + private static void deleteFileIgnoreExceptions(Path stateLocation, Directory directory, String fileName) { try { - deleteFileIfExists(stateLocation, stateDir, tmpFileName); - try (IndexOutput out = stateDir.createOutput(tmpFileName, IOContext.DEFAULT)) { - CodecUtil.writeHeader(out, STATE_FILE_CODEC, STATE_FILE_VERSION); - out.writeInt(FORMAT.index()); - try (XContentBuilder builder = newXContentBuilder(FORMAT, new IndexOutputOutputStream(out) { - @Override - public void close() throws IOException { - // this is important since some of the XContentBuilders write bytes on close. - // in order to write the footer we need to prevent closing the actual index input. 
- } - })) { + deleteFileIfExists(stateLocation, directory, fileName); + } catch (IOException e) { + logger.trace("clean up failed {}", stateLocation.resolve(fileName)); + } + } + + private static void performDirectoryCleanup(Path stateLocation, Directory stateDir, String tmpFileName) { + deleteFileIgnoreExceptions(stateLocation, stateDir, tmpFileName); + IOUtils.closeWhileHandlingException(stateDir); + } - builder.startObject(); - { - toXContent(builder, state); + private Directory writeStateToFirstLocation(final T state, Path stateLocation, String tmpFileName) + throws WriteStateException { + try { + Directory stateDir = newDirectory(stateLocation); + try { + deleteFileIfExists(stateLocation, stateDir, tmpFileName); + try (IndexOutput out = stateDir.createOutput(tmpFileName, IOContext.DEFAULT)) { + CodecUtil.writeHeader(out, STATE_FILE_CODEC, STATE_FILE_VERSION); + out.writeInt(FORMAT.index()); + try (XContentBuilder builder = newXContentBuilder(FORMAT, new IndexOutputOutputStream(out) { + @Override + public void close() throws IOException { + // this is important since some of the XContentBuilders write bytes on close. + // in order to write the footer we need to prevent closing the actual index input. 
+ } + })) { + + builder.startObject(); + { + toXContent(builder, state); + } + builder.endObject(); } - builder.endObject(); + CodecUtil.writeFooter(out); } - CodecUtil.writeFooter(out); - } catch (IllegalStateException e) { - throw new IOException(e); - } - stateDir.sync(Collections.singleton(tmpFileName)); - stateDir.rename(tmpFileName, fileName); - stateDir.syncMetaData(); - logger.trace("written state to {}", stateLocation.resolve(fileName)); - } finally { - deleteFileIfExists(stateLocation, stateDir, tmpFileName); + stateDir.sync(Collections.singleton(tmpFileName)); + } catch (Exception e) { + // perform clean up only in case of exception, we need to keep directory open and temporary file on disk + // if everything is ok for the next algorithm steps + performDirectoryCleanup(stateLocation, stateDir, tmpFileName); + throw e; + } + return stateDir; + } catch (Exception e) { + throw new WriteStateException(false, "failed to write state to the first location tmp file", e); } } - private void copyStateToExtraLocation(Directory srcStateDir, Path extraStateLocation, String fileName, String tmpFileName) - throws IOException { - try (Directory extraStateDir = newDirectory(extraStateLocation)) { + private Directory copyStateToExtraLocation(Directory srcStateDir, Path extraStateLocation, String tmpFileName) + throws WriteStateException { + try { + Directory extraStateDir = newDirectory(extraStateLocation); try { deleteFileIfExists(extraStateLocation, extraStateDir, tmpFileName); - extraStateDir.copyFrom(srcStateDir, fileName, tmpFileName, IOContext.DEFAULT); + extraStateDir.copyFrom(srcStateDir, tmpFileName, tmpFileName, IOContext.DEFAULT); extraStateDir.sync(Collections.singleton(tmpFileName)); - extraStateDir.rename(tmpFileName, fileName); - extraStateDir.syncMetaData(); - logger.trace("copied state to {}", extraStateLocation.resolve(fileName)); - } finally { - deleteFileIfExists(extraStateLocation, extraStateDir, tmpFileName); + } catch (Exception e) { + // perform 
clean up only in case of exception, we need to keep directory open and temporary file on disk + // if everything is ok for the next algorithm steps + performDirectoryCleanup(extraStateLocation, extraStateDir, tmpFileName); + throw e; } + return extraStateDir; + } catch (Exception e) { + throw new WriteStateException(false, "failed to copy tmp state file to extra location", e); } } + public void performRenames(String tmpFileName, String fileName, final List> stateDirectories) throws + WriteStateException { + Directory firstStateDirectory = stateDirectories.get(0).v2(); + try { + firstStateDirectory.rename(tmpFileName, fileName); + } catch (IOException e) { + throw new WriteStateException(false, "failed to rename tmp file to final name in the first state location", e); + } + + for (int i = 1; i < stateDirectories.size(); i++) { + Directory extraStateDirectory = stateDirectories.get(i).v2(); + try { + extraStateDirectory.rename(tmpFileName, fileName); + } catch (IOException e) { + throw new WriteStateException(true, "failed to rename tmp file to final name in extra state location", + e); + } + } + } + + public void performStateDirectoriesFsync(List> stateDirectories) throws WriteStateException { + for (int i = 0; i < stateDirectories.size(); i++) { + try { + stateDirectories.get(i).v2().syncMetaData(); + } catch (IOException e) { + throw new WriteStateException(true, "meta data directory fsync has failed", e); + } + } + } + + /** * Writes the given state to the given directories. The state is written to a * state directory ({@value #STATE_DIR_NAME}) underneath each of the given file locations and is created if it * doesn't exist. The state is serialized to a temporary file in that directory and is then atomically moved to * it's target filename of the pattern {@code {prefix}{version}.st}. - * If this method returns without exception there is a guarantee that state is persisted to the disk and loadLatestState will return it. 
- * But if this method throws an exception, loadLatestState could return this state or some previous state. + * If this method returns without exception there is a guarantee that state is persisted to the disk and loadLatestState will return + * it.
+ * This method may throw a {@link WriteStateException} if an exception occurs while writing the state.
+ * If {@link WriteStateException#isDirty()} returns false, it is guaranteed that loadLatestState will return the old state.
+ * If {@link WriteStateException#isDirty()} returns true, loadLatestState could return new state or previous state. * * @param state the state object to write * @param locations the locations where the state should be written to. - * @throws IOException if an IOException occurs + * @throws WriteStateException if some exception during writing state occurs. */ - public final void write(final T state, final Path... locations) throws IOException { + + public final void write(final T state, final Path... locations) throws WriteStateException { if (locations == null) { throw new IllegalArgumentException("Locations must not be null"); } if (locations.length <= 0) { throw new IllegalArgumentException("One or more locations required"); } - final long maxStateId = findMaxStateId(prefix, locations) + 1; + + long maxStateId; + try { + maxStateId = findMaxStateId(prefix, locations) + 1; + } catch (Exception e) { + throw new WriteStateException(false, "exception during looking up max state id", e); + } assert maxStateId >= 0 : "maxStateId must be positive but was: [" + maxStateId + "]"; final String fileName = prefix + maxStateId + STATE_FILE_EXTENSION; final String tmpFileName = fileName + ".tmp"; final Path firstStateLocation = locations[0].resolve(STATE_DIR_NAME); - try (Directory stateDir = newDirectory(firstStateLocation)) { - writeStateToFirstLocation(state, firstStateLocation, stateDir, fileName, tmpFileName); + List> directories = new ArrayList<>(); + try { + Directory firstStateDir = writeStateToFirstLocation(state, firstStateLocation, tmpFileName); + directories.add(new Tuple<>(firstStateLocation, firstStateDir)); for (int i = 1; i < locations.length; i++) { final Path extraStateLocation = locations[i].resolve(STATE_DIR_NAME); - copyStateToExtraLocation(stateDir, extraStateLocation, fileName, tmpFileName); + Directory extraStateDir = copyStateToExtraLocation(firstStateDir, extraStateLocation, tmpFileName); + directories.add(new Tuple<>(extraStateLocation, 
extraStateDir)); + } + performRenames(tmpFileName, fileName, directories); + performStateDirectoriesFsync(directories); + } finally { + //writeStateToFirstLocation and copyStateToExtraLocation perform clean up for themselves if they fail + //we need to perform clean up for all data paths that were successfully opened and temporary file was created + for (int i = 0; i < directories.size(); i++) { + Tuple pathAndDirectory = directories.get(i); + performDirectoryCleanup(pathAndDirectory.v1(), pathAndDirectory.v2(), tmpFileName); } } @@ -229,16 +304,18 @@ protected Directory newDirectory(Path dir) throws IOException { return new SimpleFSDirectory(dir); } - private void cleanupOldFiles(final String currentStateFile, Path[] locations) throws IOException { + private void cleanupOldFiles(final String currentStateFile, Path[] locations) { for (Path location : locations) { logger.trace("cleanupOldFiles: cleaning up {}", location); Path stateLocation = location.resolve(STATE_DIR_NAME); try (Directory stateDir = newDirectory(stateLocation)) { for (String file : stateDir.listAll()) { if (file.startsWith(prefix) && file.equals(currentStateFile) == false) { - deleteFileIfExists(stateLocation, stateDir, file); + deleteFileIgnoreExceptions(stateLocation, stateDir, file); } } + } catch (Exception e) { + logger.trace("clean up failed for state location {}", stateLocation); } } } diff --git a/server/src/main/java/org/elasticsearch/gateway/WriteStateException.java b/server/src/main/java/org/elasticsearch/gateway/WriteStateException.java new file mode 100644 index 0000000000000..f318949863f73 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/gateway/WriteStateException.java @@ -0,0 +1,37 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. 
Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.gateway; + +/** + * This exception is thrown when there is a problem of writing state to disk.
+ * If {@link #isDirty()} returns false, state is guaranteed to be not written to disk. + * If {@link #isDirty()} returns true, we don't know if state is written to disk. + */ +public class WriteStateException extends Exception { + private boolean dirty; + + public WriteStateException(boolean dirty, String message, Exception cause) { + super(message, cause); + this.dirty = dirty; + } + + public boolean isDirty() { + return dirty; + } +} diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 17756630517d2..99b3741321378 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -64,6 +64,7 @@ import org.elasticsearch.common.util.concurrent.AsyncIOProcessor; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.gateway.WriteStateException; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexNotFoundException; @@ -2243,7 +2244,11 @@ private static void persistMetadata( logger.trace("{} writing shard state, reason [{}]", shardId, writeReason); final ShardStateMetaData newShardStateMetadata = new ShardStateMetaData(newRouting.primary(), indexSettings.getUUID(), newRouting.allocationId()); - ShardStateMetaData.FORMAT.write(newShardStateMetadata, shardPath.getShardStatePath()); + try { + ShardStateMetaData.FORMAT.write(newShardStateMetadata, shardPath.getShardStatePath()); + } catch (WriteStateException e) { + throw new IOException(e); + } } else { logger.trace("{} skip writing shard state, has been written before", shardId); } diff --git a/server/src/main/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommand.java b/server/src/main/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommand.java index 
54c1dd7c1db69..432cc35dbdaa0 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommand.java +++ b/server/src/main/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommand.java @@ -53,6 +53,7 @@ import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.NodeMetaData; import org.elasticsearch.gateway.MetaDataStateFormat; +import org.elasticsearch.gateway.WriteStateException; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; @@ -460,8 +461,11 @@ protected void newAllocationId(Environment environment, ShardPath shardPath, Ter final ShardStateMetaData newShardStateMetaData = new ShardStateMetaData(shardStateMetaData.primary, shardStateMetaData.indexUUID, newAllocationId); - ShardStateMetaData.FORMAT.write(newShardStateMetaData, shardStatePath); - + try { + ShardStateMetaData.FORMAT.write(newShardStateMetaData, shardStatePath); + } catch (WriteStateException e) { + throw new IOException(e); + } terminal.println(""); terminal.println("You should run the following command to allocate this shard:"); diff --git a/server/src/test/java/org/elasticsearch/gateway/MetaDataStateFormatTests.java b/server/src/test/java/org/elasticsearch/gateway/MetaDataStateFormatTests.java index 4acb5f428a72c..d07adb3193aa6 100644 --- a/server/src/test/java/org/elasticsearch/gateway/MetaDataStateFormatTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/MetaDataStateFormatTests.java @@ -95,7 +95,7 @@ public MetaData fromXContent(XContentParser parser) throws IOException { // indices are empty since they are serialized separately } - public void testReadWriteState() throws IOException { + public void testReadWriteState() throws IOException, WriteStateException { Path[] dirs = new Path[randomIntBetween(1, 5)]; for (int i = 0; i < dirs.length; i++) { dirs[i] = createTempDir(); @@ -103,6 +103,7 @@ public void testReadWriteState() throws IOException { 
final long id = addDummyFiles("foo-", dirs); Format format = new Format("foo-"); DummyState state = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 1000), randomInt(), randomLong(), randomDouble(), randomBoolean()); + int version = between(0, Integer.MAX_VALUE/2); format.write(state, dirs); for (Path file : dirs) { Path[] list = content("*", file); @@ -116,6 +117,7 @@ public void testReadWriteState() throws IOException { DummyState read = format.read(NamedXContentRegistry.EMPTY, list[0]); assertThat(read, equalTo(state)); } + final int version2 = between(version, Integer.MAX_VALUE); DummyState state2 = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 1000), randomInt(), randomLong(), randomDouble(), randomBoolean()); format.write(state2, dirs); @@ -134,7 +136,7 @@ public void testReadWriteState() throws IOException { } } - public void testVersionMismatch() throws IOException { + public void testVersionMismatch() throws IOException, WriteStateException { Path[] dirs = new Path[randomIntBetween(1, 5)]; for (int i = 0; i < dirs.length; i++) { dirs[i] = createTempDir(); @@ -143,6 +145,7 @@ public void testVersionMismatch() throws IOException { Format format = new Format("foo-"); DummyState state = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 1000), randomInt(), randomLong(), randomDouble(), randomBoolean()); + int version = between(0, Integer.MAX_VALUE/2); format.write(state, dirs); for (Path file : dirs) { Path[] list = content("*", file); @@ -158,7 +161,7 @@ public void testVersionMismatch() throws IOException { } } - public void testCorruption() throws IOException { + public void testCorruption() throws IOException, WriteStateException { Path[] dirs = new Path[randomIntBetween(1, 5)]; for (int i = 0; i < dirs.length; i++) { dirs[i] = createTempDir(); @@ -166,6 +169,7 @@ public void testCorruption() throws IOException { final long id = addDummyFiles("foo-", dirs); Format format = new Format("foo-"); DummyState 
state = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 1000), randomInt(), randomLong(), randomDouble(), randomBoolean()); + int version = between(0, Integer.MAX_VALUE/2); format.write(state, dirs); for (Path file : dirs) { Path[] list = content("*", file); @@ -229,7 +233,7 @@ public static void corruptFile(Path file, Logger logger) throws IOException { } } - public void testLoadState() throws IOException { + public void testLoadState() throws IOException, WriteStateException { final Path[] dirs = new Path[randomIntBetween(1, 5)]; int numStates = randomIntBetween(1, 5); List meta = new ArrayList<>(); @@ -264,7 +268,6 @@ public void testLoadState() throws IOException { assertThat(deserialized, notNullValue()); assertThat(deserialized.getVersion(), equalTo(original.getVersion())); assertThat(deserialized.getMappingVersion(), equalTo(original.getMappingVersion())); - assertThat(deserialized.getSettingsVersion(), equalTo(original.getSettingsVersion())); assertThat(deserialized.getNumberOfReplicas(), equalTo(original.getNumberOfReplicas())); assertThat(deserialized.getNumberOfShards(), equalTo(original.getNumberOfShards())); } @@ -288,7 +291,7 @@ public void testLoadState() throws IOException { } } - private DummyState writeAndReadStateSuccessfully(Format format, Path... paths) throws IOException { + private DummyState writeAndReadStateSuccessfully(Format format, Path... 
paths) throws IOException, WriteStateException { format.noFailures(); DummyState state = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 100), randomInt(), randomLong(), randomDouble(), randomBoolean()); @@ -306,7 +309,7 @@ private static void ensureOnlyOneStateFile(Path[] paths) throws IOException { } } - public void testFailWriteAndReadPreviousState() throws IOException { + public void testFailWriteAndReadPreviousState() throws IOException, WriteStateException { Path path = createTempDir(); Format format = new Format("foo-"); @@ -317,7 +320,9 @@ public void testFailWriteAndReadPreviousState() throws IOException { Format.FAIL_FSYNC_TMP_FILE, Format.FAIL_RENAME_TMP_FILE); DummyState newState = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 100), randomInt(), randomLong(), randomDouble(), randomBoolean()); - expectThrows(IOException.class, () -> format.write(newState, path)); + WriteStateException ex = expectThrows(WriteStateException.class, () -> format.write(newState, path)); + assertFalse(ex.isDirty()); + format.noFailures(); assertEquals(initialState, format.loadLatestState(logger, NamedXContentRegistry.EMPTY, path)); } @@ -325,7 +330,7 @@ public void testFailWriteAndReadPreviousState() throws IOException { writeAndReadStateSuccessfully(format, path); } - public void testFailWriteAndReadAnyState() throws IOException { + public void testFailWriteAndReadAnyState() throws IOException, WriteStateException { Path path = createTempDir(); Format format = new Format("foo-"); Set possibleStates = new HashSet<>(); @@ -338,7 +343,9 @@ public void testFailWriteAndReadAnyState() throws IOException { DummyState newState = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 100), randomInt(), randomLong(), randomDouble(), randomBoolean()); possibleStates.add(newState); - expectThrows(IOException.class, () -> format.write(newState, path)); + WriteStateException ex = expectThrows(WriteStateException.class, () -> 
format.write(newState, path)); + assertTrue(ex.isDirty()); + format.noFailures(); assertTrue(possibleStates.contains(format.loadLatestState(logger, NamedXContentRegistry.EMPTY, path))); } @@ -346,28 +353,31 @@ public void testFailWriteAndReadAnyState() throws IOException { writeAndReadStateSuccessfully(format, path); } - public void testFailCopyStateToExtraLocation() throws IOException { + public void testFailCopyTmpFileToExtraLocation() throws IOException, WriteStateException { Path paths[] = new Path[randomIntBetween(2, 5)]; for (int i = 0; i < paths.length; i++) { paths[i] = createTempDir(); } Format format = new Format("foo-"); - writeAndReadStateSuccessfully(format, paths); + DummyState initialState = writeAndReadStateSuccessfully(format, paths); for (int i = 0; i < randomIntBetween(1, 5); i++) { format.failOnMethods(Format.FAIL_OPEN_STATE_FILE_WHEN_COPYING); DummyState newState = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 100), randomInt(), randomLong(), randomDouble(), randomBoolean()); - expectThrows(IOException.class, () -> format.write(newState, paths)); + WriteStateException ex = expectThrows(WriteStateException.class, () -> format.write(newState, paths)); + assertFalse(ex.isDirty()); + format.noFailures(); - assertEquals(newState, format.loadLatestState(logger, NamedXContentRegistry.EMPTY, paths)); + assertEquals(initialState, format.loadLatestState(logger, NamedXContentRegistry.EMPTY, paths)); } writeAndReadStateSuccessfully(format, paths); } - public void testFailRandomlyAndReadAnyState() throws IOException { + + public void testFailRandomlyAndReadAnyState() throws IOException, WriteStateException { Path paths[] = new Path[randomIntBetween(1, 5)]; for (int i = 0; i < paths.length; i++) { paths[i] = createTempDir(); @@ -382,14 +392,24 @@ public void testFailRandomlyAndReadAnyState() throws IOException { format.failRandomly(); DummyState newState = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 100), randomInt(), 
randomLong(), randomDouble(), randomBoolean()); - possibleStates.add(newState); try { format.write(newState, paths); - } catch (Exception e) { - // since we're injecting failures at random it's ok if exception is thrown, it's also ok if exception is not thrown + //if write is successful, the only possible state we can read is the one that was just written + possibleStates.clear(); + possibleStates.add(newState); + } catch (WriteStateException e) { + //if dirty flag is not set, read might return only old state + if (e.isDirty()) { + //if dirty flag is set, read might return old state or new state + possibleStates.add(newState); + } } + format.noFailures(); - assertTrue(possibleStates.contains(format.loadLatestState(logger, NamedXContentRegistry.EMPTY, paths))); + //we call loadLatestState not on full path set, but only on random paths from this set. This is to emulate disk failures. + Path[] randomPaths = randomSubsetOf(randomIntBetween(1, paths.length), paths).toArray(new Path[0]); + DummyState stateOnDisk = format.loadLatestState(logger, NamedXContentRegistry.EMPTY, randomPaths); + assertTrue(possibleStates.contains(stateOnDisk)); } writeAndReadStateSuccessfully(format, paths); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 487ac7e0694ef..aef8326e0d0a0 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -76,6 +76,7 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.gateway.WriteStateException; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.CommitStats; import org.elasticsearch.index.engine.Engine; @@ -194,7 +195,11 @@ public static ShardStateMetaData load(Logger logger, 
Path... shardPaths) throws public static void write(ShardStateMetaData shardStateMetaData, Path... shardPaths) throws IOException { - ShardStateMetaData.FORMAT.write(shardStateMetaData, shardPaths); + try { + ShardStateMetaData.FORMAT.write(shardStateMetaData, shardPaths); + } catch (WriteStateException e) { + throw new IOException(e); + } } public static Engine getEngineFromShard(IndexShard shard) { diff --git a/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java b/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java index 6e34bb03860c5..980b2f92ead7c 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java @@ -36,6 +36,7 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.TestEnvironment; +import org.elasticsearch.gateway.WriteStateException; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.MergePolicyConfig; import org.elasticsearch.index.engine.EngineException; @@ -401,7 +402,11 @@ private void writeIndexState() throws IOException { // create _state of IndexMetaData try(NodeEnvironment nodeEnvironment = new NodeEnvironment(environment.settings(), environment)) { final Path[] paths = nodeEnvironment.indexPaths(indexMetaData.getIndex()); - IndexMetaData.FORMAT.write(indexMetaData, paths); + try { + IndexMetaData.FORMAT.write(indexMetaData, paths); + } catch (WriteStateException e) { + throw new IOException(e); + } logger.info("--> index metadata persisted to {} ", Arrays.toString(paths)); } } diff --git a/server/src/test/java/org/elasticsearch/index/shard/ShardPathTests.java b/server/src/test/java/org/elasticsearch/index/shard/ShardPathTests.java index fda2f8ef7d039..fc63a90178445 100644 --- 
a/server/src/test/java/org/elasticsearch/index/shard/ShardPathTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/ShardPathTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.gateway.WriteStateException; import org.elasticsearch.index.Index; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.IndexSettingsModule; @@ -43,7 +44,11 @@ public void testLoadShardPath() throws IOException { ShardId shardId = new ShardId("foo", "0xDEADBEEF", 0); Path[] paths = env.availableShardPaths(shardId); Path path = randomFrom(paths); - ShardStateMetaData.FORMAT.write(new ShardStateMetaData(true, "0xDEADBEEF", AllocationId.newInitializing()), path); + try { + ShardStateMetaData.FORMAT.write(new ShardStateMetaData(true, "0xDEADBEEF", AllocationId.newInitializing()), path); + } catch (WriteStateException e) { + throw new IOException(e); + } ShardPath shardPath = ShardPath.loadShardPath(logger, env, shardId, IndexSettingsModule.newIndexSettings(shardId.getIndex(), settings)); assertEquals(path, shardPath.getDataPath()); assertEquals("0xDEADBEEF", shardPath.getShardId().getIndex().getUUID()); @@ -62,7 +67,11 @@ public void testFailLoadShardPathOnMultiState() throws IOException { ShardId shardId = new ShardId("foo", indexUUID, 0); Path[] paths = env.availableShardPaths(shardId); assumeTrue("This test tests multi data.path but we only got one", paths.length > 1); - ShardStateMetaData.FORMAT.write(new ShardStateMetaData(true, indexUUID, AllocationId.newInitializing()), paths); + try { + ShardStateMetaData.FORMAT.write(new ShardStateMetaData(true, indexUUID, AllocationId.newInitializing()), paths); + } catch (WriteStateException e) { + throw new IOException(e); + } Exception e = expectThrows(IllegalStateException.class, () -> ShardPath.loadShardPath(logger, env, shardId, 
IndexSettingsModule.newIndexSettings(shardId.getIndex(), settings))); assertThat(e.getMessage(), containsString("more than one shard state found")); @@ -77,7 +86,11 @@ public void testFailLoadShardPathIndexUUIDMissmatch() throws IOException { ShardId shardId = new ShardId("foo", "foobar", 0); Path[] paths = env.availableShardPaths(shardId); Path path = randomFrom(paths); - ShardStateMetaData.FORMAT.write(new ShardStateMetaData(true, "0xDEADBEEF", AllocationId.newInitializing()), path); + try { + ShardStateMetaData.FORMAT.write(new ShardStateMetaData(true, "0xDEADBEEF", AllocationId.newInitializing()), path); + } catch (WriteStateException e) { + throw new IOException(e); + } Exception e = expectThrows(IllegalStateException.class, () -> ShardPath.loadShardPath(logger, env, shardId, IndexSettingsModule.newIndexSettings(shardId.getIndex(), settings))); assertThat(e.getMessage(), containsString("expected: foobar on shard path")); @@ -124,7 +137,11 @@ public void testGetRootPaths() throws IOException { ShardId shardId = new ShardId("foo", indexUUID, 0); Path[] paths = env.availableShardPaths(shardId); Path path = randomFrom(paths); - ShardStateMetaData.FORMAT.write(new ShardStateMetaData(true, indexUUID, AllocationId.newInitializing()), path); + try { + ShardStateMetaData.FORMAT.write(new ShardStateMetaData(true, indexUUID, AllocationId.newInitializing()), path); + } catch (WriteStateException e) { + throw new IOException(e); + } ShardPath shardPath = ShardPath.loadShardPath(logger, env, shardId, IndexSettingsModule.newIndexSettings(shardId.getIndex(), indexSettings, nodeSettings)); boolean found = false; From b0ac9aa1a84dab46efcd51fbca0723faee794cdd Mon Sep 17 00:00:00 2001 From: Andrey Ershov Date: Tue, 23 Oct 2018 12:52:25 +0300 Subject: [PATCH 3/6] WriteStateException extends IOException --- .../elasticsearch/env/NodeEnvironment.java | 7 +----- .../gateway/WriteStateException.java | 4 ++- .../elasticsearch/index/shard/IndexShard.java | 7 +----- 
.../RemoveCorruptedShardDataCommand.java | 8 ++---- .../gateway/MetaDataStateFormatTests.java | 16 ++++++------ .../index/shard/IndexShardTests.java | 7 +----- .../RemoveCorruptedShardDataCommandTests.java | 7 +----- .../index/shard/ShardPathTests.java | 25 +++---------------- 8 files changed, 21 insertions(+), 60 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java b/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java index 69ba50cd60291..ff8baaabb443c 100644 --- a/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java +++ b/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java @@ -51,7 +51,6 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.gateway.MetaDataStateFormat; -import org.elasticsearch.gateway.WriteStateException; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.ShardId; @@ -391,11 +390,7 @@ private static NodeMetaData loadOrCreateNodeMetaData(Settings settings, Logger l metaData = new NodeMetaData(generateNodeId(settings)); } // we write again to make sure all paths have the latest state file - try { - NodeMetaData.FORMAT.write(metaData, paths); - } catch (WriteStateException e) { - throw new IOException(e); - } + NodeMetaData.FORMAT.write(metaData, paths); return metaData; } diff --git a/server/src/main/java/org/elasticsearch/gateway/WriteStateException.java b/server/src/main/java/org/elasticsearch/gateway/WriteStateException.java index f318949863f73..832cdcebfb457 100644 --- a/server/src/main/java/org/elasticsearch/gateway/WriteStateException.java +++ b/server/src/main/java/org/elasticsearch/gateway/WriteStateException.java @@ -18,12 +18,14 @@ */ package org.elasticsearch.gateway; +import java.io.IOException; + /** * This exception is thrown when there is a problem of writing state to disk.
* If {@link #isDirty()} returns false, state is guaranteed to be not written to disk. * If {@link #isDirty()} returns true, we don't know if state is written to disk. */ -public class WriteStateException extends Exception { +public class WriteStateException extends IOException { private boolean dirty; public WriteStateException(boolean dirty, String message, Exception cause) { diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 99b3741321378..17756630517d2 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -64,7 +64,6 @@ import org.elasticsearch.common.util.concurrent.AsyncIOProcessor; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.internal.io.IOUtils; -import org.elasticsearch.gateway.WriteStateException; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexNotFoundException; @@ -2244,11 +2243,7 @@ private static void persistMetadata( logger.trace("{} writing shard state, reason [{}]", shardId, writeReason); final ShardStateMetaData newShardStateMetadata = new ShardStateMetaData(newRouting.primary(), indexSettings.getUUID(), newRouting.allocationId()); - try { - ShardStateMetaData.FORMAT.write(newShardStateMetadata, shardPath.getShardStatePath()); - } catch (WriteStateException e) { - throw new IOException(e); - } + ShardStateMetaData.FORMAT.write(newShardStateMetadata, shardPath.getShardStatePath()); } else { logger.trace("{} skip writing shard state, has been written before", shardId); } diff --git a/server/src/main/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommand.java b/server/src/main/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommand.java index 432cc35dbdaa0..54c1dd7c1db69 100644 --- 
a/server/src/main/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommand.java +++ b/server/src/main/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommand.java @@ -53,7 +53,6 @@ import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.NodeMetaData; import org.elasticsearch.gateway.MetaDataStateFormat; -import org.elasticsearch.gateway.WriteStateException; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; @@ -461,11 +460,8 @@ protected void newAllocationId(Environment environment, ShardPath shardPath, Ter final ShardStateMetaData newShardStateMetaData = new ShardStateMetaData(shardStateMetaData.primary, shardStateMetaData.indexUUID, newAllocationId); - try { - ShardStateMetaData.FORMAT.write(newShardStateMetaData, shardStatePath); - } catch (WriteStateException e) { - throw new IOException(e); - } + ShardStateMetaData.FORMAT.write(newShardStateMetaData, shardStatePath); + terminal.println(""); terminal.println("You should run the following command to allocate this shard:"); diff --git a/server/src/test/java/org/elasticsearch/gateway/MetaDataStateFormatTests.java b/server/src/test/java/org/elasticsearch/gateway/MetaDataStateFormatTests.java index d07adb3193aa6..d91e139121cc5 100644 --- a/server/src/test/java/org/elasticsearch/gateway/MetaDataStateFormatTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/MetaDataStateFormatTests.java @@ -95,7 +95,7 @@ public MetaData fromXContent(XContentParser parser) throws IOException { // indices are empty since they are serialized separately } - public void testReadWriteState() throws IOException, WriteStateException { + public void testReadWriteState() throws IOException { Path[] dirs = new Path[randomIntBetween(1, 5)]; for (int i = 0; i < dirs.length; i++) { dirs[i] = createTempDir(); @@ -136,7 +136,7 @@ public void testReadWriteState() throws IOException, WriteStateException { } } - public void 
testVersionMismatch() throws IOException, WriteStateException { + public void testVersionMismatch() throws IOException { Path[] dirs = new Path[randomIntBetween(1, 5)]; for (int i = 0; i < dirs.length; i++) { dirs[i] = createTempDir(); @@ -161,7 +161,7 @@ public void testVersionMismatch() throws IOException, WriteStateException { } } - public void testCorruption() throws IOException, WriteStateException { + public void testCorruption() throws IOException { Path[] dirs = new Path[randomIntBetween(1, 5)]; for (int i = 0; i < dirs.length; i++) { dirs[i] = createTempDir(); @@ -233,7 +233,7 @@ public static void corruptFile(Path file, Logger logger) throws IOException { } } - public void testLoadState() throws IOException, WriteStateException { + public void testLoadState() throws IOException { final Path[] dirs = new Path[randomIntBetween(1, 5)]; int numStates = randomIntBetween(1, 5); List meta = new ArrayList<>(); @@ -291,7 +291,7 @@ public void testLoadState() throws IOException, WriteStateException { } } - private DummyState writeAndReadStateSuccessfully(Format format, Path... paths) throws IOException, WriteStateException { + private DummyState writeAndReadStateSuccessfully(Format format, Path... 
paths) throws IOException { format.noFailures(); DummyState state = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 100), randomInt(), randomLong(), randomDouble(), randomBoolean()); @@ -309,7 +309,7 @@ private static void ensureOnlyOneStateFile(Path[] paths) throws IOException { } } - public void testFailWriteAndReadPreviousState() throws IOException, WriteStateException { + public void testFailWriteAndReadPreviousState() throws IOException { Path path = createTempDir(); Format format = new Format("foo-"); @@ -330,7 +330,7 @@ public void testFailWriteAndReadPreviousState() throws IOException, WriteStateEx writeAndReadStateSuccessfully(format, path); } - public void testFailWriteAndReadAnyState() throws IOException, WriteStateException { + public void testFailWriteAndReadAnyState() throws IOException { Path path = createTempDir(); Format format = new Format("foo-"); Set possibleStates = new HashSet<>(); @@ -377,7 +377,7 @@ public void testFailCopyTmpFileToExtraLocation() throws IOException, WriteStateE } - public void testFailRandomlyAndReadAnyState() throws IOException, WriteStateException { + public void testFailRandomlyAndReadAnyState() throws IOException { Path paths[] = new Path[randomIntBetween(1, 5)]; for (int i = 0; i < paths.length; i++) { paths[i] = createTempDir(); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index aef8326e0d0a0..487ac7e0694ef 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -76,7 +76,6 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.NodeEnvironment; -import org.elasticsearch.gateway.WriteStateException; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.CommitStats; import 
org.elasticsearch.index.engine.Engine; @@ -195,11 +194,7 @@ public static ShardStateMetaData load(Logger logger, Path... shardPaths) throws public static void write(ShardStateMetaData shardStateMetaData, Path... shardPaths) throws IOException { - try { - ShardStateMetaData.FORMAT.write(shardStateMetaData, shardPaths); - } catch (WriteStateException e) { - throw new IOException(e); - } + ShardStateMetaData.FORMAT.write(shardStateMetaData, shardPaths); } public static Engine getEngineFromShard(IndexShard shard) { diff --git a/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java b/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java index 980b2f92ead7c..6e34bb03860c5 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java @@ -36,7 +36,6 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.TestEnvironment; -import org.elasticsearch.gateway.WriteStateException; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.MergePolicyConfig; import org.elasticsearch.index.engine.EngineException; @@ -402,11 +401,7 @@ private void writeIndexState() throws IOException { // create _state of IndexMetaData try(NodeEnvironment nodeEnvironment = new NodeEnvironment(environment.settings(), environment)) { final Path[] paths = nodeEnvironment.indexPaths(indexMetaData.getIndex()); - try { - IndexMetaData.FORMAT.write(indexMetaData, paths); - } catch (WriteStateException e) { - throw new IOException(e); - } + IndexMetaData.FORMAT.write(indexMetaData, paths); logger.info("--> index metadata persisted to {} ", Arrays.toString(paths)); } } diff --git a/server/src/test/java/org/elasticsearch/index/shard/ShardPathTests.java 
b/server/src/test/java/org/elasticsearch/index/shard/ShardPathTests.java index fc63a90178445..fda2f8ef7d039 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/ShardPathTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/ShardPathTests.java @@ -24,7 +24,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; -import org.elasticsearch.gateway.WriteStateException; import org.elasticsearch.index.Index; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.IndexSettingsModule; @@ -44,11 +43,7 @@ public void testLoadShardPath() throws IOException { ShardId shardId = new ShardId("foo", "0xDEADBEEF", 0); Path[] paths = env.availableShardPaths(shardId); Path path = randomFrom(paths); - try { - ShardStateMetaData.FORMAT.write(new ShardStateMetaData(true, "0xDEADBEEF", AllocationId.newInitializing()), path); - } catch (WriteStateException e) { - throw new IOException(e); - } + ShardStateMetaData.FORMAT.write(new ShardStateMetaData(true, "0xDEADBEEF", AllocationId.newInitializing()), path); ShardPath shardPath = ShardPath.loadShardPath(logger, env, shardId, IndexSettingsModule.newIndexSettings(shardId.getIndex(), settings)); assertEquals(path, shardPath.getDataPath()); assertEquals("0xDEADBEEF", shardPath.getShardId().getIndex().getUUID()); @@ -67,11 +62,7 @@ public void testFailLoadShardPathOnMultiState() throws IOException { ShardId shardId = new ShardId("foo", indexUUID, 0); Path[] paths = env.availableShardPaths(shardId); assumeTrue("This test tests multi data.path but we only got one", paths.length > 1); - try { - ShardStateMetaData.FORMAT.write(new ShardStateMetaData(true, indexUUID, AllocationId.newInitializing()), paths); - } catch (WriteStateException e) { - throw new IOException(e); - } + ShardStateMetaData.FORMAT.write(new ShardStateMetaData(true, indexUUID, AllocationId.newInitializing()), paths); Exception e = 
expectThrows(IllegalStateException.class, () -> ShardPath.loadShardPath(logger, env, shardId, IndexSettingsModule.newIndexSettings(shardId.getIndex(), settings))); assertThat(e.getMessage(), containsString("more than one shard state found")); @@ -86,11 +77,7 @@ public void testFailLoadShardPathIndexUUIDMissmatch() throws IOException { ShardId shardId = new ShardId("foo", "foobar", 0); Path[] paths = env.availableShardPaths(shardId); Path path = randomFrom(paths); - try { - ShardStateMetaData.FORMAT.write(new ShardStateMetaData(true, "0xDEADBEEF", AllocationId.newInitializing()), path); - } catch (WriteStateException e) { - throw new IOException(e); - } + ShardStateMetaData.FORMAT.write(new ShardStateMetaData(true, "0xDEADBEEF", AllocationId.newInitializing()), path); Exception e = expectThrows(IllegalStateException.class, () -> ShardPath.loadShardPath(logger, env, shardId, IndexSettingsModule.newIndexSettings(shardId.getIndex(), settings))); assertThat(e.getMessage(), containsString("expected: foobar on shard path")); @@ -137,11 +124,7 @@ public void testGetRootPaths() throws IOException { ShardId shardId = new ShardId("foo", indexUUID, 0); Path[] paths = env.availableShardPaths(shardId); Path path = randomFrom(paths); - try { - ShardStateMetaData.FORMAT.write(new ShardStateMetaData(true, indexUUID, AllocationId.newInitializing()), path); - } catch (WriteStateException e) { - throw new IOException(e); - } + ShardStateMetaData.FORMAT.write(new ShardStateMetaData(true, indexUUID, AllocationId.newInitializing()), path); ShardPath shardPath = ShardPath.loadShardPath(logger, env, shardId, IndexSettingsModule.newIndexSettings(shardId.getIndex(), indexSettings, nodeSettings)); boolean found = false; From 5f70ad80242828b164268682bf96af98acd3f139 Mon Sep 17 00:00:00 2001 From: Andrey Ershov Date: Tue, 23 Oct 2018 15:14:16 +0300 Subject: [PATCH 4/6] Fix David's code review comments --- .../gateway/MetaDataStateFormat.java | 31 +++++++++---------- 
.../gateway/WriteStateException.java | 8 +++-- .../gateway/MetaDataStateFormatTests.java | 11 ++----- 3 files changed, 21 insertions(+), 29 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java b/server/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java index c65eaa7d3eca6..3e157dcfe2dd7 100644 --- a/server/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java +++ b/server/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java @@ -116,16 +116,14 @@ private Directory writeStateToFirstLocation(final T state, Path stateLocation, S out.writeInt(FORMAT.index()); try (XContentBuilder builder = newXContentBuilder(FORMAT, new IndexOutputOutputStream(out) { @Override - public void close() throws IOException { + public void close() { // this is important since some of the XContentBuilders write bytes on close. // in order to write the footer we need to prevent closing the actual index input. } })) { builder.startObject(); - { - toXContent(builder, state); - } + toXContent(builder, state); builder.endObject(); } CodecUtil.writeFooter(out); @@ -140,7 +138,8 @@ public void close() throws IOException { } return stateDir; } catch (Exception e) { - throw new WriteStateException(false, "failed to write state to the first location tmp file", e); + throw new WriteStateException(false, "failed to write state to the first location tmp file " + + stateLocation.resolve(tmpFileName), e); } } @@ -160,17 +159,18 @@ private Directory copyStateToExtraLocation(Directory srcStateDir, Path extraStat } return extraStateDir; } catch (Exception e) { - throw new WriteStateException(false, "failed to copy tmp state file to extra location", e); + throw new WriteStateException(false, "failed to copy tmp state file to extra location " + extraStateLocation, e); } } - public void performRenames(String tmpFileName, String fileName, final List> stateDirectories) throws + private static void performRenames(String tmpFileName, 
String fileName, final List> stateDirectories) throws WriteStateException { Directory firstStateDirectory = stateDirectories.get(0).v2(); try { firstStateDirectory.rename(tmpFileName, fileName); } catch (IOException e) { - throw new WriteStateException(false, "failed to rename tmp file to final name in the first state location", e); + throw new WriteStateException(false, "failed to rename tmp file to final name in the first state location " + + stateDirectories.get(0).v1().resolve(tmpFileName), e); } for (int i = 1; i < stateDirectories.size(); i++) { @@ -178,18 +178,18 @@ public void performRenames(String tmpFileName, String fileName, final List> stateDirectories) throws WriteStateException { + private static void performStateDirectoriesFsync(List> stateDirectories) throws WriteStateException { for (int i = 0; i < stateDirectories.size(); i++) { try { stateDirectories.get(i).v2().syncMetaData(); } catch (IOException e) { - throw new WriteStateException(true, "meta data directory fsync has failed", e); + throw new WriteStateException(true, "meta data directory fsync has failed " + stateDirectories.get(i).v1(), e); } } } @@ -201,14 +201,11 @@ public void performStateDirectoriesFsync(List> stateDirec * doesn't exist. The state is serialized to a temporary file in that directory and is then atomically moved to * it's target filename of the pattern {@code {prefix}{version}.st}. * If this method returns without exception there is a guarantee that state is persisted to the disk and loadLatestState will return - * it.
- * This method may throw an {@link WriteStateException} if some exception during writing state occurs.
- * If {@link WriteStateException#isDirty()} returns false, there is a guarantee that loadLatestState will return old state.
- * If {@link WriteStateException#isDirty()} returns true, loadLatestState could return new state or previous state. + * it. * * @param state the state object to write * @param locations the locations where the state should be written to. - * @throws WriteStateException if some exception during writing state occurs. + * @throws WriteStateException if some exception during writing state occurs. See also {@link WriteStateException#isDirty()}. */ public final void write(final T state, final Path... locations) throws WriteStateException { diff --git a/server/src/main/java/org/elasticsearch/gateway/WriteStateException.java b/server/src/main/java/org/elasticsearch/gateway/WriteStateException.java index 832cdcebfb457..3283f01b9def7 100644 --- a/server/src/main/java/org/elasticsearch/gateway/WriteStateException.java +++ b/server/src/main/java/org/elasticsearch/gateway/WriteStateException.java @@ -21,9 +21,7 @@ import java.io.IOException; /** - * This exception is thrown when there is a problem of writing state to disk.
- * If {@link #isDirty()} returns false, state is guaranteed to be not written to disk. - * If {@link #isDirty()} returns true, we don't know if state is written to disk. + * This exception is thrown when there is a problem of writing state to disk. */ public class WriteStateException extends IOException { private boolean dirty; @@ -33,6 +31,10 @@ public WriteStateException(boolean dirty, String message, Exception cause) { this.dirty = dirty; } + /** + * If this method returns false, state is guaranteed to be not written to disk. + * If this method returns true, we don't know if state is written to disk. + */ public boolean isDirty() { return dirty; } diff --git a/server/src/test/java/org/elasticsearch/gateway/MetaDataStateFormatTests.java b/server/src/test/java/org/elasticsearch/gateway/MetaDataStateFormatTests.java index d91e139121cc5..d8433963e5226 100644 --- a/server/src/test/java/org/elasticsearch/gateway/MetaDataStateFormatTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/MetaDataStateFormatTests.java @@ -103,7 +103,6 @@ public void testReadWriteState() throws IOException { final long id = addDummyFiles("foo-", dirs); Format format = new Format("foo-"); DummyState state = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 1000), randomInt(), randomLong(), randomDouble(), randomBoolean()); - int version = between(0, Integer.MAX_VALUE/2); format.write(state, dirs); for (Path file : dirs) { Path[] list = content("*", file); @@ -117,7 +116,6 @@ public void testReadWriteState() throws IOException { DummyState read = format.read(NamedXContentRegistry.EMPTY, list[0]); assertThat(read, equalTo(state)); } - final int version2 = between(version, Integer.MAX_VALUE); DummyState state2 = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 1000), randomInt(), randomLong(), randomDouble(), randomBoolean()); format.write(state2, dirs); @@ -145,7 +143,6 @@ public void testVersionMismatch() throws IOException { Format format = new 
Format("foo-"); DummyState state = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 1000), randomInt(), randomLong(), randomDouble(), randomBoolean()); - int version = between(0, Integer.MAX_VALUE/2); format.write(state, dirs); for (Path file : dirs) { Path[] list = content("*", file); @@ -169,7 +166,6 @@ public void testCorruption() throws IOException { final long id = addDummyFiles("foo-", dirs); Format format = new Format("foo-"); DummyState state = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 1000), randomInt(), randomLong(), randomDouble(), randomBoolean()); - int version = between(0, Integer.MAX_VALUE/2); format.write(state, dirs); for (Path file : dirs) { Path[] list = content("*", file); @@ -268,6 +264,7 @@ public void testLoadState() throws IOException { assertThat(deserialized, notNullValue()); assertThat(deserialized.getVersion(), equalTo(original.getVersion())); assertThat(deserialized.getMappingVersion(), equalTo(original.getMappingVersion())); + assertThat(deserialized.getSettingsVersion(), equalTo(original.getSettingsVersion())); assertThat(deserialized.getNumberOfReplicas(), equalTo(original.getNumberOfReplicas())); assertThat(deserialized.getNumberOfShards(), equalTo(original.getNumberOfShards())); } @@ -353,7 +350,7 @@ public void testFailWriteAndReadAnyState() throws IOException { writeAndReadStateSuccessfully(format, path); } - public void testFailCopyTmpFileToExtraLocation() throws IOException, WriteStateException { + public void testFailCopyTmpFileToExtraLocation() throws IOException { Path paths[] = new Path[randomIntBetween(2, 5)]; for (int i = 0; i < paths.length; i++) { paths[i] = createTempDir(); @@ -376,7 +373,6 @@ public void testFailCopyTmpFileToExtraLocation() throws IOException, WriteStateE writeAndReadStateSuccessfully(format, paths); } - public void testFailRandomlyAndReadAnyState() throws IOException { Path paths[] = new Path[randomIntBetween(1, 5)]; for (int i = 0; i < paths.length; i++) { @@ 
-394,13 +390,10 @@ public void testFailRandomlyAndReadAnyState() throws IOException { randomDouble(), randomBoolean()); try { format.write(newState, paths); - //if write is successful, the only possible state we can read is the one that was just written possibleStates.clear(); possibleStates.add(newState); } catch (WriteStateException e) { - //if dirty flag is not set, read might return only old state if (e.isDirty()) { - //if dirty flag is set, read might return old state or new state possibleStates.add(newState); } } From aa443d6fcda7b4e2a79f43dae672a9fed5f9e0fb Mon Sep 17 00:00:00 2001 From: Andrey Ershov Date: Tue, 23 Oct 2018 15:52:28 +0300 Subject: [PATCH 5/6] Open all directories as the first algorithm step --- .../gateway/MetaDataStateFormat.java | 85 ++++++++----------- 1 file changed, 36 insertions(+), 49 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java b/server/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java index 3e157dcfe2dd7..d7bc3b7fe9a46 100644 --- a/server/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java +++ b/server/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java @@ -105,61 +105,49 @@ private static void performDirectoryCleanup(Path stateLocation, Directory stateD IOUtils.closeWhileHandlingException(stateDir); } - private Directory writeStateToFirstLocation(final T state, Path stateLocation, String tmpFileName) + private void writeStateToFirstLocation(final T state, Path stateLocation, Directory stateDir, String tmpFileName) throws WriteStateException { try { - Directory stateDir = newDirectory(stateLocation); - try { - deleteFileIfExists(stateLocation, stateDir, tmpFileName); - try (IndexOutput out = stateDir.createOutput(tmpFileName, IOContext.DEFAULT)) { - CodecUtil.writeHeader(out, STATE_FILE_CODEC, STATE_FILE_VERSION); - out.writeInt(FORMAT.index()); - try (XContentBuilder builder = newXContentBuilder(FORMAT, new IndexOutputOutputStream(out) { - 
@Override - public void close() { - // this is important since some of the XContentBuilders write bytes on close. - // in order to write the footer we need to prevent closing the actual index input. - } - })) { - - builder.startObject(); - toXContent(builder, state); - builder.endObject(); + deleteFileIfExists(stateLocation, stateDir, tmpFileName); + try (IndexOutput out = stateDir.createOutput(tmpFileName, IOContext.DEFAULT)) { + CodecUtil.writeHeader(out, STATE_FILE_CODEC, STATE_FILE_VERSION); + out.writeInt(FORMAT.index()); + try (XContentBuilder builder = newXContentBuilder(FORMAT, new IndexOutputOutputStream(out) { + @Override + public void close() { + // this is important since some of the XContentBuilders write bytes on close. + // in order to write the footer we need to prevent closing the actual index input. } - CodecUtil.writeFooter(out); - } + })) { - stateDir.sync(Collections.singleton(tmpFileName)); - } catch (Exception e) { - // perform clean up only in case of exception, we need to keep directory open and temporary file on disk - // if everything is ok for the next algorithm steps - performDirectoryCleanup(stateLocation, stateDir, tmpFileName); - throw e; + builder.startObject(); + toXContent(builder, state); + builder.endObject(); + } + CodecUtil.writeFooter(out); } - return stateDir; + + stateDir.sync(Collections.singleton(tmpFileName)); } catch (Exception e) { throw new WriteStateException(false, "failed to write state to the first location tmp file " + stateLocation.resolve(tmpFileName), e); } } - private Directory copyStateToExtraLocation(Directory srcStateDir, Path extraStateLocation, String tmpFileName) + private static void copyStateToExtraLocations(List> stateDirs, String tmpFileName) throws WriteStateException { - try { - Directory extraStateDir = newDirectory(extraStateLocation); + Directory srcStateDir = stateDirs.get(0).v2(); + for (int i = 1; i < stateDirs.size(); i++) { + Tuple extraStatePathAndDir = stateDirs.get(i); + Path 
extraStateLocation = extraStatePathAndDir.v1(); + Directory extraStateDir = extraStatePathAndDir.v2(); try { deleteFileIfExists(extraStateLocation, extraStateDir, tmpFileName); extraStateDir.copyFrom(srcStateDir, tmpFileName, tmpFileName, IOContext.DEFAULT); extraStateDir.sync(Collections.singleton(tmpFileName)); } catch (Exception e) { - // perform clean up only in case of exception, we need to keep directory open and temporary file on disk - // if everything is ok for the next algorithm steps - performDirectoryCleanup(extraStateLocation, extraStateDir, tmpFileName); - throw e; + throw new WriteStateException(false, "failed to copy tmp state file to extra location " + extraStateLocation, e); } - return extraStateDir; - } catch (Exception e) { - throw new WriteStateException(false, "failed to copy tmp state file to extra location " + extraStateLocation, e); } } @@ -194,7 +182,6 @@ private static void performStateDirectoriesFsync(List> st } } - /** * Writes the given state to the given directories. The state is written to a * state directory ({@value #STATE_DIR_NAME}) underneath each of the given file locations and is created if it @@ -226,24 +213,24 @@ public final void write(final T state, final Path... 
locations) throws WriteStat final String fileName = prefix + maxStateId + STATE_FILE_EXTENSION; final String tmpFileName = fileName + ".tmp"; - final Path firstStateLocation = locations[0].resolve(STATE_DIR_NAME); List> directories = new ArrayList<>(); try { - Directory firstStateDir = writeStateToFirstLocation(state, firstStateLocation, tmpFileName); - directories.add(new Tuple<>(firstStateLocation, firstStateDir)); - for (int i = 1; i < locations.length; i++) { - final Path extraStateLocation = locations[i].resolve(STATE_DIR_NAME); - Directory extraStateDir = copyStateToExtraLocation(firstStateDir, extraStateLocation, tmpFileName); - directories.add(new Tuple<>(extraStateLocation, extraStateDir)); + for (Path location : locations) { + Path stateLocation = location.resolve(STATE_DIR_NAME); + try { + directories.add(new Tuple<>(location, newDirectory(stateLocation))); + } catch (IOException e) { + throw new WriteStateException(false, "failed to open state directory " + stateLocation, e); + } } + + writeStateToFirstLocation(state, directories.get(0).v1(), directories.get(0).v2(), tmpFileName); + copyStateToExtraLocations(directories, tmpFileName); performRenames(tmpFileName, fileName, directories); performStateDirectoriesFsync(directories); } finally { - //writeStateToFirstLocation and copyStateToExtraLocation perform clean up for themselves if they fail - //we need to perform clean up for all data paths that were successfully opened and temporary file was created - for (int i = 0; i < directories.size(); i++) { - Tuple pathAndDirectory = directories.get(i); + for (Tuple pathAndDirectory : directories) { performDirectoryCleanup(pathAndDirectory.v1(), pathAndDirectory.v2(), tmpFileName); } } From d8fdf27e53ad46979491cbe095407fe7dd535229 Mon Sep 17 00:00:00 2001 From: Andrey Ershov Date: Tue, 23 Oct 2018 16:56:46 +0300 Subject: [PATCH 6/6] Inline performDirectoryCleanup --- .../org/elasticsearch/gateway/MetaDataStateFormat.java | 8 ++------ 1 file changed, 2 
insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java b/server/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java index d7bc3b7fe9a46..7ae6a13724d66 100644 --- a/server/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java +++ b/server/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java @@ -100,11 +100,6 @@ private static void deleteFileIgnoreExceptions(Path stateLocation, Directory dir } } - private static void performDirectoryCleanup(Path stateLocation, Directory stateDir, String tmpFileName) { - deleteFileIgnoreExceptions(stateLocation, stateDir, tmpFileName); - IOUtils.closeWhileHandlingException(stateDir); - } - private void writeStateToFirstLocation(final T state, Path stateLocation, Directory stateDir, String tmpFileName) throws WriteStateException { try { @@ -231,7 +226,8 @@ public final void write(final T state, final Path... locations) throws WriteStat performStateDirectoriesFsync(directories); } finally { for (Tuple pathAndDirectory : directories) { - performDirectoryCleanup(pathAndDirectory.v1(), pathAndDirectory.v2(), tmpFileName); + deleteFileIgnoreExceptions(pathAndDirectory.v1(), pathAndDirectory.v2(), tmpFileName); + IOUtils.closeWhileHandlingException(pathAndDirectory.v2()); } }