Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(issues1069): stream client shutdown await opening stream close #1071

Merged
merged 1 commit into from
Apr 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 110 additions & 30 deletions s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.automq.stream.api.StreamClient;
import com.automq.stream.s3.context.AppendContext;
import com.automq.stream.s3.context.FetchContext;
import com.automq.stream.s3.metadata.StreamMetadata;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.stats.StreamOperationStats;
import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter;
Expand All @@ -39,7 +40,8 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -48,7 +50,7 @@ public class S3StreamClient implements StreamClient {
private static final long STREAM_OBJECT_COMPACTION_INTERVAL_MS = TimeUnit.MINUTES.toMillis(1);
private final ScheduledExecutorService streamObjectCompactionScheduler = Threads.newSingleThreadScheduledExecutor(
ThreadUtils.createThreadFactory("stream-object-compaction-scheduler", true), LOGGER, true);
private final Map<Long, StreamWrapper> openedStreams;
final Map<Long, StreamWrapper> openedStreams;
private final StreamManager streamManager;
private final Storage storage;
private final ObjectManager objectManager;
Expand All @@ -58,6 +60,13 @@ public class S3StreamClient implements StreamClient {
private final AsyncNetworkBandwidthLimiter networkOutboundBucket;
private ScheduledFuture<?> scheduledCompactionTaskFuture;

private final ReentrantLock lock = new ReentrantLock();

final Map<Long, CompletableFuture<Stream>> openingStreams = new ConcurrentHashMap<>();
final Map<Long, CompletableFuture<Stream>> closingStreams = new ConcurrentHashMap<>();

private boolean closed;

@SuppressWarnings("unused")
public S3StreamClient(StreamManager streamManager, Storage storage, ObjectManager objectManager,
S3Operator s3Operator, Config config) {
Expand All @@ -80,21 +89,30 @@ public S3StreamClient(StreamManager streamManager, Storage storage, ObjectManage

@Override
public CompletableFuture<Stream> createAndOpenStream(CreateStreamOptions options) {
TimerUtil timerUtil = new TimerUtil();
return FutureUtil.exec(() -> streamManager.createStream().thenCompose(streamId -> {
StreamOperationStats.getInstance().createStreamStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
return openStream0(streamId, options.epoch());
}), LOGGER, "createAndOpenStream");
return runInLock(() -> {
checkState();
TimerUtil timerUtil = new TimerUtil();
return FutureUtil.exec(() -> streamManager.createStream().thenCompose(streamId -> {
StreamOperationStats.getInstance().createStreamStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
return openStream0(streamId, options.epoch());
}), LOGGER, "createAndOpenStream");
});
}

@Override
public CompletableFuture<Stream> openStream(long streamId, OpenStreamOptions openStreamOptions) {
return FutureUtil.exec(() -> openStream0(streamId, openStreamOptions.epoch()), LOGGER, "openStream");
return runInLock(() -> {
checkState();
return FutureUtil.exec(() -> openStream0(streamId, openStreamOptions.epoch()), LOGGER, "openStream");
});
}

@Override
public Optional<Stream> getStream(long streamId) {
return Optional.ofNullable(openedStreams.get(streamId));
return runInLock(() -> {
checkState();
return Optional.ofNullable(openedStreams.get(streamId));
});
}

/**
Expand All @@ -108,21 +126,32 @@ private void startStreamObjectsCompactions() {
}

private CompletableFuture<Stream> openStream0(long streamId, long epoch) {
TimerUtil timerUtil = new TimerUtil();
return streamManager.openStream(streamId, epoch).
thenApply(metadata -> {
StreamWrapper stream = new StreamWrapper(new S3Stream(
metadata.streamId(), metadata.epoch(),
metadata.startOffset(), metadata.endOffset(),
storage, streamManager, networkInboundBucket, networkOutboundBucket));
openedStreams.put(streamId, stream);
StreamOperationStats.getInstance().openStreamStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
return stream;
});
return runInLock(() -> {
TimerUtil timerUtil = new TimerUtil();
CompletableFuture<Stream> cf = streamManager.openStream(streamId, epoch).
thenApply(metadata -> {
StreamWrapper stream = new StreamWrapper(newStream(metadata));
runInLock(() -> openedStreams.put(streamId, stream));
StreamOperationStats.getInstance().openStreamStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
return stream;
});
openingStreams.put(streamId, cf);
cf.whenComplete((stream, ex) -> runInLock(() -> openingStreams.remove(streamId, cf)));
return cf;
});
}

S3Stream newStream(StreamMetadata metadata) {
return new S3Stream(
metadata.streamId(), metadata.epoch(),
metadata.startOffset(), metadata.endOffset(),
storage, streamManager, networkInboundBucket, networkOutboundBucket);
}

@Override
public void shutdown() {
LOGGER.info("S3StreamClient start shutting down");
markClosed();
// cancel the submitted task if not started; do not interrupt the task if it is running.
if (scheduledCompactionTaskFuture != null) {
scheduledCompactionTaskFuture.cancel(false);
Expand All @@ -139,17 +168,52 @@ public void shutdown() {
}

TimerUtil timerUtil = new TimerUtil();
Map<Long, CompletableFuture<Void>> streamCloseFutures = new ConcurrentHashMap<>();
openedStreams.forEach((streamId, stream) -> streamCloseFutures.put(streamId, stream.close()));
for (; ; ) {
Threads.sleep(1000);
List<Long> closingStreams = streamCloseFutures.entrySet().stream().filter(e -> !e.getValue().isDone()).map(Map.Entry::getKey).collect(Collectors.toList());
LOGGER.info("waiting streams close, closed {} / all {}, closing[{}]", streamCloseFutures.size() - closingStreams.size(), streamCloseFutures.size(), closingStreams);
if (closingStreams.isEmpty()) {
break;
lock.lock();
try {
openedStreams.forEach((streamId, stream) -> {
LOGGER.info("trigger stream force close, streamId={}", streamId);
stream.close();
});
if (openedStreams.isEmpty() && openingStreams.isEmpty() && closingStreams.isEmpty()) {
LOGGER.info("all streams are closed");
break;
}
LOGGER.info("waiting streams close, opened[{}], opening[{}], closing[{}]", openedStreams.keySet(), openingStreams.keySet(), closingStreams.keySet());
} finally {
lock.unlock();
}
Threads.sleep(1000);
}
LOGGER.info("S3StreamClient shutdown, cost {}ms", timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
}

private void checkState() {
if (closed) {
throw new IllegalStateException("S3StreamClient is already closed");
}
}

private void markClosed() {
runInLock(() -> closed = true);
}

private void runInLock(Runnable runnable) {
lock.lock();
try {
runnable.run();
} finally {
lock.unlock();
}
}

private <T> T runInLock(Supplier<T> supplier) {
lock.lock();
try {
return supplier.get();
} finally {
lock.unlock();
}
LOGGER.info("wait streams[{}] closed cost {}ms", streamCloseFutures.keySet(), timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
}

class StreamWrapper implements Stream {
Expand Down Expand Up @@ -218,12 +282,28 @@ public CompletableFuture<Void> trim(long newStartOffset) {

@Override
public CompletableFuture<Void> close() {
return stream.close().whenComplete((v, e) -> openedStreams.remove(streamId(), this));
return runInLock(() -> {
CompletableFuture<Stream> cf = new CompletableFuture<>();
openedStreams.remove(streamId(), this);
closingStreams.put(streamId(), cf);
return stream.close().whenComplete((v, e) -> runInLock(() -> {
cf.complete(StreamWrapper.this);
closingStreams.remove(streamId(), cf);
}));
});
}

@Override
public CompletableFuture<Void> destroy() {
return stream.destroy().whenComplete((v, e) -> openedStreams.remove(streamId(), this));
return runInLock(() -> {
CompletableFuture<Stream> cf = new CompletableFuture<>();
openedStreams.remove(streamId(), this);
closingStreams.put(streamId(), cf);
return stream.destroy().whenComplete((v, e) -> runInLock(() -> {
cf.complete(StreamWrapper.this);
closingStreams.remove(streamId(), cf);
}));
});
}

public boolean isClosed() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright 2024, AutoMQ CO.,LTD.
*
* Use of this software is governed by the Business Source License
* included in the file BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package com.automq.stream.s3;

import com.automq.stream.api.OpenStreamOptions;
import com.automq.stream.s3.metadata.StreamMetadata;
import com.automq.stream.s3.metadata.StreamState;
import com.automq.stream.s3.objects.ObjectManager;
import com.automq.stream.s3.operator.S3Operator;
import com.automq.stream.s3.streams.StreamManager;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@Tag("S3Unit")
public class S3StreamClientTest {
private S3StreamClient client;
private StreamManager streamManager;
private ScheduledExecutorService scheduler;

@BeforeEach
void setup() {
streamManager = mock(StreamManager.class);
client = spy(new S3StreamClient(streamManager, mock(Storage.class), mock(ObjectManager.class), mock(S3Operator.class), new Config()));
scheduler = Executors.newSingleThreadScheduledExecutor();
}

@AfterEach
void cleanup() {
scheduler.shutdown();
}

@Test
public void testShutdown_withOpeningStream() {
S3Stream stream = mock(S3Stream.class);
when(stream.close()).thenReturn(CompletableFuture.completedFuture(null));
when(stream.streamId()).thenReturn(1L);

CompletableFuture<StreamMetadata> cf = new CompletableFuture<>();
when(streamManager.openStream(anyLong(), anyLong())).thenReturn(cf);

doAnswer(args -> stream).when(client).newStream(any());

scheduler.schedule(() -> {
cf.complete(new StreamMetadata(1, 2, 100, 200, StreamState.OPENED));
}, 100, MILLISECONDS);

client.openStream(1, OpenStreamOptions.builder().build());
client.shutdown();

verify(stream, times(1)).close();
assertEquals(0, client.openingStreams.size());
assertEquals(0, client.openedStreams.size());
assertEquals(0, client.closingStreams.size());
}

}
Loading