diff --git a/servicetalk-client-api-internal/src/main/java/io/servicetalk/client/api/internal/DefaultPartitionedClientGroup.java b/servicetalk-client-api-internal/src/main/java/io/servicetalk/client/api/internal/DefaultPartitionedClientGroup.java index 0922f240c1..831aa79c0f 100644 --- a/servicetalk-client-api-internal/src/main/java/io/servicetalk/client/api/internal/DefaultPartitionedClientGroup.java +++ b/servicetalk-client-api-internal/src/main/java/io/servicetalk/client/api/internal/DefaultPartitionedClientGroup.java @@ -131,6 +131,11 @@ public Completable onClose() { return partitionMap.onClose(); } + @Override + public Completable onClosing() { + return partitionMap.onClosing(); + } + @Override public Completable closeAsync() { // Cancel doesn't provide any status and is assumed to complete immediately so we just cancel when subscribe @@ -209,11 +214,21 @@ public Completable onClose() { return close.onClose(); } + @Override + public Completable onClosing() { + return close.onClosing(); + } + @Override public Completable closeAsync() { return close.closeAsync(); } + @Override + public Completable closeAsyncGracefully() { + return close.closeAsyncGracefully(); + } + static final class MutableInt { int value; } diff --git a/servicetalk-client-api-internal/src/main/java/io/servicetalk/client/api/internal/partition/PowerSetPartitionMap.java b/servicetalk-client-api-internal/src/main/java/io/servicetalk/client/api/internal/partition/PowerSetPartitionMap.java index 3c84267c8d..c0f443b25d 100644 --- a/servicetalk-client-api-internal/src/main/java/io/servicetalk/client/api/internal/partition/PowerSetPartitionMap.java +++ b/servicetalk-client-api-internal/src/main/java/io/servicetalk/client/api/internal/partition/PowerSetPartitionMap.java @@ -206,6 +206,11 @@ public Completable onClose() { return asyncCloseable.onClose(); } + @Override + public Completable onClosing() { + return asyncCloseable.onClosing(); + } + @Override public Completable closeAsync() { return asyncCloseable.closeAsync(); diff --git a/servicetalk-client-api/src/main/java/io/servicetalk/client/api/DefaultClientGroup.java b/servicetalk-client-api/src/main/java/io/servicetalk/client/api/DefaultClientGroup.java index ffdbbdf366..2ca653bf1f 100644 --- a/servicetalk-client-api/src/main/java/io/servicetalk/client/api/DefaultClientGroup.java +++ b/servicetalk-client-api/src/main/java/io/servicetalk/client/api/DefaultClientGroup.java @@ -175,6 +175,11 @@ public Completable onClose() { return asyncCloseable.onClose(); } + @Override + public Completable onClosing() { + return asyncCloseable.onClosing(); + } + @Override public Completable closeAsync() { return asyncCloseable.closeAsync(); diff --git a/servicetalk-client-api/src/main/java/io/servicetalk/client/api/DelegatingConnectionFactory.java b/servicetalk-client-api/src/main/java/io/servicetalk/client/api/DelegatingConnectionFactory.java index 89559d44ba..7bb7d90768 100644 --- a/servicetalk-client-api/src/main/java/io/servicetalk/client/api/DelegatingConnectionFactory.java +++ b/servicetalk-client-api/src/main/java/io/servicetalk/client/api/DelegatingConnectionFactory.java @@ -68,6 +68,11 @@ public Completable onClose() { return delegate.onClose(); } + @Override + public Completable onClosing() { + return delegate.onClosing(); + } + @Override public Completable closeAsync() { return delegate.closeAsync(); diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/AbstractExecutor.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/AbstractExecutor.java index 5611da4c81..f009bd4d8c 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/AbstractExecutor.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/AbstractExecutor.java @@ -15,44 +15,39 @@ */ package io.servicetalk.concurrent.api; -import io.servicetalk.concurrent.CompletableSource; - -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -import javax.annotation.Nullable; - -import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater; +import static io.servicetalk.concurrent.api.AsyncCloseables.toAsyncCloseable; +import static io.servicetalk.concurrent.api.AsyncCloseables.toListenableAsyncCloseable; +import static io.servicetalk.concurrent.api.Completable.completed; +import static io.servicetalk.concurrent.api.Completable.defer; abstract class AbstractExecutor implements Executor { - - private static final AtomicReferenceFieldUpdater - onCloseUpdater = newUpdater(AbstractExecutor.class, CompletableProcessor.class, "onClose"); - - @SuppressWarnings("unused") - @Nullable - private volatile CompletableProcessor onClose; + private final ListenableAsyncCloseable listenableAsyncCloseable = toListenableAsyncCloseable( + toAsyncCloseable(graceful -> defer(() -> { + // If closeAsync() is subscribed multiple times, we will call this method as many times. + // Since doClose() is idempotent and usually cheap, it is OK as compared to implementing at most + // once semantics. + doClose(); + return completed().shareContextOnSubscribe(); + }))); @Override public Completable onClose() { - return getOrCreateOnClose(); + return listenableAsyncCloseable.onClose(); + } + + @Override + public Completable onClosing() { + return listenableAsyncCloseable.onClosing(); } @Override public Completable closeAsync() { - return new CloseAsync(); + return listenableAsyncCloseable.closeAsync(); } - private CompletableProcessor getOrCreateOnClose() { - CompletableProcessor onClose = this.onClose; - if (onClose != null) { - return onClose; - } - final CompletableProcessor newOnClose = new CompletableProcessor(); - if (onCloseUpdater.compareAndSet(this, null, newOnClose)) { - return newOnClose; - } - onClose = this.onClose; - assert onClose != null; - return onClose; + @Override + public Completable closeAsyncGracefully() { + return listenableAsyncCloseable.closeAsyncGracefully(); } /** @@ -60,27 +55,4 @@ private CompletableProcessor getOrCreateOnClose() { * This method MUST be idempotent. */ abstract void doClose(); - - private final class CloseAsync extends Completable implements CompletableSource { - @Override - protected void handleSubscribe(Subscriber subscriber) { - CompletableProcessor onClose = getOrCreateOnClose(); - onClose.subscribeInternal(subscriber); - try { - // If closeAsync() is subscribed multiple times, we will call this method as many times. - // Since doClose() is idempotent and usually cheap, it is OK as compared to implementing at most - // once semantics. - doClose(); - } catch (Throwable cause) { - onClose.onError(cause); - return; - } - onClose.onComplete(); - } - - @Override - public void subscribe(final Subscriber subscriber) { - subscribeInternal(subscriber); - } - } } diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/AsyncCloseables.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/AsyncCloseables.java index 4000862fce..d09ad423eb 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/AsyncCloseables.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/AsyncCloseables.java @@ -107,6 +107,7 @@ public static ListenableAsyncCloseable toListenableAsyncCloseable( return new ListenableAsyncCloseable() { private final CompletableProcessor onCloseProcessor = new CompletableProcessor(); + private final CompletableProcessor onClosing = new CompletableProcessor(); private final Completable onClose = onCloseDecorator.apply(onCloseProcessor); @Override @@ -114,6 +115,7 @@ public Completable closeAsyncGracefully() { return new SubscribableCompletable() { @Override protected void handleSubscribe(final Subscriber subscriber) { + onClosing.onComplete(); asyncCloseable.closeAsyncGracefully().subscribeInternal(onCloseProcessor); onClose.subscribeInternal(subscriber); } @@ -125,11 +127,17 @@ public Completable onClose() { return onClose; } + @Override + public Completable onClosing() { + return onClosing; + } + @Override public Completable closeAsync() { return new SubscribableCompletable() { @Override protected void handleSubscribe(final Subscriber subscriber) { + onClosing.onComplete(); asyncCloseable.closeAsync().subscribeInternal(onCloseProcessor); onClose.subscribeInternal(subscriber); } @@ -184,6 +192,7 @@ private static final class DefaultAsyncCloseable implements ListenableAsyncClose private static final AtomicIntegerFieldUpdater closedUpdater = newUpdater(DefaultAsyncCloseable.class, "closed"); private final CloseableResource closeableResource; + private final CompletableProcessor onClosing = new CompletableProcessor(); private final CompletableProcessor onClose = new CompletableProcessor(); @SuppressWarnings("unused") @@ -200,6 +209,7 @@ public Completable closeAsync() { protected void handleSubscribe(final Subscriber subscriber) { onClose.subscribeInternal(subscriber); if (closedUpdater.getAndSet(DefaultAsyncCloseable.this, HARD_CLOSE) != HARD_CLOSE) { + onClosing.onComplete(); closeableResource.doClose(false).subscribeInternal(onClose); } } @@ -213,6 +223,7 @@ public Completable closeAsyncGracefully() { protected void handleSubscribe(final Subscriber subscriber) { onClose.subscribeInternal(subscriber); if (closedUpdater.compareAndSet(DefaultAsyncCloseable.this, IDLE, CLOSED_GRACEFULLY)) { + onClosing.onComplete(); closeableResource.doClose(true).subscribeInternal(onClose); } } @@ -223,6 +234,11 @@ protected void handleSubscribe(final Subscriber subscriber) { public Completable onClose() { return onClose; } + + @Override + public Completable onClosing() { + return onClosing; + } } private abstract static class SubscribableCompletable extends Completable implements CompletableSource { diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/ContextPreservingStExecutor.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/ContextPreservingStExecutor.java index c73116f4d0..363b914b4e 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/ContextPreservingStExecutor.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/ContextPreservingStExecutor.java @@ -17,6 +17,7 @@ import io.servicetalk.concurrent.Cancellable; +import java.time.Duration; import java.util.concurrent.TimeUnit; import static java.util.Objects.requireNonNull; @@ -43,6 +44,11 @@ public Cancellable schedule(final Runnable task, final long delay, final TimeUni return delegate.schedule(new ContextPreservingRunnable(task), delay, unit); } + @Override + public Cancellable schedule(final Runnable task, final Duration delay) { + return delegate.schedule(new ContextPreservingRunnable(task), delay); + } + @Override public long currentTime(TimeUnit unit) { return delegate.currentTime(unit); @@ -53,13 +59,25 @@ public Completable onClose() { return delegate.onClose(); } + @Override + public Completable onClosing() { + return delegate.onClosing(); + } + @Override public Completable closeAsync() { return delegate.closeAsync(); } + @Override + public Completable closeAsyncGracefully() { + return delegate.closeAsyncGracefully(); + } + static Executor of(Executor delegate) { return delegate instanceof ContextPreservingStExecutor ? delegate : new ContextPreservingStExecutor(delegate); } + + // Don't override methods that return an async source. Context will be captured at subscribe by the async source. } diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/DelegatingExecutor.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/DelegatingExecutor.java index 24dadc045e..e489a0560a 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/DelegatingExecutor.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/DelegatingExecutor.java @@ -111,6 +111,11 @@ public Completable onClose() { return delegate.onClose(); } + @Override + public Completable onClosing() { + return delegate.onClosing(); + } + @Override public Completable closeAsync() { return delegate.closeAsync(); diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/ListenableAsyncCloseable.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/ListenableAsyncCloseable.java index 5424bddae7..06acf707c0 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/ListenableAsyncCloseable.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/ListenableAsyncCloseable.java @@ -25,4 +25,21 @@ public interface ListenableAsyncCloseable extends AsyncCloseable { * @return the {@code Completable} that is notified on close. */ Completable onClose(); + + /** + * Returns a {@link Completable} that is notified when closing begins. + *

+ * Closing begin might be when a close operation is initiated locally (e.g. subscribing to {@link #closeAsync()}) or + * it could also be a transport event received from a remote peer (e.g. read a {@code connection: close} header). + *

+ * For backwards compatibility this method maybe functionally equivalent to {@link #onClose()}. Therefore, provides + * a best-effort leading edge notification of closing, but may fall back to notification on trailing edge. + *

+ * The goal of this method is often to notify asap when closing so this method may not be offloaded and care must + * be taken to avoid blocking if subscribing to the return {@link Completable}. + * @return a {@link Completable} that is notified when closing begins. + */ + default Completable onClosing() { + return onClose(); + } } diff --git a/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/DefaultDnsClient.java b/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/DefaultDnsClient.java index 7d92a40a17..9dd62c91e3 100644 --- a/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/DefaultDnsClient.java +++ b/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/DefaultDnsClient.java @@ -272,6 +272,11 @@ public Completable onClose() { return asyncCloseable.onClose(); } + @Override + public Completable onClosing() { + return asyncCloseable.onClosing(); + } + @Override public Completable closeAsync() { return asyncCloseable.closeAsync(); diff --git a/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/DnsClientFilter.java b/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/DnsClientFilter.java index 46958b961f..2dd7b81968 100644 --- a/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/DnsClientFilter.java +++ b/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/DnsClientFilter.java @@ -55,6 +55,11 @@ public Completable onClose() { return client.onClose(); } + @Override + public Completable onClosing() { + return client.onClosing(); + } + @Override public Completable closeAsync() { return client.closeAsync(); diff --git a/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/DnsClients.java b/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/DnsClients.java index 3c483e8527..138c9ab177 100644 --- a/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/DnsClients.java +++ b/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/DnsClients.java @@ -64,6 +64,11 @@ public Completable onClose() { return dns.onClose(); } + @Override + public Completable onClosing() { + return dns.onClosing(); + } + @Override public Publisher>> discover(final String s) { return dns.dnsSrvQuery(s); @@ -103,6 +108,11 @@ public Completable onClose() { return dns.onClose(); } + @Override + public Completable onClosing() { + return dns.onClosing(); + } + @Override public Publisher>> discover( final HostAndPort hostAndPort) { diff --git a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/DefaultGrpcClientCallFactory.java b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/DefaultGrpcClientCallFactory.java index e6deda2d48..5a9675c3f5 100644 --- a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/DefaultGrpcClientCallFactory.java +++ b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/DefaultGrpcClientCallFactory.java @@ -374,6 +374,11 @@ public Completable onClose() { return streamingHttpClient.onClose(); } + @Override + public Completable onClosing() { + return streamingHttpClient.onClosing(); + } + /** * Determines the timeout for a new request using three potential sources; the deadline in the async context, the * request timeout, and the client default. The timeout will be the lesser of the context and request timeouts or if diff --git a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/DefaultGrpcServiceContext.java b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/DefaultGrpcServiceContext.java index 9675cc64f3..aa7365bcc1 100644 --- a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/DefaultGrpcServiceContext.java +++ b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/DefaultGrpcServiceContext.java @@ -102,6 +102,11 @@ public ConnectionContext parent() { return connectionContext.parent(); } + @Override + public Completable onClosing() { + return connectionContext.onClosing(); + } + @Override public Completable onClose() { return connectionContext.onClose(); diff --git a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcRouter.java b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcRouter.java index 91c58de0a3..cb3b27229c 100644 --- a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcRouter.java +++ b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcRouter.java @@ -233,6 +233,11 @@ public Completable onClose() { return delegate.onClose(); } + @Override + public Completable onClosing() { + return delegate.onClosing(); + } + @Override public SocketAddress listenAddress() { return delegate.listenAddress(); diff --git a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/customtransport/ClientTransportGrpcCallFactory.java b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/customtransport/ClientTransportGrpcCallFactory.java index 0028b9868e..b8d39ecee2 100644 --- a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/customtransport/ClientTransportGrpcCallFactory.java +++ b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/customtransport/ClientTransportGrpcCallFactory.java @@ -135,6 +135,11 @@ public Completable onClose() { return closeable.onClose(); } + @Override + public Completable onClosing() { + return closeable.onClosing(); + } + @Override public Completable closeAsync() { return closeable.closeAsync(); diff --git a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/customtransport/Utils.java b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/customtransport/Utils.java index b16583fd6c..191a8eb1dd 100644 --- a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/customtransport/Utils.java +++ b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/customtransport/Utils.java @@ -154,6 +154,11 @@ public Completable onClose() { return closeAsync.onClose(); } + @Override + public Completable onClosing() { + return closeAsync.onClosing(); + } + @Deprecated @Override public String path() { diff --git a/servicetalk-grpc-protoc/src/main/java/io/servicetalk/grpc/protoc/Generator.java b/servicetalk-grpc-protoc/src/main/java/io/servicetalk/grpc/protoc/Generator.java index a4ceb3101a..2559634d54 100644 --- a/servicetalk-grpc-protoc/src/main/java/io/servicetalk/grpc/protoc/Generator.java +++ b/servicetalk-grpc-protoc/src/main/java/io/servicetalk/grpc/protoc/Generator.java @@ -153,6 +153,7 @@ import static io.servicetalk.grpc.protoc.Words.methodDescriptor; import static io.servicetalk.grpc.protoc.Words.methodDescriptors; import static io.servicetalk.grpc.protoc.Words.onClose; +import static io.servicetalk.grpc.protoc.Words.onClosing; import static io.servicetalk.grpc.protoc.Words.registerRoutes; import static io.servicetalk.grpc.protoc.Words.request; import static io.servicetalk.grpc.protoc.Words.requestEncoding; @@ -1274,6 +1275,7 @@ private TypeSpec newDefaultClientClassSpec(final State state, final ClassName de .build()) .addMethod(newDelegatingMethodSpec(executionContext, factory, GrpcExecutionContext, null)) .addMethod(newDelegatingCompletableMethodSpec(onClose, factory)) + .addMethod(newDelegatingCompletableMethodSpec(onClosing, factory)) .addMethod(newDelegatingCompletableMethodSpec(closeAsync, factory)) .addMethod(newDelegatingCompletableMethodSpec(closeAsyncGracefully, factory)) .addMethod(newDelegatingCompletableToBlockingMethodSpec(close, closeAsync, factory)) diff --git a/servicetalk-grpc-protoc/src/main/java/io/servicetalk/grpc/protoc/Words.java b/servicetalk-grpc-protoc/src/main/java/io/servicetalk/grpc/protoc/Words.java index a97a4f7d58..6b3ae5e059 100644 --- a/servicetalk-grpc-protoc/src/main/java/io/servicetalk/grpc/protoc/Words.java +++ b/servicetalk-grpc-protoc/src/main/java/io/servicetalk/grpc/protoc/Words.java @@ -30,6 +30,7 @@ final class Words { static final String ctx = "ctx"; static final String factory = "factory"; static final String onClose = "onClose"; + static final String onClosing = "onClosing"; static final String metadata = "metadata"; static final String request = "request"; static final String response = "response"; diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DelegatingHttpServiceContext.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DelegatingHttpServiceContext.java index 3b54440359..fe50d1cf70 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DelegatingHttpServiceContext.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DelegatingHttpServiceContext.java @@ -105,6 +105,11 @@ public Completable onClose() { return delegate.onClose(); } + @Override + public Completable onClosing() { + return delegate.onClosing(); + } + @Override public Completable closeAsync() { return delegate.closeAsync(); diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpLoadBalancerFactory.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpLoadBalancerFactory.java index 452acecac9..222fa08a33 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpLoadBalancerFactory.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/HttpLoadBalancerFactory.java @@ -178,6 +178,11 @@ public Completable onClose() { return delegate.onClose(); } + @Override + public Completable onClosing() { + return delegate.onClosing(); + } + @Override public Completable closeAsync() { return delegate.closeAsync(); diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpClientFilter.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpClientFilter.java index 5fb1a0a12e..3ec859c7f5 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpClientFilter.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpClientFilter.java @@ -63,6 +63,11 @@ public Completable onClose() { return delegate.onClose(); } + @Override + public Completable onClosing() { + return delegate.onClosing(); + } + @Override public Completable closeAsync() { return delegate.closeAsync(); diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpClientToHttpClient.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpClientToHttpClient.java index 790e0d019f..5a9d73fc28 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpClientToHttpClient.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpClientToHttpClient.java @@ -89,6 +89,11 @@ public Completable onClose() { return client.onClose(); } + @Override + public Completable onClosing() { + return client.onClosing(); + } + @Override public Completable closeAsync() { return client.closeAsync(); @@ -184,6 +189,11 @@ public Completable onClose() { return connection.onClose(); } + @Override + public Completable onClosing() { + return connection.onClosing(); + } + @Override public Completable closeAsync() { return connection.closeAsync(); diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpConnectionFilter.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpConnectionFilter.java index 8914158165..921e973af0 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpConnectionFilter.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpConnectionFilter.java @@ -68,6 +68,11 @@ public Completable onClose() { return delegate.onClose(); } + @Override + public Completable onClosing() { + return delegate.onClosing(); + } + @Override public Completable closeAsync() { return delegate.closeAsync(); diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpConnectionToHttpConnection.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpConnectionToHttpConnection.java index 4c80a8d035..e834eb1274 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpConnectionToHttpConnection.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/StreamingHttpConnectionToHttpConnection.java @@ -101,6 +101,11 @@ public Completable onClose() { return connection.onClose(); } + @Override + public Completable onClosing() { + return connection.onClosing(); + } + @Override public Completable closeAsync() { return connection.closeAsync(); diff --git a/servicetalk-http-api/src/testFixtures/java/io/servicetalk/http/api/AbstractHttpRequesterFilterTest.java b/servicetalk-http-api/src/testFixtures/java/io/servicetalk/http/api/AbstractHttpRequesterFilterTest.java index c2f505f49e..526f5ba6a5 100644 --- a/servicetalk-http-api/src/testFixtures/java/io/servicetalk/http/api/AbstractHttpRequesterFilterTest.java +++ b/servicetalk-http-api/src/testFixtures/java/io/servicetalk/http/api/AbstractHttpRequesterFilterTest.java @@ -292,6 +292,11 @@ public Completable onClose() { return connection.onClose(); } + @Override + public Completable onClosing() { + return connection.onClosing(); + } + @Override public HttpExecutionContext executionContext() { return connection.executionContext(); diff --git a/servicetalk-http-api/src/testFixtures/java/io/servicetalk/http/api/TestHttpServiceContext.java b/servicetalk-http-api/src/testFixtures/java/io/servicetalk/http/api/TestHttpServiceContext.java index 0586078f92..7e24a88fd1 100644 --- a/servicetalk-http-api/src/testFixtures/java/io/servicetalk/http/api/TestHttpServiceContext.java +++ b/servicetalk-http-api/src/testFixtures/java/io/servicetalk/http/api/TestHttpServiceContext.java @@ -107,6 +107,11 @@ public ConnectionContext parent() { return null; } + @Override + public Completable onClosing() { + return completed(); + } + @Override public Completable onClose() { return completed(); diff --git a/servicetalk-http-api/src/testFixtures/java/io/servicetalk/http/api/TestStreamingHttpClient.java b/servicetalk-http-api/src/testFixtures/java/io/servicetalk/http/api/TestStreamingHttpClient.java index f371d8e019..14311a8fff 100644 --- a/servicetalk-http-api/src/testFixtures/java/io/servicetalk/http/api/TestStreamingHttpClient.java +++ b/servicetalk-http-api/src/testFixtures/java/io/servicetalk/http/api/TestStreamingHttpClient.java @@ -79,6 +79,11 @@ public Completable onClose() { return closeable.onClose(); } + @Override + public Completable onClosing() { + return closeable.onClosing(); + } + @Override public StreamingHttpRequest newRequest(final HttpRequestMethod method, final String requestTarget) { return reqRespFactory.newRequest(method, requestTarget); @@ -144,6 +149,11 @@ public Completable onClose() { return rc.onClose(); } + @Override + public Completable onClosing() { + return rc.onClosing(); + } + @Override public Completable closeAsync() { return rc.closeAsync(); @@ -182,6 +192,11 @@ public Completable onClose() { return filterChain.onClose(); } + @Override + public Completable onClosing() { + return filterChain.onClosing(); + } + @Override public HttpExecutionContext executionContext() { return filterChain.executionContext(); diff --git a/servicetalk-http-api/src/testFixtures/java/io/servicetalk/http/api/TestStreamingHttpConnection.java b/servicetalk-http-api/src/testFixtures/java/io/servicetalk/http/api/TestStreamingHttpConnection.java index 6af78a1bec..8f15d4d962 100644 --- a/servicetalk-http-api/src/testFixtures/java/io/servicetalk/http/api/TestStreamingHttpConnection.java +++ b/servicetalk-http-api/src/testFixtures/java/io/servicetalk/http/api/TestStreamingHttpConnection.java @@ -56,6 +56,11 @@ public Completable onClose() { return closeable.onClose(); } + @Override + public Completable onClosing() { + return closeable.onClosing(); + } + @Override public Single request(final StreamingHttpRequest request) { return failed(new UnsupportedOperationException()); @@ -111,6 +116,11 @@ public Completable onClose() { return filterChain.onClose(); } + @Override + public Completable onClosing() { + return filterChain.onClosing(); + } + @Override public Single request(final StreamingHttpRequest request) { return filterChain.request(request); diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/AbstractLBHttpConnectionFactory.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/AbstractLBHttpConnectionFactory.java index 79c0a3e10d..2beb99994b 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/AbstractLBHttpConnectionFactory.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/AbstractLBHttpConnectionFactory.java @@ -31,16 +31,11 @@ import io.servicetalk.http.api.StreamingHttpConnectionFilterFactory; import io.servicetalk.http.api.StreamingHttpRequestResponseFactory; import io.servicetalk.transport.api.ConnectExecutionStrategy; -import io.servicetalk.transport.api.ConnectionContext; import io.servicetalk.transport.api.ExecutionStrategy; import io.servicetalk.transport.api.IoThreadFactory; import io.servicetalk.transport.api.TransportObserver; -import io.servicetalk.transport.netty.internal.NettyConnectionContext; import io.servicetalk.transport.netty.internal.NoopTransportObserver; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.function.Function; import javax.annotation.Nullable; @@ -51,11 +46,6 @@ abstract class AbstractLBHttpConnectionFactory implements ConnectionFactory { - - private static final Logger LOGGER = LoggerFactory.getLogger(AbstractLBHttpConnectionFactory.class); - - private static boolean onClosingWarningLogged; - @Nullable private final StreamingHttpConnectionFilterFactory connectionFilterFunction; final ReadOnlyHttpClientConfig config; @@ -101,6 +91,11 @@ public Completable onClose() { return close.onClose(); } + @Override + public Completable onClosing() { + return close.onClosing(); + } + @Override public Completable closeAsync() { return close.closeAsync(); @@ -124,32 +119,10 @@ public final Single newConnection connectionFilterFunction != null ? connectionFilterFunction.create(conn) : conn; return protocolBinding.bind(filteredConnection, newConcurrencyController(filteredConnection.transportEventStream(MAX_CONCURRENCY), - onClosing(filteredConnection)), context); + filteredConnection.onClosing()), context); }); } - /** - * Extract {@link Completable} that notifies when connection is preparing to close. This helps to receive an earlier - * notification for the concurrency controller. - * - * @param connection {@link FilterableStreamingHttpConnection} - * @return {@link Completable} that notifies when connection is preparing to close - */ - private static Completable onClosing(final FilterableStreamingHttpConnection connection) { - ConnectionContext ctx = connection.connectionContext(); - if (ctx instanceof NettyConnectionContext) { - return ((NettyConnectionContext) ctx).onClosing(); - } - if (!onClosingWarningLogged) { - onClosingWarningLogged = true; - LOGGER.warn("{} connection was wrapped in the way concurrency controller can not access the early " + - "onClosing() event. Fallback to onClose(), this may cause a race between closing a connection " + - "and selecting it for the next request. Reconsider how connection filters do wrapping of " + - "FilterableStreamingHttpConnection#connectionContext() and/or contact support.", ctx); - } - return connection.onClose(); - } - /** * The ultimate source of connections before filtering. * @@ -168,6 +141,11 @@ public final Completable onClose() { return filterableConnectionFactory.onClose(); } + @Override + public final Completable onClosing() { + return filterableConnectionFactory.onClosing(); + } + @Override public final Completable closeAsync() { return filterableConnectionFactory.closeAsync(); diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/AbstractStreamingHttpConnection.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/AbstractStreamingHttpConnection.java index 4e328d5e24..06d8dd3190 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/AbstractStreamingHttpConnection.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/AbstractStreamingHttpConnection.java @@ -256,6 +256,11 @@ public final Completable onClose() { return connectionContext.onClose(); } + @Override + public final Completable onClosing() { + return connectionContext.onClosing(); + } + @Override public final Completable closeAsync() { return connectionContext.closeAsync(); diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/ConditionalHttpClientFilter.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/ConditionalHttpClientFilter.java index a1e7f1cf3c..4cdf567fca 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/ConditionalHttpClientFilter.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/ConditionalHttpClientFilter.java @@ -79,4 +79,9 @@ public Completable closeAsyncGracefully() { public Completable onClose() { return closeable.onClose(); } + + @Override + public Completable onClosing() { + return closeable.onClosing(); + } } diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/ConditionalHttpConnectionFilter.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/ConditionalHttpConnectionFilter.java index c0a2d4b0e1..a36acc7fb2 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/ConditionalHttpConnectionFilter.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/ConditionalHttpConnectionFilter.java @@ -77,4 +77,9 @@ public Completable closeAsyncGracefully() { public Completable onClose() { return closeable.onClose(); } + + @Override + public Completable onClosing() { + return closeable.onClosing(); + } } diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultHttpLoadBalancerFactory.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultHttpLoadBalancerFactory.java index d8d0e10feb..93e4795f45 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultHttpLoadBalancerFactory.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultHttpLoadBalancerFactory.java @@ -179,6 +179,11 @@ public Completable onClose() { return delegate.onClose(); } + @Override + public Completable onClosing() { + return delegate.onClosing(); + } + @Override public Completable closeAsync() { return delegate.closeAsync(); diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultMultiAddressUrlHttpClientBuilder.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultMultiAddressUrlHttpClientBuilder.java index 606e7be654..4ad2a1b5ae 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultMultiAddressUrlHttpClientBuilder.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultMultiAddressUrlHttpClientBuilder.java @@ -382,6 +382,11 @@ public Completable onClose() { return closeable.onClose(); } + @Override + public Completable onClosing() { + return closeable.onClosing(); + } + @Override public Completable closeAsync() { return closeable.closeAsync(); diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultNettyHttpConnectionContext.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultNettyHttpConnectionContext.java index 442bc9d49f..b007fcfb41 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultNettyHttpConnectionContext.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultNettyHttpConnectionContext.java @@ -16,7 +16,6 @@ package io.servicetalk.http.netty; import io.servicetalk.concurrent.Cancellable; -import io.servicetalk.concurrent.api.Completable; import io.servicetalk.concurrent.api.Single; import io.servicetalk.http.api.HttpConnectionContext; import io.servicetalk.http.api.HttpExecutionContext; @@ -76,11 +75,6 @@ public Single transportError() { return nettyConnectionContext.transportError(); } - @Override - public Completable onClosing() { - return nettyConnectionContext.onClosing(); - } - @Override public Channel nettyChannel() { return nettyConnectionContext.nettyChannel(); diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultPartitionedHttpClientBuilder.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultPartitionedHttpClientBuilder.java index 0150bc8dce..52c46f6227 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultPartitionedHttpClientBuilder.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultPartitionedHttpClientBuilder.java @@ -195,6 +195,11 @@ public Completable onClose() { return group.onClose(); } + @Override + public Completable onClosing() { + return group.onClosing(); + } + @Override public Completable closeAsync() { return group.closeAsync(); @@ -239,6 +244,11 @@ public Completable onClose() { return close.onClose(); } + @Override + public Completable onClosing() { + return close.onClosing(); + } + @Override public Completable closeAsync() { return close.closeAsync(); diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultSingleAddressHttpClientBuilder.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultSingleAddressHttpClientBuilder.java index 50f7cc8df5..cdf2fc522e 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultSingleAddressHttpClientBuilder.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultSingleAddressHttpClientBuilder.java @@ -756,6 +756,11 @@ public Completable onClose() { return delegate.onClose(); } + @Override + public Completable onClosing() { + return delegate.onClosing(); + } + @Override public Completable closeAsync() { return delegate.closeAsync(); diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/FilterableClientToClient.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/FilterableClientToClient.java index f870552c2b..b6cc411f92 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/FilterableClientToClient.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/FilterableClientToClient.java @@ -139,6 +139,11 @@ public Completable onClose() { return rc.onClose(); } + @Override + public Completable onClosing() { + return rc.onClosing(); + } + @Override public Completable closeAsync() { return rc.closeAsync(); @@ -172,6 +177,11 @@ public Completable onClose() { return client.onClose(); } + @Override + public Completable onClosing() { + return client.onClosing(); + } + @Override public Completable closeAsync() { return client.closeAsync(); diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/GlobalDnsServiceDiscoverer.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/GlobalDnsServiceDiscoverer.java index 18a65434ed..fce0a31846 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/GlobalDnsServiceDiscoverer.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/GlobalDnsServiceDiscoverer.java @@ -135,6 +135,11 @@ public Completable onClose() { return closeable.onClose(); } + @Override + public Completable onClosing() { + return closeable.onClosing(); + } + @Override public Completable closeAsync() { return closeable.closeAsync(); diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ClientParentConnectionContext.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ClientParentConnectionContext.java index bd48b6f6a5..1ca0d01b61 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ClientParentConnectionContext.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ClientParentConnectionContext.java @@ -462,18 +462,27 @@ public Completable onClose() { return parentContext.onClose(); } + @Override + public Completable onClosing() { + return parentContext.onClosing(); + } + @Override public Completable closeAsync() { - maxConcurrencyProcessor.onNext(ZERO_MAX_CONCURRENCY_EVENT); - maxConcurrencyProcessor.onComplete(); - return parentContext.closeAsync(); + return Completable.defer(() -> { + maxConcurrencyProcessor.onNext(ZERO_MAX_CONCURRENCY_EVENT); + maxConcurrencyProcessor.onComplete(); + return parentContext.closeAsync().shareContextOnSubscribe(); + }); } @Override public Completable closeAsyncGracefully() { - maxConcurrencyProcessor.onNext(ZERO_MAX_CONCURRENCY_EVENT); - maxConcurrencyProcessor.onComplete(); - return parentContext.closeAsyncGracefully(); + return Completable.defer(() -> { + maxConcurrencyProcessor.onNext(ZERO_MAX_CONCURRENCY_EVENT); + maxConcurrencyProcessor.onComplete(); + return parentContext.closeAsyncGracefully().shareContextOnSubscribe(); + }); } @Override @@ -505,11 +514,6 @@ public FlushStrategy defaultFlushStrategy() { public Single transportError() { return parentContext.transportError(); } - - @Override - public Completable onClosing() { - return parentContext.onClosing(); - } } private static final class MaxConcurrencyConsumableEvent implements ConsumableEvent { diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpClients.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpClients.java index 31d2e268ad..4983fa1e29 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpClients.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpClients.java @@ -319,6 +319,11 @@ public Completable onClose() { return closeable.onClose(); } + @Override + public Completable onClosing() { + return closeable.onClosing(); + } + @Override public Completable closeAsync() { return closeable.closeAsync(); diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/LoadBalancedStreamingHttpClient.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/LoadBalancedStreamingHttpClient.java index 5f5ea1350a..2597a077fb 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/LoadBalancedStreamingHttpClient.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/LoadBalancedStreamingHttpClient.java @@ -178,6 +178,11 @@ public Completable onClose() { return loadBalancer.onClose(); } + @Override + public Completable onClosing() { + return loadBalancer.onClosing(); + } + @Override public Completable closeAsync() { return loadBalancer.closeAsync(); diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NettyHttpServer.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NettyHttpServer.java index 026428860a..38dbb2da2c 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NettyHttpServer.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NettyHttpServer.java @@ -236,6 +236,11 @@ public Completable onClose() { return asyncCloseable.onClose(); } + @Override + public Completable onClosing() { + return asyncCloseable.onClosing(); + } + @Override public String toString() { return delegate.toString(); diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/AbstractNettyHttpServerTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/AbstractNettyHttpServerTest.java index 0deffb9a0c..b7ef548bd7 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/AbstractNettyHttpServerTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/AbstractNettyHttpServerTest.java @@ -82,7 +82,7 @@ import static io.servicetalk.transport.api.ConnectionAcceptor.ACCEPT_ALL; import static io.servicetalk.transport.netty.internal.AddressUtils.localAddress; import static io.servicetalk.transport.netty.internal.AddressUtils.serverHostAndPort; -import static io.servicetalk.utils.internal.PlatformDependent.throwException; +import static io.servicetalk.utils.internal.ThrowableUtils.throwException; import static java.lang.Boolean.parseBoolean; import static java.lang.Thread.NORM_PRIORITY; import static java.nio.charset.StandardCharsets.US_ASCII; diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/CloseUtils.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/CloseUtils.java index 85350c435b..70870ea60e 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/CloseUtils.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/CloseUtils.java @@ -44,6 +44,9 @@ private CloseUtils() { * @param closingStarted a {@link CountDownLatch} to notify */ static void onGracefulClosureStarted(ConnectionContext cc, CountDownLatch closingStarted) { + // cc.onClosing() will trigger on the leading edge of closure, which maybe when the user calls closeAsync(). + // The tests that depend upon this method need to wait until protocol events occur to ensure no more data will + // be processed, which isn't the same as cc.onClosing(). NettyConnectionContext nettyCtx = (NettyConnectionContext) cc; if (cc.protocol() == HTTP_1_1) { nettyCtx.transportError().subscribe(t -> { diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ConnectionFactoryFilterTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ConnectionFactoryFilterTest.java index 672a380f1b..ba725bd0f2 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ConnectionFactoryFilterTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ConnectionFactoryFilterTest.java @@ -18,9 +18,7 @@ import io.servicetalk.client.api.ConnectionFactoryFilter; import io.servicetalk.client.api.DelegatingConnectionFactory; import io.servicetalk.concurrent.Cancellable; -import io.servicetalk.concurrent.CompletableSource; import io.servicetalk.concurrent.api.AsyncCloseables; -import io.servicetalk.concurrent.api.Completable; import io.servicetalk.concurrent.api.CompositeCloseable; import io.servicetalk.concurrent.api.Single; import io.servicetalk.context.api.ContextMap; @@ -55,8 +53,6 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; -import static io.servicetalk.concurrent.api.Processors.newCompletableProcessor; -import static io.servicetalk.concurrent.api.SourceAdapters.fromSource; import static io.servicetalk.transport.netty.internal.AddressUtils.localAddress; import static io.servicetalk.transport.netty.internal.AddressUtils.serverHostAndPort; import static org.hamcrest.MatcherAssert.assertThat; @@ -120,18 +116,15 @@ void wrapConnection() throws Exception { @Test void onClosingIsDelegated() throws Exception { - CompletableSource.Processor onClosing = newCompletableProcessor(); client = clientBuilder.appendConnectionFactoryFilter( - newConnectionFactoryFilter(delegate -> - new NettyConnectionContextReturningConnection(delegate, onClosing))) + newConnectionFactoryFilter(NettyConnectionContextReturningConnection::new)) .buildBlocking(); ReservedStreamingHttpConnection con = client.asStreamingClient() .reserveConnection(client.get("/")) .toFuture().get(); - NettyConnectionContext ctx = (NettyConnectionContext) con.connectionContext(); - onClosing.onComplete(); - ctx.onClosing().toFuture().get(); + con.closeAsyncGracefully().subscribe(); + con.onClosing().toFuture().get(); } private static HttpResponse sendRequest(BlockingHttpClient client) throws Exception { @@ -190,10 +183,9 @@ static void assertResponseHeader(HttpResponseMetaData metaData) { private static final class NettyConnectionContextReturningConnection extends StreamingHttpConnectionFilter { private final HttpConnectionContext ctx; - NettyConnectionContextReturningConnection(final FilterableStreamingHttpConnection delegate, - final CompletableSource.Processor onClosing) { + NettyConnectionContextReturningConnection(final FilterableStreamingHttpConnection delegate) { super(delegate); - ctx = new DelegatingNettyConnectionContext(delegate.connectionContext(), onClosing); + ctx = new DelegatingNettyConnectionContext(delegate.connectionContext()); } @Override @@ -206,13 +198,10 @@ private static final class DelegatingNettyConnectionContext extends DelegatingHt implements NettyConnectionContext { private final NettyConnectionContext delegate; - private final CompletableSource.Processor onClosing; - DelegatingNettyConnectionContext(final HttpConnectionContext delegate, - final CompletableSource.Processor onClosing) { + DelegatingNettyConnectionContext(final HttpConnectionContext delegate) { super(delegate); this.delegate = (NettyConnectionContext) delegate; - this.onClosing = onClosing; } @Override @@ -234,10 +223,5 @@ public FlushStrategy defaultFlushStrategy() { public Single transportError() { return delegate.transportError(); } - - @Override - public Completable onClosing() { - return fromSource(onClosing); - } } } diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ConnectionFactoryOffloadingTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ConnectionFactoryOffloadingTest.java index 55979b6780..c43a4b838c 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ConnectionFactoryOffloadingTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ConnectionFactoryOffloadingTest.java @@ -93,6 +93,11 @@ public Completable onClose() { return close.onClose(); } + @Override + public Completable onClosing() { + return close.onClosing(); + } + @Override public Completable closeAsync() { return close.closeAsync(); diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpClientsCompileTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpClientsCompileTest.java index 7e07169192..87b169ff1f 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpClientsCompileTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpClientsCompileTest.java @@ -60,6 +60,11 @@ public Completable onClose() { return Completable.completed(); } + @Override + public Completable onClosing() { + return Completable.completed(); + } + @Override public Completable closeAsync() { return Completable.completed(); diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/NettyHttpServerConnectionDrainTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/NettyHttpServerConnectionDrainTest.java index d69a0547be..36b6ffc4fd 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/NettyHttpServerConnectionDrainTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/NettyHttpServerConnectionDrainTest.java @@ -179,11 +179,21 @@ public Completable onClose() { return serverContext.onClose(); } + @Override + public Completable onClosing() { + return serverContext.onClosing(); + } + @Override public Completable closeAsync() { return serverContext.closeAsync(); } + @Override + public Completable closeAsyncGracefully() { + return serverContext.closeAsyncGracefully(); + } + @Override public void close() { // Without draining the request is expected to hang, don't wait too long unless on CI diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterTest.java index 302a3aaf73..5ecdfcbeea 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/RetryingHttpRequesterFilterTest.java @@ -262,6 +262,11 @@ public Completable onClose() { return delegate.onClose(); } + @Override + public Completable onClosing() { + return delegate.onClosing(); + } + @Override public Completable closeAsync() { return delegate.closeAsync(); diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ServerGracefulConnectionClosureHandlingTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ServerGracefulConnectionClosureHandlingTest.java index d9f17b0eee..2ef14f1e66 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ServerGracefulConnectionClosureHandlingTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ServerGracefulConnectionClosureHandlingTest.java @@ -78,8 +78,6 @@ void setUp() throws Exception { @Override public Completable accept(final ConnectionContext context) { CloseUtils.onGracefulClosureStarted(context, serverConnectionClosing); - // ((NettyHttpServerConnection) context).onClosing() - // .whenFinally(serverConnectionClosing::countDown).subscribe(); context.onClose().whenFinally(serverConnectionClosed::countDown).subscribe(); return completed(); } diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/TimeoutHttpRequesterFilterTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/TimeoutHttpRequesterFilterTest.java index e4c1240642..3c17adcfd5 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/TimeoutHttpRequesterFilterTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/TimeoutHttpRequesterFilterTest.java @@ -25,7 +25,6 @@ import io.servicetalk.http.api.StreamingHttpResponse; import io.servicetalk.http.utils.BeforeFinallyHttpOperator; import io.servicetalk.http.utils.TimeoutHttpRequesterFilter; -import io.servicetalk.transport.netty.internal.NettyConnectionContext; import org.junit.jupiter.api.Test; @@ -53,7 +52,7 @@ private enum Event { @Test void onClosingWinsOnError() throws Exception { connectionFilterFactory(appendConnectionFilter(c -> { - ((NettyConnectionContext) c.connectionContext()).onClosing() + c.connectionContext().onClosing() .subscribe(() -> firstEventRef.compareAndSet(null, Event.ON_CLOSING)); return new StreamingHttpConnectionFilter(c) { @Override diff --git a/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/PayloadSizeLimitingHttpRequesterFilterTest.java b/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/PayloadSizeLimitingHttpRequesterFilterTest.java index 6ad12ed230..a5cf60642e 100644 --- a/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/PayloadSizeLimitingHttpRequesterFilterTest.java +++ b/servicetalk-http-utils/src/test/java/io/servicetalk/http/utils/PayloadSizeLimitingHttpRequesterFilterTest.java @@ -111,6 +111,11 @@ public Completable onClose() { return closeable.onClose(); } + @Override + public Completable onClosing() { + return closeable.onClosing(); + } + @Override public Completable closeAsync() { return closeable.closeAsync(); diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java index 48c832d328..ca6a80aaec 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java @@ -235,7 +235,7 @@ private List> markHostAsExpired( private Host createHost(ResolvedAddress addr) { Host host = new Host<>(targetResource, addr, healthCheckConfig); - host.onClose().afterFinally(() -> + host.onClosing().afterFinally(() -> usedHostsUpdater.updateAndGet(RoundRobinLoadBalancer.this, previousHosts -> { @SuppressWarnings("unchecked") List> previousHostsTyped = @@ -438,6 +438,11 @@ public Completable onClose() { return asyncCloseable.onClose(); } + @Override + public Completable onClosing() { + return asyncCloseable.onClosing(); + } + @Override public Completable closeAsync() { return asyncCloseable.closeAsync(); @@ -635,7 +640,7 @@ previous, new ConnState(newList, newState))) { LOGGER.trace("Load balancer for {}: added a new connection {} to {} after {} attempt(s).", targetResource, connection, this, addAttempt); // Instrument the new connection so we prune it on close - connection.onClose().beforeFinally(() -> { + connection.onClosing().beforeFinally(() -> { int removeAttempt = 0; for (;;) { ++removeAttempt; @@ -708,14 +713,20 @@ public Completable onClose() { return closeable.onClose(); } + @Override + public Completable onClosing() { + return closeable.onClosing(); + } + @SuppressWarnings("unchecked") private Completable doClose(final Function closeFunction) { return Completable.defer(() -> { final ConnState oldState = connStateUpdater.getAndSet(this, CLOSED_CONN_STATE); cancelIfHealthCheck(oldState.state); final Object[] connections = oldState.connections; - return connections.length == 0 ? completed() : - from(connections).flatMapCompletableDelayError(conn -> closeFunction.apply((C) conn)); + return (connections.length == 0 ? completed() : + from(connections).flatMapCompletableDelayError(conn -> closeFunction.apply((C) conn))) + .shareContextOnSubscribe(); }); } diff --git a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java index d8edc21833..846d95cebf 100644 --- a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java +++ b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java @@ -760,6 +760,7 @@ private TestLoadBalancedConnection newConnection(final String address) { when(cnx.closeAsync()).thenReturn(closeable.closeAsync()); when(cnx.closeAsyncGracefully()).thenReturn(closeable.closeAsyncGracefully()); when(cnx.onClose()).thenReturn(closeable.onClose()); + when(cnx.onClosing()).thenReturn(closeable.onClosing()); when(cnx.address()).thenReturn(address); when(cnx.toString()).thenReturn(address + '@' + cnx.hashCode()); diff --git a/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/DelegatingConnectionContext.java b/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/DelegatingConnectionContext.java index 748366e2d0..db2b511f0e 100644 --- a/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/DelegatingConnectionContext.java +++ b/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/DelegatingConnectionContext.java @@ -98,6 +98,11 @@ public Completable onClose() { return delegate.onClose(); } + @Override + public Completable onClosing() { + return delegate.onClosing(); + } + @Override public Completable closeAsync() { return delegate.closeAsync(); diff --git a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/AbstractNettyIoExecutor.java b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/AbstractNettyIoExecutor.java index e98733f2b6..5b02c4a698 100644 --- a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/AbstractNettyIoExecutor.java +++ b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/AbstractNettyIoExecutor.java @@ -16,6 +16,7 @@ package io.servicetalk.transport.netty.internal; import io.servicetalk.concurrent.Cancellable; +import io.servicetalk.concurrent.CompletableSource; import io.servicetalk.concurrent.api.Completable; import io.servicetalk.concurrent.api.Executor; @@ -26,13 +27,15 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import static io.servicetalk.concurrent.api.Processors.newCompletableProcessor; +import static io.servicetalk.concurrent.api.SourceAdapters.fromSource; import static java.util.concurrent.TimeUnit.NANOSECONDS; abstract class AbstractNettyIoExecutor implements NettyIoExecutor, Executor { - protected final boolean isIoThreadSupported; protected final T eventLoop; protected final boolean interruptOnCancel; + private final CompletableSource.Processor closingProcessor = newCompletableProcessor(); AbstractNettyIoExecutor(T eventLoop, boolean interruptOnCancel) { this(eventLoop, interruptOnCancel, false); @@ -42,16 +45,24 @@ abstract class AbstractNettyIoExecutor implements Nett this.eventLoop = eventLoop; this.interruptOnCancel = interruptOnCancel; this.isIoThreadSupported = isIoThreadSupported; + try { + // Best effort completion of closingProcessor if the EventLoop is closed outside the scope of this Object + eventLoop.terminationFuture().addListener(f -> closingProcessor.onComplete()); + } catch (Throwable ignored) { + // EmbeddedEventLoop doesn't support this method. + } } @Override public Completable closeAsync() { - return new NettyFutureCompletable(() -> eventLoop.shutdownGracefully(0, 0, NANOSECONDS)); + return new NettyFutureCompletable(() -> eventLoop.shutdownGracefully(0, 0, NANOSECONDS)) + .beforeOnSubscribe(c -> closingProcessor.onComplete()); } @Override public Completable closeAsyncGracefully() { - return new NettyFutureCompletable(eventLoop::shutdownGracefully); + return new NettyFutureCompletable(eventLoop::shutdownGracefully) + .beforeOnSubscribe(c -> closingProcessor.onComplete()); } @Override @@ -59,6 +70,11 @@ public final Completable onClose() { return new NettyFutureCompletable(eventLoop::terminationFuture); } + @Override + public Completable onClosing() { + return fromSource(closingProcessor); + } + @Override public final boolean isUnixDomainSocketSupported() { return NativeTransportUtils.isUnixDomainSocketSupported(eventLoop); diff --git a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/ChannelSet.java b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/ChannelSet.java index 1ccb0a592e..28247f783f 100644 --- a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/ChannelSet.java +++ b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/ChannelSet.java @@ -66,6 +66,7 @@ public void operationComplete(final ChannelFuture future) { private final Map channelMap = new ConcurrentHashMap<>(); private final Processor onCloseProcessor = newCompletableProcessor(); + private final Processor onClosingProcessor = newCompletableProcessor(); private final Completable onClose; @SuppressWarnings("unused") private volatile int state; @@ -113,6 +114,7 @@ protected void handleSubscribe(final Subscriber subscriber) { return; } + onClosingProcessor.onComplete(); if (channelMap.isEmpty()) { onCloseProcessor.onComplete(); return; @@ -137,6 +139,7 @@ protected void handleSubscribe(final Subscriber subscriber) { return; } + onClosingProcessor.onComplete(); if (channelMap.isEmpty()) { toSource(onClose).subscribe(subscriber); onCloseProcessor.onComplete(); @@ -144,7 +147,6 @@ protected void handleSubscribe(final Subscriber subscriber) { } CompositeCloseable closeable = newCompositeCloseable(); - for (final Channel channel : channelMap.values()) { Attribute closeableAttribute = channel.attr(CHANNEL_CLOSEABLE_KEY); @@ -181,4 +183,9 @@ public Completable closeAsyncGracefully() { public Completable onClose() { return onClose; } + + @Override + public Completable onClosing() { + return fromSource(onClosingProcessor); + } } diff --git a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/GlobalExecutionContext.java b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/GlobalExecutionContext.java index 1f812979d5..6a4d7ed0a3 100644 --- a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/GlobalExecutionContext.java +++ b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/GlobalExecutionContext.java @@ -19,6 +19,7 @@ import io.servicetalk.concurrent.api.Completable; import io.servicetalk.concurrent.api.Executor; import io.servicetalk.concurrent.api.Executors; +import io.servicetalk.concurrent.api.Single; import io.servicetalk.transport.api.DefaultExecutionContext; import io.servicetalk.transport.api.ExecutionContext; import io.servicetalk.transport.api.ExecutionStrategy; @@ -28,8 +29,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Duration; +import java.util.concurrent.Callable; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import java.util.function.BooleanSupplier; +import java.util.function.Supplier; import static io.servicetalk.buffer.netty.BufferAllocators.DEFAULT_ALLOCATOR; import static io.servicetalk.transport.api.ExecutionStrategy.offloadAll; @@ -103,6 +108,11 @@ public Completable onClose() { return delegate.onClose(); } + @Override + public Completable onClosing() { + return delegate.onClosing(); + } + @Override public boolean isUnixDomainSocketSupported() { return delegate.isUnixDomainSocketSupported(); @@ -118,6 +128,11 @@ public boolean isIoThreadSupported() { return delegate.isIoThreadSupported(); } + @Override + public BooleanSupplier shouldOffloadSupplier() { + return delegate.shouldOffloadSupplier(); + } + @Override public boolean isCurrentThreadEventLoop() { return delegate.isCurrentThreadEventLoop(); @@ -139,6 +154,11 @@ public Executor asExecutor() { return delegate.asExecutor(); } + @Override + public long currentTime(final TimeUnit unit) { + return delegate.currentTime(unit); + } + @Override public Cancellable execute(final Runnable task) throws RejectedExecutionException { return delegate.execute(task); @@ -149,6 +169,41 @@ public Cancellable schedule(final Runnable task, final long delay, final TimeUni throws RejectedExecutionException { return delegate.schedule(task, delay, unit); } + + @Override + public Cancellable schedule(final Runnable task, final Duration delay) throws RejectedExecutionException { + return delegate.schedule(task, delay); + } + + @Override + public Completable timer(final long delay, final TimeUnit unit) { + return delegate.timer(delay, unit); + } + + @Override + public Completable timer(final Duration delay) { + return delegate.timer(delay); + } + + @Override + public Completable submit(final Runnable runnable) { + return delegate.submit(runnable); + } + + @Override + public Completable submitRunnable(final Supplier runnableSupplier) { + return delegate.submitRunnable(runnableSupplier); + } + + @Override + public Single submit(final Callable callable) { + return delegate.submit(callable); + } + + @Override + public Single submitCallable(final Supplier> callableSupplier) { + return delegate.submitCallable(callableSupplier); + } } private static void log(final Logger logger, final String name, final String methodName) { diff --git a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/NettyChannelListenableAsyncCloseable.java b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/NettyChannelListenableAsyncCloseable.java index ddd8c26c7d..01a34dd764 100644 --- a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/NettyChannelListenableAsyncCloseable.java +++ b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/NettyChannelListenableAsyncCloseable.java @@ -101,6 +101,7 @@ protected final void notifyOnClosing() { * * @return a {@link Completable} that notifies when the connection has begun its closing sequence. */ + @Override public final Completable onClosing() { return fromSource(onClosing); } @@ -117,12 +118,12 @@ public final Completable closeAsyncGracefully() { @Override public final Completable closeAsyncNoOffload() { - return closeAsync(onCloseNoOffload()); + return closeAsync(onCloseNoOffload); } @Override public final Completable closeAsyncGracefullyNoOffload() { - return closeAsyncGracefully(onCloseNoOffload()); + return closeAsyncGracefully(onCloseNoOffload); } private Completable closeAsync(Completable source) { @@ -162,10 +163,6 @@ public final Completable onClose() { return onClose; } - final Completable onCloseNoOffload() { - return onCloseNoOffload; - } - /** * Get access to the underlying {@link Channel}. * diff --git a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/NettyConnectionContext.java b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/NettyConnectionContext.java index 69d8dc5d7e..9cb3c8a92e 100644 --- a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/NettyConnectionContext.java +++ b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/NettyConnectionContext.java @@ -16,7 +16,6 @@ package io.servicetalk.transport.netty.internal; import io.servicetalk.concurrent.Cancellable; -import io.servicetalk.concurrent.api.Completable; import io.servicetalk.concurrent.api.Single; import io.servicetalk.transport.api.ConnectionContext; @@ -58,18 +57,6 @@ public interface NettyConnectionContext extends ConnectionContext { */ Single transportError(); - /** - * Returns a {@link Completable} that notifies when the connection has begun its closing sequence. - *

- * Note:The {@code Completable} is not required to be blocking-safe and should be offloaded if the - * {@link io.servicetalk.concurrent.CompletableSource.Subscriber} may block. - * - * @return a {@link Completable} that notifies when the connection has begun its closing sequence. A configured - * {@link CloseHandler} will determine whether more reads or writes will be allowed on this - * {@link NettyConnectionContext}. - */ - Completable onClosing(); - /** * Return the Netty {@link Channel} backing this connection. * diff --git a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/NettyServerContext.java b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/NettyServerContext.java index f8b38db457..c88679b7c0 100644 --- a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/NettyServerContext.java +++ b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/NettyServerContext.java @@ -110,4 +110,9 @@ public Completable closeAsyncGracefully() { public Completable onClose() { return closeable.onClose(); } + + @Override + public Completable onClosing() { + return closeable.onClosing(); + } }