Skip to content

Commit

Permalink
Add ListenableAsyncCloseable#onClosing() (#2430)
Browse files Browse the repository at this point in the history
Motivation:
It is useful to get a notified on the leading edge when closing beings.
This may drive decisions like stopping to use a connection or resources
that is being closed and not suitable for reuse.

Modifications:
- Use existing transport mechansims to drive onClosing for connections.
- Remove NettyConnectionContext override of onClosing and defer to the base class.
  • Loading branch information
Scottmitch authored Dec 1, 2022
1 parent 344083e commit 557f530
Show file tree
Hide file tree
Showing 63 changed files with 474 additions and 154 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ public Completable onClose() {
return partitionMap.onClose();
}

@Override
public Completable onClosing() {
return partitionMap.onClosing();
}

@Override
public Completable closeAsync() {
// Cancel doesn't provide any status and is assumed to complete immediately so we just cancel when subscribe
Expand Down Expand Up @@ -209,11 +214,21 @@ public Completable onClose() {
return close.onClose();
}

@Override
public Completable onClosing() {
return close.onClosing();
}

@Override
public Completable closeAsync() {
return close.closeAsync();
}

@Override
public Completable closeAsyncGracefully() {
return close.closeAsyncGracefully();
}

static final class MutableInt {
int value;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,11 @@ public Completable onClose() {
return asyncCloseable.onClose();
}

@Override
public Completable onClosing() {
return asyncCloseable.onClosing();
}

@Override
public Completable closeAsync() {
return asyncCloseable.closeAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,11 @@ public Completable onClose() {
return asyncCloseable.onClose();
}

@Override
public Completable onClosing() {
return asyncCloseable.onClosing();
}

@Override
public Completable closeAsync() {
return asyncCloseable.closeAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ public Completable onClose() {
return delegate.onClose();
}

@Override
public Completable onClosing() {
return delegate.onClosing();
}

@Override
public Completable closeAsync() {
return delegate.closeAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,72 +15,44 @@
*/
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.CompletableSource;

import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.annotation.Nullable;

import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater;
import static io.servicetalk.concurrent.api.AsyncCloseables.toAsyncCloseable;
import static io.servicetalk.concurrent.api.AsyncCloseables.toListenableAsyncCloseable;
import static io.servicetalk.concurrent.api.Completable.completed;
import static io.servicetalk.concurrent.api.Completable.defer;

abstract class AbstractExecutor implements Executor {

private static final AtomicReferenceFieldUpdater<AbstractExecutor, CompletableProcessor>
onCloseUpdater = newUpdater(AbstractExecutor.class, CompletableProcessor.class, "onClose");

@SuppressWarnings("unused")
@Nullable
private volatile CompletableProcessor onClose;
private final ListenableAsyncCloseable listenableAsyncCloseable = toListenableAsyncCloseable(
toAsyncCloseable(graceful -> defer(() -> {
// If closeAsync() is subscribed multiple times, we will call this method as many times.
// Since doClose() is idempotent and usually cheap, it is OK as compared to implementing at most
// once semantics.
doClose();
return completed().shareContextOnSubscribe();
})));

@Override
public Completable onClose() {
return getOrCreateOnClose();
return listenableAsyncCloseable.onClose();
}

@Override
public Completable onClosing() {
return listenableAsyncCloseable.onClosing();
}

@Override
public Completable closeAsync() {
return new CloseAsync();
return listenableAsyncCloseable.closeAsync();
}

private CompletableProcessor getOrCreateOnClose() {
CompletableProcessor onClose = this.onClose;
if (onClose != null) {
return onClose;
}
final CompletableProcessor newOnClose = new CompletableProcessor();
if (onCloseUpdater.compareAndSet(this, null, newOnClose)) {
return newOnClose;
}
onClose = this.onClose;
assert onClose != null;
return onClose;
@Override
public Completable closeAsyncGracefully() {
return listenableAsyncCloseable.closeAsyncGracefully();
}

/**
* Do any close actions required for this {@link Executor}.
* This method MUST be idempotent.
*/
abstract void doClose();

private final class CloseAsync extends Completable implements CompletableSource {
@Override
protected void handleSubscribe(Subscriber subscriber) {
CompletableProcessor onClose = getOrCreateOnClose();
onClose.subscribeInternal(subscriber);
try {
// If closeAsync() is subscribed multiple times, we will call this method as many times.
// Since doClose() is idempotent and usually cheap, it is OK as compared to implementing at most
// once semantics.
doClose();
} catch (Throwable cause) {
onClose.onError(cause);
return;
}
onClose.onComplete();
}

@Override
public void subscribe(final Subscriber subscriber) {
subscribeInternal(subscriber);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,15 @@ public static ListenableAsyncCloseable toListenableAsyncCloseable(
return new ListenableAsyncCloseable() {

private final CompletableProcessor onCloseProcessor = new CompletableProcessor();
private final CompletableProcessor onClosing = new CompletableProcessor();
private final Completable onClose = onCloseDecorator.apply(onCloseProcessor);

@Override
public Completable closeAsyncGracefully() {
return new SubscribableCompletable() {
@Override
protected void handleSubscribe(final Subscriber subscriber) {
onClosing.onComplete();
asyncCloseable.closeAsyncGracefully().subscribeInternal(onCloseProcessor);
onClose.subscribeInternal(subscriber);
}
Expand All @@ -125,11 +127,17 @@ public Completable onClose() {
return onClose;
}

@Override
public Completable onClosing() {
return onClosing;
}

@Override
public Completable closeAsync() {
return new SubscribableCompletable() {
@Override
protected void handleSubscribe(final Subscriber subscriber) {
onClosing.onComplete();
asyncCloseable.closeAsync().subscribeInternal(onCloseProcessor);
onClose.subscribeInternal(subscriber);
}
Expand Down Expand Up @@ -184,6 +192,7 @@ private static final class DefaultAsyncCloseable implements ListenableAsyncClose
private static final AtomicIntegerFieldUpdater<DefaultAsyncCloseable> closedUpdater =
newUpdater(DefaultAsyncCloseable.class, "closed");
private final CloseableResource closeableResource;
private final CompletableProcessor onClosing = new CompletableProcessor();
private final CompletableProcessor onClose = new CompletableProcessor();

@SuppressWarnings("unused")
Expand All @@ -200,6 +209,7 @@ public Completable closeAsync() {
protected void handleSubscribe(final Subscriber subscriber) {
onClose.subscribeInternal(subscriber);
if (closedUpdater.getAndSet(DefaultAsyncCloseable.this, HARD_CLOSE) != HARD_CLOSE) {
onClosing.onComplete();
closeableResource.doClose(false).subscribeInternal(onClose);
}
}
Expand All @@ -213,6 +223,7 @@ public Completable closeAsyncGracefully() {
protected void handleSubscribe(final Subscriber subscriber) {
onClose.subscribeInternal(subscriber);
if (closedUpdater.compareAndSet(DefaultAsyncCloseable.this, IDLE, CLOSED_GRACEFULLY)) {
onClosing.onComplete();
closeableResource.doClose(true).subscribeInternal(onClose);
}
}
Expand All @@ -223,6 +234,11 @@ protected void handleSubscribe(final Subscriber subscriber) {
public Completable onClose() {
return onClose;
}

@Override
public Completable onClosing() {
return onClosing;
}
}

private abstract static class SubscribableCompletable extends Completable implements CompletableSource {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.servicetalk.concurrent.Cancellable;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

import static java.util.Objects.requireNonNull;
Expand All @@ -43,6 +44,11 @@ public Cancellable schedule(final Runnable task, final long delay, final TimeUni
return delegate.schedule(new ContextPreservingRunnable(task), delay, unit);
}

@Override
public Cancellable schedule(final Runnable task, final Duration delay) {
return delegate.schedule(new ContextPreservingRunnable(task), delay);
}

@Override
public long currentTime(TimeUnit unit) {
return delegate.currentTime(unit);
Expand All @@ -53,13 +59,25 @@ public Completable onClose() {
return delegate.onClose();
}

@Override
public Completable onClosing() {
return delegate.onClosing();
}

@Override
public Completable closeAsync() {
return delegate.closeAsync();
}

@Override
public Completable closeAsyncGracefully() {
return delegate.closeAsyncGracefully();
}

static Executor of(Executor delegate) {
return delegate instanceof ContextPreservingStExecutor ? delegate :
new ContextPreservingStExecutor(delegate);
}

// Don't override methods that return an async source. Context will be captured at subscribe by the async source.
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ public Completable onClose() {
return delegate.onClose();
}

@Override
public Completable onClosing() {
return delegate.onClosing();
}

@Override
public Completable closeAsync() {
return delegate.closeAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,21 @@ public interface ListenableAsyncCloseable extends AsyncCloseable {
* @return the {@code Completable} that is notified on close.
*/
Completable onClose();

/**
* Returns a {@link Completable} that is notified when closing begins.
* <p>
* Closing begin might be when a close operation is initiated locally (e.g. subscribing to {@link #closeAsync()}) or
* it could also be a transport event received from a remote peer (e.g. read a {@code connection: close} header).
* <p>
* For backwards compatibility this method maybe functionally equivalent to {@link #onClose()}. Therefore, provides
* a best-effort leading edge notification of closing, but may fall back to notification on trailing edge.
* <p>
* The goal of this method is often to notify asap when closing so this method may not be offloaded and care must
* be taken to avoid blocking if subscribing to the return {@link Completable}.
* @return a {@link Completable} that is notified when closing begins.
*/
default Completable onClosing() {
return onClose();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,11 @@ public Completable onClose() {
return asyncCloseable.onClose();
}

@Override
public Completable onClosing() {
return asyncCloseable.onClosing();
}

@Override
public Completable closeAsync() {
return asyncCloseable.closeAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ public Completable onClose() {
return client.onClose();
}

@Override
public Completable onClosing() {
return client.onClosing();
}

@Override
public Completable closeAsync() {
return client.closeAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ public Completable onClose() {
return dns.onClose();
}

@Override
public Completable onClosing() {
return dns.onClosing();
}

@Override
public Publisher<Collection<ServiceDiscovererEvent<InetSocketAddress>>> discover(final String s) {
return dns.dnsSrvQuery(s);
Expand Down Expand Up @@ -103,6 +108,11 @@ public Completable onClose() {
return dns.onClose();
}

@Override
public Completable onClosing() {
return dns.onClosing();
}

@Override
public Publisher<Collection<ServiceDiscovererEvent<InetSocketAddress>>> discover(
final HostAndPort hostAndPort) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,11 @@ public Completable onClose() {
return streamingHttpClient.onClose();
}

@Override
public Completable onClosing() {
return streamingHttpClient.onClosing();
}

/**
* Determines the timeout for a new request using three potential sources; the deadline in the async context, the
* request timeout, and the client default. The timeout will be the lesser of the context and request timeouts or if
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ public ConnectionContext parent() {
return connectionContext.parent();
}

@Override
public Completable onClosing() {
return connectionContext.onClosing();
}

@Override
public Completable onClose() {
return connectionContext.onClose();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,11 @@ public Completable onClose() {
return delegate.onClose();
}

@Override
public Completable onClosing() {
return delegate.onClosing();
}

@Override
public SocketAddress listenAddress() {
return delegate.listenAddress();
Expand Down
Loading

0 comments on commit 557f530

Please sign in to comment.