Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(s3strea/wal): handle IOException during flushing WAL header #1827

Merged
merged 1 commit into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,12 @@ public static BlockWALServiceBuilder recoveryBuilder(String path) {
return new BlockWALServiceBuilder(path).recoveryMode(true);
}

private void flushWALHeader(ShutdownType shutdownType) {
private void flushWALHeader(ShutdownType shutdownType) throws IOException {
walHeader.setShutdownType(shutdownType);
flushWALHeader();
}

private synchronized void flushWALHeader() {
private synchronized void flushWALHeader() throws IOException {
long position = writeHeaderRoundTimes.getAndIncrement() % WAL_HEADER_COUNT * WAL_HEADER_CAPACITY;
walHeader.setLastWriteTimestamp(System.nanoTime());
long trimOffset = walHeader.getTrimOffset();
Expand Down Expand Up @@ -363,7 +363,7 @@ private BlockWALHeader newWALHeader() {
return new BlockWALHeader(walChannel.capacity(), initialWindowSize);
}

private void walHeaderReady(BlockWALHeader header) {
private void walHeaderReady(BlockWALHeader header) throws IOException {
if (nodeId != NOOP_NODE_ID) {
header.setNodeId(nodeId);
header.setEpoch(epoch);
Expand Down Expand Up @@ -392,7 +392,11 @@ public void shutdownGracefully() {
boolean gracefulShutdown = Optional.ofNullable(slidingWindowService)
.map(s -> s.shutdown(1, TimeUnit.DAYS))
.orElse(true);
flushWALHeader(gracefulShutdown ? ShutdownType.GRACEFULLY : ShutdownType.UNGRACEFULLY);
try {
flushWALHeader(gracefulShutdown ? ShutdownType.GRACEFULLY : ShutdownType.UNGRACEFULLY);
} catch (IOException ignored) {
// shutdown anyway
}

walChannel.close();

Expand Down Expand Up @@ -506,7 +510,13 @@ private CompletableFuture<Void> trim(long offset, boolean internal) {
}

walHeader.updateTrimOffset(offset);
return CompletableFuture.runAsync(this::flushWALHeader, walHeaderFlusher);
return CompletableFuture.runAsync(() -> {
try {
flushWALHeader();
} catch (IOException e) {
throw new RuntimeIOException(e);
}
}, walHeaderFlusher);
}

private void checkStarted() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ private void writeBlockData(BlockBatch blocks) throws IOException {
StorageOperationStats.getInstance().appendWALWriteStats.record(TimerUtil.durationElapsedAs(start, TimeUnit.NANOSECONDS));
}

private void makeWriteOffsetMatchWindow(long newWindowEndOffset) {
private void makeWriteOffsetMatchWindow(long newWindowEndOffset) throws IOException {
// align to block size
newWindowEndOffset = WALUtil.alignLargeByBlockSize(newWindowEndOffset);
long windowStartOffset = windowCoreData.getStartOffset();
Expand All @@ -400,7 +400,7 @@ private void makeWriteOffsetMatchWindow(long newWindowEndOffset) {
}

public interface WALHeaderFlusher {
void flush();
void flush() throws IOException;
}

public static class WindowCoreData {
Expand Down Expand Up @@ -442,7 +442,7 @@ public void updateWindowStartOffset(long offset) {
this.startOffset.accumulateAndGet(offset, Math::max);
}

public void scaleOutWindow(WALHeaderFlusher flusher, long newMaxLength) {
public void scaleOutWindow(WALHeaderFlusher flusher, long newMaxLength) throws IOException {
boolean scaleWindowHappened = false;
scaleOutLock.lock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,21 +135,28 @@ default void writeAndFlush(ByteBuf src, long position) throws IOException {
flush();
}

default void retryWriteAndFlush(ByteBuf src, long position) {
retryWriteAndFlush(src, position, DEFAULT_RETRY_INTERVAL);
default void retryWriteAndFlush(ByteBuf src, long position) throws IOException {
retryWriteAndFlush(src, position, DEFAULT_RETRY_INTERVAL, DEFAULT_RETRY_TIMEOUT);
}

/**
* Retry {@link #writeAndFlush(ByteBuf, long)} with the given interval until success.
* Retry {@link #writeAndFlush(ByteBuf, long)} with the given interval until success or timeout.
*/
default void retryWriteAndFlush(ByteBuf src, long position, long retryIntervalMillis) {
default void retryWriteAndFlush(ByteBuf src, long position, long retryIntervalMillis, long retryTimeoutMillis) throws IOException {
long start = System.nanoTime();
long retryTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(retryTimeoutMillis);
while (true) {
try {
writeAndFlush(src, position);
break;
} catch (IOException e) {
LOGGER.error("Failed to write and flush, retrying in {}ms", retryIntervalMillis, e);
Threads.sleep(retryIntervalMillis);
if (System.nanoTime() - start > retryTimeoutNanos) {
LOGGER.error("Failed to write and flush, retry timeout", e);
throw e;
} else {
LOGGER.error("Failed to write and flush, retrying in {}ms", retryIntervalMillis, e);
Threads.sleep(retryIntervalMillis);
}
}
}
}
Expand Down
Loading