Skip to content

Commit

Permalink
Merge pull request #192 from bosch-io/bugfix/search-thread-leak
Browse files Browse the repository at this point in the history
Move creation of single threaded executor back into ThingSearchSubscription for spec conformance (Rule 1.03)
  • Loading branch information
Yannic92 authored Sep 2, 2022
2 parents 2e9bb2a + 103ed26 commit 117bc2d
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,10 @@

import java.time.Duration;
import java.util.Collections;
import java.util.Objects;
import java.util.Spliterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -33,8 +30,6 @@
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import javax.annotation.Nullable;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
Expand All @@ -59,7 +54,6 @@ public final class SpliteratorSubscriber<T> implements Subscriber<T>, Spliterato
private final AtomicInteger splits;
private final AtomicInteger quota;
private final AtomicBoolean cancelled;
@Nullable private volatile ExecutorService subscriptionExecutor;

private SpliteratorSubscriber(final long timeoutMillis, final int bufferSize, final int batchSize) {
// reserve 2*bufferSize+1 space in buffer for <=bufferSize elements and bufferSize+1 EOS markers
Expand Down Expand Up @@ -116,18 +110,14 @@ public void onSubscribe(final Subscription s) {
previousSubscription = subscription.get();
if (previousSubscription == null) {
subscription.set(s);
if (s instanceof ThingSearchSubscription) {
subscriptionExecutor = Executors.newSingleThreadExecutor();
((ThingSearchSubscription) s).setSingleThreadedExecutor(subscriptionExecutor);
}
}
}
if (previousSubscription == null) {
LOGGER.trace("Initial request: <{}>", capacity);
s.request(capacity);
} else {
LOGGER.warn("onSubscribe() called a second time; cancelling subscription <{}>.", s);
s.cancel();
cancelSubscription(s);
}
}

Expand All @@ -153,12 +143,15 @@ public void onComplete() {

private void goToCancelledState() {
if (!cancelled.getAndSet(true)) {
if (subscriptionExecutor != null) {
Objects.requireNonNull(subscriptionExecutor).shutdown();
}
ThingSearchSubscription.terminate(subscription.get());
}
}

private void cancelSubscription(final Subscription s) {
s.cancel();
ThingSearchSubscription.terminate(s);
}

// always cancel the stream on error thrown, because user code catching the error is outside
// the element handling code and should consider this spliterator "used up."
// as a precaution, the error is propagated to all threads reading from this spliterator.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
package org.eclipse.ditto.client.streaming;

import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -54,7 +56,7 @@ public final class ThingSearchSubscription implements Subscription {
private final Subscriber<? super SubscriptionHasNextPage> subscriber;
private final AtomicBoolean cancelled;
private final AtomicReference<AdaptableBus.SubscriptionId> busSubscription;
@Nullable private volatile ExecutorService singleThreadedExecutor;
private final ExecutorService singleThreadedExecutorService;

private ThingSearchSubscription(final String subscriptionId,
final ProtocolAdapter protocolAdapter,
Expand All @@ -66,6 +68,8 @@ private ThingSearchSubscription(final String subscriptionId,
this.subscriber = subscriber;
cancelled = new AtomicBoolean(false);
busSubscription = new AtomicReference<>();

singleThreadedExecutorService = Executors.newSingleThreadExecutor();
}

/**
Expand All @@ -89,10 +93,29 @@ public static void start(final SubscriptionCreated event,
thingSearchSubscription.startForwarding();
}

/**
* Terminate the executor of any {@code ThingSearchSubscription}, or do nothing if the subscription is not a
* {@code ThingSearchSubscription}.
* After upgrading to Java 9, it is better to replace this method by the {@code Cleaner} interface.
*
* @param subscription the subscription.
* @since 3.0.0
*/
public static void terminate(@Nullable final Subscription subscription) {
if (subscription instanceof ThingSearchSubscription) {
final ThingSearchSubscription s = (ThingSearchSubscription) subscription;
try {
s.singleThreadedExecutorService.submit(s.singleThreadedExecutorService::shutdown);
} catch (final RejectedExecutionException e) {
// executor already shut down
}
}
}

// called by subscriber
@Override
public void request(final long n) {
singleThreaded(() -> {
singleThreadedExecutorService.submit(() -> {
if (n <= 0) {
doCancel();
subscriber.onError(new IllegalArgumentException("Expect positive demand, got: " + n));
Expand All @@ -110,26 +133,9 @@ public void request(final long n) {
// called by subscriber
@Override
public void cancel() {
singleThreaded(this::doCancel);
}

/**
* Set the single-threaded executor of this subscription.
* Subscription methods run in the executor in order to maintain element order.
* Creating the executor within this class is not possible because the garbage collector may not stop the executor.
*
* @param singleThreadedExecutor The single-threaded executor.
*/
public void setSingleThreadedExecutor(final ExecutorService singleThreadedExecutor) {
// TODO: After upgrading to Java 9, consider using the Cleaner interface instead.
this.singleThreadedExecutor = singleThreadedExecutor;
}

private void singleThreaded(final Runnable runnable) {
if (singleThreadedExecutor != null) {
Objects.requireNonNull(singleThreadedExecutor).submit(runnable);
} else {
runnable.run();
if (!singleThreadedExecutorService.isShutdown() && !singleThreadedExecutorService.isTerminated()) {
CompletableFuture.runAsync(this::doCancel, singleThreadedExecutorService)
.whenComplete((result, error) -> singleThreadedExecutorService.shutdownNow());
}
}

Expand All @@ -142,18 +148,19 @@ private void doCancel() {

// called by bus
private void onTimeout(final Throwable timeoutError) {
singleThreaded(() -> {
singleThreadedExecutorService.submit(() -> {
if (!cancelled.getAndSet(true)) {
// bus subscription already cancelled
// trust back-end to free resources on its own
subscriber.onError(timeoutError);
}
});
singleThreadedExecutorService.shutdown();
}

// called by bus
private void onNext(final Adaptable adaptable) {
singleThreaded(() -> {
singleThreadedExecutorService.submit(() -> {
LOGGER.trace("Received from bus: <{}>", adaptable);
handleAdaptable(adaptable);
});
Expand Down

0 comments on commit 117bc2d

Please sign in to comment.