diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java b/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java
index 49d3692271..b181841ae1 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java
@@ -20,6 +20,7 @@
 import com.automq.stream.api.StreamClient;
 import com.automq.stream.s3.context.AppendContext;
 import com.automq.stream.s3.context.FetchContext;
+import com.automq.stream.s3.metadata.StreamMetadata;
 import com.automq.stream.s3.metrics.TimerUtil;
 import com.automq.stream.s3.metrics.stats.StreamOperationStats;
 import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter;
@@ -39,7 +40,8 @@
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,7 +50,7 @@ public class S3StreamClient implements StreamClient {
     private static final long STREAM_OBJECT_COMPACTION_INTERVAL_MS = TimeUnit.MINUTES.toMillis(1);
     private final ScheduledExecutorService streamObjectCompactionScheduler = Threads.newSingleThreadScheduledExecutor(
         ThreadUtils.createThreadFactory("stream-object-compaction-scheduler", true), LOGGER, true);
-    private final Map<Long, StreamWrapper> openedStreams;
+    final Map<Long, StreamWrapper> openedStreams;
     private final StreamManager streamManager;
     private final Storage storage;
     private final ObjectManager objectManager;
@@ -58,6 +60,13 @@ public class S3StreamClient implements StreamClient {
     private final AsyncNetworkBandwidthLimiter networkOutboundBucket;
     private ScheduledFuture<?> scheduledCompactionTaskFuture;
 
+    private final ReentrantLock lock = new ReentrantLock();
+
+    final Map<Long, CompletableFuture<Stream>> openingStreams = new ConcurrentHashMap<>();
+    final Map<Long, CompletableFuture<Stream>> closingStreams = new ConcurrentHashMap<>();
+
+    private boolean closed;
+
     @SuppressWarnings("unused")
     public S3StreamClient(StreamManager streamManager, Storage storage, ObjectManager objectManager,
         S3Operator s3Operator, Config config) {
@@ -80,21 +89,30 @@ public S3StreamClient(StreamManager streamManager, Storage storage, ObjectManage
 
     @Override
     public CompletableFuture<Stream> createAndOpenStream(CreateStreamOptions options) {
-        TimerUtil timerUtil = new TimerUtil();
-        return FutureUtil.exec(() -> streamManager.createStream().thenCompose(streamId -> {
-            StreamOperationStats.getInstance().createStreamStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
-            return openStream0(streamId, options.epoch());
-        }), LOGGER, "createAndOpenStream");
+        return runInLock(() -> {
+            checkState();
+            TimerUtil timerUtil = new TimerUtil();
+            return FutureUtil.exec(() -> streamManager.createStream().thenCompose(streamId -> {
+                StreamOperationStats.getInstance().createStreamStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+                return openStream0(streamId, options.epoch());
+            }), LOGGER, "createAndOpenStream");
+        });
     }
 
     @Override
     public CompletableFuture<Stream> openStream(long streamId, OpenStreamOptions openStreamOptions) {
-        return FutureUtil.exec(() -> openStream0(streamId, openStreamOptions.epoch()), LOGGER, "openStream");
+        return runInLock(() -> {
+            checkState();
+            return FutureUtil.exec(() -> openStream0(streamId, openStreamOptions.epoch()), LOGGER, "openStream");
+        });
     }
 
     @Override
     public Optional<Stream> getStream(long streamId) {
-        return Optional.ofNullable(openedStreams.get(streamId));
+        return runInLock(() -> {
+            checkState();
+            return Optional.ofNullable(openedStreams.get(streamId));
+        });
     }
 
     /**
@@ -108,21 +126,32 @@ private void startStreamObjectsCompactions() {
     }
 
     private CompletableFuture<Stream> openStream0(long streamId, long epoch) {
-        TimerUtil timerUtil = new TimerUtil();
-        return streamManager.openStream(streamId, epoch).
-            thenApply(metadata -> {
-                StreamWrapper stream = new StreamWrapper(new S3Stream(
-                    metadata.streamId(), metadata.epoch(),
-                    metadata.startOffset(), metadata.endOffset(),
-                    storage, streamManager, networkInboundBucket, networkOutboundBucket));
-                openedStreams.put(streamId, stream);
-                StreamOperationStats.getInstance().openStreamStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
-                return stream;
-            });
+        return runInLock(() -> {
+            TimerUtil timerUtil = new TimerUtil();
+            CompletableFuture<Stream> cf = streamManager.openStream(streamId, epoch).
+                thenApply(metadata -> {
+                    StreamWrapper stream = new StreamWrapper(newStream(metadata));
+                    runInLock(() -> openedStreams.put(streamId, stream));
+                    StreamOperationStats.getInstance().openStreamStats.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
+                    return stream;
+                });
+            openingStreams.put(streamId, cf);
+            cf.whenComplete((stream, ex) -> runInLock(() -> openingStreams.remove(streamId, cf)));
+            return cf;
+        });
+    }
+
+    S3Stream newStream(StreamMetadata metadata) {
+        return new S3Stream(
+            metadata.streamId(), metadata.epoch(),
+            metadata.startOffset(), metadata.endOffset(),
+            storage, streamManager, networkInboundBucket, networkOutboundBucket);
     }
 
     @Override
     public void shutdown() {
+        LOGGER.info("S3StreamClient start shutting down");
+        markClosed();
         // cancel the submitted task if not started; do not interrupt the task if it is running.
         if (scheduledCompactionTaskFuture != null) {
             scheduledCompactionTaskFuture.cancel(false);
@@ -139,17 +168,52 @@ public void shutdown() {
         }
 
         TimerUtil timerUtil = new TimerUtil();
-        Map<Long, CompletableFuture<Void>> streamCloseFutures = new ConcurrentHashMap<>();
-        openedStreams.forEach((streamId, stream) -> streamCloseFutures.put(streamId, stream.close()));
         for (; ; ) {
-            Threads.sleep(1000);
-            List<Long> closingStreams = streamCloseFutures.entrySet().stream().filter(e -> !e.getValue().isDone()).map(Map.Entry::getKey).collect(Collectors.toList());
-            LOGGER.info("waiting streams close, closed {} / all {}, closing[{}]", streamCloseFutures.size() - closingStreams.size(), streamCloseFutures.size(), closingStreams);
-            if (closingStreams.isEmpty()) {
-                break;
+            lock.lock();
+            try {
+                openedStreams.forEach((streamId, stream) -> {
+                    LOGGER.info("trigger stream force close, streamId={}", streamId);
+                    stream.close();
+                });
+                if (openedStreams.isEmpty() && openingStreams.isEmpty() && closingStreams.isEmpty()) {
+                    LOGGER.info("all streams are closed");
+                    break;
+                }
+                LOGGER.info("waiting streams close, opened[{}], opening[{}], closing[{}]", openedStreams.keySet(), openingStreams.keySet(), closingStreams.keySet());
+            } finally {
+                lock.unlock();
             }
+            Threads.sleep(1000);
+        }
+        LOGGER.info("S3StreamClient shutdown, cost {}ms", timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
+    }
+
+    private void checkState() {
+        if (closed) {
+            throw new IllegalStateException("S3StreamClient is already closed");
+        }
+    }
+
+    private void markClosed() {
+        runInLock(() -> closed = true);
+    }
+
+    private void runInLock(Runnable runnable) {
+        lock.lock();
+        try {
+            runnable.run();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private <T> T runInLock(Supplier<T> supplier) {
+        lock.lock();
+        try {
+            return supplier.get();
+        } finally {
+            lock.unlock();
         }
-        LOGGER.info("wait streams[{}] closed cost {}ms", streamCloseFutures.keySet(), timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
     }
 
     class StreamWrapper implements Stream {
@@ -218,12 +282,28 @@ public CompletableFuture<Void> trim(long newStartOffset) {
 
         @Override
         public CompletableFuture<Void> close() {
-            return stream.close().whenComplete((v, e) -> openedStreams.remove(streamId(), this));
+            return runInLock(() -> {
+                CompletableFuture<Stream> cf = new CompletableFuture<>();
+                openedStreams.remove(streamId(), this);
+                closingStreams.put(streamId(), cf);
+                return stream.close().whenComplete((v, e) -> runInLock(() -> {
+                    cf.complete(StreamWrapper.this);
+                    closingStreams.remove(streamId(), cf);
+                }));
+            });
         }
 
         @Override
         public CompletableFuture<Void> destroy() {
-            return stream.destroy().whenComplete((v, e) -> openedStreams.remove(streamId(), this));
+            return runInLock(() -> {
+                CompletableFuture<Stream> cf = new CompletableFuture<>();
+                openedStreams.remove(streamId(), this);
+                closingStreams.put(streamId(), cf);
+                return stream.destroy().whenComplete((v, e) -> runInLock(() -> {
+                    cf.complete(StreamWrapper.this);
+                    closingStreams.remove(streamId(), cf);
+                }));
+            });
         }
 
         public boolean isClosed() {
diff --git a/s3stream/src/test/java/com/automq/stream/s3/S3StreamClientTest.java b/s3stream/src/test/java/com/automq/stream/s3/S3StreamClientTest.java
new file mode 100644
index 0000000000..52f65eb3fa
--- /dev/null
+++ b/s3stream/src/test/java/com/automq/stream/s3/S3StreamClientTest.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2024, AutoMQ CO.,LTD.
+ *
+ * Use of this software is governed by the Business Source License
+ * included in the file BSL.md
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0
+ */
+
+package com.automq.stream.s3;
+
+import com.automq.stream.api.OpenStreamOptions;
+import com.automq.stream.s3.metadata.StreamMetadata;
+import com.automq.stream.s3.metadata.StreamState;
+import com.automq.stream.s3.objects.ObjectManager;
+import com.automq.stream.s3.operator.S3Operator;
+import com.automq.stream.s3.streams.StreamManager;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@Tag("S3Unit")
+public class S3StreamClientTest {
+    private S3StreamClient client;
+    private StreamManager streamManager;
+    private ScheduledExecutorService scheduler;
+
+    @BeforeEach
+    void setup() {
+        streamManager = mock(StreamManager.class);
+        client = spy(new S3StreamClient(streamManager, mock(Storage.class), mock(ObjectManager.class), mock(S3Operator.class), new Config()));
+        scheduler = Executors.newSingleThreadScheduledExecutor();
+    }
+
+    @AfterEach
+    void cleanup() {
+        scheduler.shutdown();
+    }
+
+    @Test
+    public void testShutdown_withOpeningStream() {
+        S3Stream stream = mock(S3Stream.class);
+        when(stream.close()).thenReturn(CompletableFuture.completedFuture(null));
+        when(stream.streamId()).thenReturn(1L);
+
+        CompletableFuture<StreamMetadata> cf = new CompletableFuture<>();
+        when(streamManager.openStream(anyLong(), anyLong())).thenReturn(cf);
+
+        doAnswer(args -> stream).when(client).newStream(any());
+
+        scheduler.schedule(() -> {
+            cf.complete(new StreamMetadata(1, 2, 100, 200, StreamState.OPENED));
+        }, 100, MILLISECONDS);
+
+        client.openStream(1, OpenStreamOptions.builder().build());
+        client.shutdown();
+
+        verify(stream, times(1)).close();
+        assertEquals(0, client.openingStreams.size());
+        assertEquals(0, client.openedStreams.size());
+        assertEquals(0, client.closingStreams.size());
+    }
+
+}