-
Notifications
You must be signed in to change notification settings - Fork 25.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Zen2] Change MetaDataStateFormat write semantics #34709
Changes from 3 commits
1ec0c73
0cfffd5
b0ac9aa
5f70ad8
aa443d6
d8fdf27
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,6 +30,7 @@ | |
import org.apache.lucene.store.IndexOutput; | ||
import org.apache.lucene.store.SimpleFSDirectory; | ||
import org.elasticsearch.ExceptionsHelper; | ||
import org.elasticsearch.common.collect.Tuple; | ||
import org.elasticsearch.common.logging.Loggers; | ||
import org.elasticsearch.common.lucene.store.IndexOutputOutputStream; | ||
import org.elasticsearch.common.lucene.store.InputStreamIndexInput; | ||
|
@@ -91,86 +92,162 @@ private static void deleteFileIfExists(Path stateLocation, Directory directory, | |
logger.trace("cleaned up {}", stateLocation.resolve(fileName)); | ||
} | ||
|
||
private void writeStateToFirstLocation(final T state, Path stateLocation, Directory stateDir, String fileName, String tmpFileName) | ||
throws IOException { | ||
private static void deleteFileIgnoreExceptions(Path stateLocation, Directory directory, String fileName) { | ||
try { | ||
deleteFileIfExists(stateLocation, stateDir, tmpFileName); | ||
try (IndexOutput out = stateDir.createOutput(tmpFileName, IOContext.DEFAULT)) { | ||
CodecUtil.writeHeader(out, STATE_FILE_CODEC, STATE_FILE_VERSION); | ||
out.writeInt(FORMAT.index()); | ||
try (XContentBuilder builder = newXContentBuilder(FORMAT, new IndexOutputOutputStream(out) { | ||
@Override | ||
public void close() throws IOException { | ||
// this is important since some of the XContentBuilders write bytes on close. | ||
// in order to write the footer we need to prevent closing the actual index input. | ||
} | ||
})) { | ||
deleteFileIfExists(stateLocation, directory, fileName); | ||
} catch (IOException e) { | ||
logger.trace("clean up failed {}", stateLocation.resolve(fileName)); | ||
} | ||
} | ||
|
||
private static void performDirectoryCleanup(Path stateLocation, Directory stateDir, String tmpFileName) { | ||
deleteFileIgnoreExceptions(stateLocation, stateDir, tmpFileName); | ||
IOUtils.closeWhileHandlingException(stateDir); | ||
} | ||
|
||
builder.startObject(); | ||
{ | ||
toXContent(builder, state); | ||
private Directory writeStateToFirstLocation(final T state, Path stateLocation, String tmpFileName) | ||
throws WriteStateException { | ||
try { | ||
Directory stateDir = newDirectory(stateLocation); | ||
try { | ||
deleteFileIfExists(stateLocation, stateDir, tmpFileName); | ||
try (IndexOutput out = stateDir.createOutput(tmpFileName, IOContext.DEFAULT)) { | ||
CodecUtil.writeHeader(out, STATE_FILE_CODEC, STATE_FILE_VERSION); | ||
out.writeInt(FORMAT.index()); | ||
try (XContentBuilder builder = newXContentBuilder(FORMAT, new IndexOutputOutputStream(out) { | ||
@Override | ||
public void close() throws IOException { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Doesn't actually throw |
||
// this is important since some of the XContentBuilders write bytes on close. | ||
// in order to write the footer we need to prevent closing the actual index input. | ||
} | ||
})) { | ||
|
||
builder.startObject(); | ||
{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This |
||
toXContent(builder, state); | ||
} | ||
builder.endObject(); | ||
} | ||
builder.endObject(); | ||
CodecUtil.writeFooter(out); | ||
} | ||
CodecUtil.writeFooter(out); | ||
} | ||
|
||
stateDir.sync(Collections.singleton(tmpFileName)); | ||
stateDir.rename(tmpFileName, fileName); | ||
stateDir.syncMetaData(); | ||
logger.trace("written state to {}", stateLocation.resolve(fileName)); | ||
} finally { | ||
deleteFileIfExists(stateLocation, stateDir, tmpFileName); | ||
stateDir.sync(Collections.singleton(tmpFileName)); | ||
} catch (Exception e) { | ||
// perform clean up only in case of exception, we need to keep directory open and temporary file on disk | ||
// if everything is ok for the next algorithm steps | ||
performDirectoryCleanup(stateLocation, stateDir, tmpFileName); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Perhaps I'm missing something, but it looks like we do There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's true that clean up is performed in the write method in finally block, but only for those directories that are added successfully to the list of directories. If this method throws an exception, directory won't be added to the list. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see. Could we just open all the directories up-front and then clean them all up at the end? If possible I think it's clearer that there are no leaks if you can see acquisition and release of resources to be paired up in a single method. It's not always possible, but here I think it is. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, that actually makes sense. Please see aa443d6 |
||
throw e; | ||
} | ||
return stateDir; | ||
} catch (Exception e) { | ||
throw new WriteStateException(false, "failed to write state to the first location tmp file", e); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it'd be useful to see the filenames in the exception message. |
||
} | ||
} | ||
|
||
private void copyStateToExtraLocation(Directory srcStateDir, Path extraStateLocation, String fileName, String tmpFileName) | ||
throws IOException { | ||
try (Directory extraStateDir = newDirectory(extraStateLocation)) { | ||
private Directory copyStateToExtraLocation(Directory srcStateDir, Path extraStateLocation, String tmpFileName) | ||
throws WriteStateException { | ||
try { | ||
Directory extraStateDir = newDirectory(extraStateLocation); | ||
try { | ||
deleteFileIfExists(extraStateLocation, extraStateDir, tmpFileName); | ||
extraStateDir.copyFrom(srcStateDir, fileName, tmpFileName, IOContext.DEFAULT); | ||
extraStateDir.copyFrom(srcStateDir, tmpFileName, tmpFileName, IOContext.DEFAULT); | ||
extraStateDir.sync(Collections.singleton(tmpFileName)); | ||
extraStateDir.rename(tmpFileName, fileName); | ||
extraStateDir.syncMetaData(); | ||
logger.trace("copied state to {}", extraStateLocation.resolve(fileName)); | ||
} finally { | ||
deleteFileIfExists(extraStateLocation, extraStateDir, tmpFileName); | ||
} catch (Exception e) { | ||
// perform clean up only in case of exception, we need to keep directory open and temporary file on disk | ||
// if everything is ok for the next algorithm steps | ||
performDirectoryCleanup(extraStateLocation, extraStateDir, tmpFileName); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Perhaps I'm missing something, but it looks like we do There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same as writeStateToFirstLocation |
||
throw e; | ||
} | ||
return extraStateDir; | ||
} catch (Exception e) { | ||
throw new WriteStateException(false, "failed to copy tmp state file to extra location", e); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it'd be useful to see the filenames in the exception message. |
||
} | ||
} | ||
|
||
public void performRenames(String tmpFileName, String fileName, final List<Tuple<Path, Directory>> stateDirectories) throws | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can be |
||
WriteStateException { | ||
Directory firstStateDirectory = stateDirectories.get(0).v2(); | ||
try { | ||
firstStateDirectory.rename(tmpFileName, fileName); | ||
} catch (IOException e) { | ||
throw new WriteStateException(false, "failed to rename tmp file to final name in the first state location", e); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it'd be useful to see the filenames in the exception message. |
||
} | ||
|
||
for (int i = 1; i < stateDirectories.size(); i++) { | ||
Directory extraStateDirectory = stateDirectories.get(i).v2(); | ||
try { | ||
extraStateDirectory.rename(tmpFileName, fileName); | ||
} catch (IOException e) { | ||
throw new WriteStateException(true, "failed to rename tmp file to final name in extra state location", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it'd be useful to see the filenames in the exception message. |
||
e); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: this could be on the previous line. |
||
} | ||
} | ||
} | ||
|
||
public void performStateDirectoriesFsync(List<Tuple<Path, Directory>> stateDirectories) throws WriteStateException { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can be |
||
for (int i = 0; i < stateDirectories.size(); i++) { | ||
try { | ||
stateDirectories.get(i).v2().syncMetaData(); | ||
} catch (IOException e) { | ||
throw new WriteStateException(true, "meta data directory fsync has failed", e); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it'd be useful to see the path in the exception message. |
||
} | ||
} | ||
} | ||
|
||
|
||
/** | ||
* Writes the given state to the given directories. The state is written to a | ||
* state directory ({@value #STATE_DIR_NAME}) underneath each of the given file locations and is created if it | ||
* doesn't exist. The state is serialized to a temporary file in that directory and is then atomically moved to | ||
* it's target filename of the pattern {@code {prefix}{version}.st}. | ||
* If this method returns without exception there is a guarantee that state is persisted to the disk and loadLatestState will return it. | ||
* But if this method throws an exception, loadLatestState could return this state or some previous state. | ||
* If this method returns without exception there is a guarantee that state is persisted to the disk and loadLatestState will return | ||
* it. <br> | ||
* This method may throw an {@link WriteStateException} if some exception during writing state occurs. <br> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we don't need this line or the following two, since they duplicate docs found elsewhere. |
||
* If {@link WriteStateException#isDirty()} returns false, there is a guarantee that loadLatestState will return old state. <br> | ||
* If {@link WriteStateException#isDirty()} returns true, loadLatestState could return new state or previous state. | ||
* | ||
* @param state the state object to write | ||
* @param locations the locations where the state should be written to. | ||
* @throws IOException if an IOException occurs | ||
* @throws WriteStateException if some exception during writing state occurs. | ||
*/ | ||
public final void write(final T state, final Path... locations) throws IOException { | ||
|
||
public final void write(final T state, final Path... locations) throws WriteStateException { | ||
if (locations == null) { | ||
throw new IllegalArgumentException("Locations must not be null"); | ||
} | ||
if (locations.length <= 0) { | ||
throw new IllegalArgumentException("One or more locations required"); | ||
} | ||
final long maxStateId = findMaxStateId(prefix, locations) + 1; | ||
|
||
long maxStateId; | ||
try { | ||
maxStateId = findMaxStateId(prefix, locations) + 1; | ||
} catch (Exception e) { | ||
throw new WriteStateException(false, "exception during looking up max state id", e); | ||
} | ||
assert maxStateId >= 0 : "maxStateId must be positive but was: [" + maxStateId + "]"; | ||
|
||
final String fileName = prefix + maxStateId + STATE_FILE_EXTENSION; | ||
final String tmpFileName = fileName + ".tmp"; | ||
final Path firstStateLocation = locations[0].resolve(STATE_DIR_NAME); | ||
try (Directory stateDir = newDirectory(firstStateLocation)) { | ||
writeStateToFirstLocation(state, firstStateLocation, stateDir, fileName, tmpFileName); | ||
List<Tuple<Path, Directory>> directories = new ArrayList<>(); | ||
|
||
try { | ||
Directory firstStateDir = writeStateToFirstLocation(state, firstStateLocation, tmpFileName); | ||
directories.add(new Tuple<>(firstStateLocation, firstStateDir)); | ||
for (int i = 1; i < locations.length; i++) { | ||
final Path extraStateLocation = locations[i].resolve(STATE_DIR_NAME); | ||
copyStateToExtraLocation(stateDir, extraStateLocation, fileName, tmpFileName); | ||
Directory extraStateDir = copyStateToExtraLocation(firstStateDir, extraStateLocation, tmpFileName); | ||
directories.add(new Tuple<>(extraStateLocation, extraStateDir)); | ||
} | ||
performRenames(tmpFileName, fileName, directories); | ||
performStateDirectoriesFsync(directories); | ||
} finally { | ||
//writeStateToFirstLocation and copyStateToExtraLocation perform clean up for themselves if they fail | ||
//we need to perform clean up for all data paths that were successfully opened and temporary file was created | ||
for (int i = 0; i < directories.size(); i++) { | ||
Tuple<Path, Directory> pathAndDirectory = directories.get(i); | ||
performDirectoryCleanup(pathAndDirectory.v1(), pathAndDirectory.v2(), tmpFileName); | ||
} | ||
} | ||
|
||
|
@@ -227,16 +304,18 @@ protected Directory newDirectory(Path dir) throws IOException { | |
return new SimpleFSDirectory(dir); | ||
} | ||
|
||
private void cleanupOldFiles(final String currentStateFile, Path[] locations) throws IOException { | ||
private void cleanupOldFiles(final String currentStateFile, Path[] locations) { | ||
for (Path location : locations) { | ||
logger.trace("cleanupOldFiles: cleaning up {}", location); | ||
Path stateLocation = location.resolve(STATE_DIR_NAME); | ||
try (Directory stateDir = newDirectory(stateLocation)) { | ||
for (String file : stateDir.listAll()) { | ||
if (file.startsWith(prefix) && file.equals(currentStateFile) == false) { | ||
deleteFileIfExists(stateLocation, stateDir, file); | ||
deleteFileIgnoreExceptions(stateLocation, stateDir, file); | ||
} | ||
} | ||
} catch (Exception e) { | ||
logger.trace("clean up failed for state location {}", stateLocation); | ||
} | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
/* | ||
* Licensed to Elasticsearch under one or more contributor | ||
* license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright | ||
* ownership. Elasticsearch licenses this file to you under | ||
* the Apache License, Version 2.0 (the "License"); you may | ||
* not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.elasticsearch.gateway; | ||
|
||
import java.io.IOException; | ||
|
||
/** | ||
* This exception is thrown when there is a problem of writing state to disk. <br> | ||
* If {@link #isDirty()} returns false, state is guaranteed to be not written to disk. | ||
* If {@link #isDirty()} returns true, we don't know if state is written to disk. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Probably better for these docs to be on the |
||
*/ | ||
public class WriteStateException extends IOException { | ||
private boolean dirty; | ||
|
||
public WriteStateException(boolean dirty, String message, Exception cause) { | ||
super(message, cause); | ||
this.dirty = dirty; | ||
} | ||
|
||
public boolean isDirty() { | ||
return dirty; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is short and only used in one place, so I think I'd inline it.