Skip to content

Commit

Permalink
Merge pull request #191 from bosch-io/bugfix/search-thread-leak
Browse files Browse the repository at this point in the history
Bugfix/search thread leak
  • Loading branch information
Yannic92 authored Aug 8, 2022
2 parents 35d06f1 + 00fb138 commit 2e9bb2a
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@

import java.time.Duration;
import java.util.Collections;
import java.util.Objects;
import java.util.Spliterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -30,6 +33,8 @@
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import javax.annotation.Nullable;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
Expand All @@ -54,6 +59,7 @@ public final class SpliteratorSubscriber<T> implements Subscriber<T>, Spliterato
private final AtomicInteger splits;
private final AtomicInteger quota;
private final AtomicBoolean cancelled;
@Nullable private volatile ExecutorService subscriptionExecutor;

private SpliteratorSubscriber(final long timeoutMillis, final int bufferSize, final int batchSize) {
// reserve 2*bufferSize+1 space in buffer for <=bufferSize elements and bufferSize+1 EOS markers
Expand Down Expand Up @@ -110,6 +116,10 @@ public void onSubscribe(final Subscription s) {
previousSubscription = subscription.get();
if (previousSubscription == null) {
subscription.set(s);
if (s instanceof ThingSearchSubscription) {
subscriptionExecutor = Executors.newSingleThreadExecutor();
((ThingSearchSubscription) s).setSingleThreadedExecutor(subscriptionExecutor);
}
}
}
if (previousSubscription == null) {
Expand All @@ -130,30 +140,38 @@ public void onNext(final T t) {
@Override
public void onError(final Throwable t) {
LOGGER.trace("onError", t);
cancelled.set(true);
goToCancelledState();
addErrors(t);
}

@Override
public void onComplete() {
LOGGER.trace("onComplete");
cancelled.set(true);
goToCancelledState();
addEos();
}

private void goToCancelledState() {
if (!cancelled.getAndSet(true)) {
if (subscriptionExecutor != null) {
Objects.requireNonNull(subscriptionExecutor).shutdown();
}
}
}

// always cancel the stream on error thrown, because user code catching the error is outside
// the element handling code and should consider this spliterator "used up."
// as a precaution, the error is propagated to all threads reading from this spliterator.
private void cancelOnError(final Consumer<? super T> consumer, final T element) {
try {
consumer.accept(element);
} catch (final RuntimeException e) {
cancelled.set(true);
addErrors(e);
final Subscription s = subscription.get();
if (s != null) {
s.cancel();
}
goToCancelledState();
addErrors(e);
throw e;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@
package org.eclipse.ditto.client.streaming;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.client.internal.bus.AdaptableBus;
Expand Down Expand Up @@ -53,7 +54,7 @@ public final class ThingSearchSubscription implements Subscription {
private final Subscriber<? super SubscriptionHasNextPage> subscriber;
private final AtomicBoolean cancelled;
private final AtomicReference<AdaptableBus.SubscriptionId> busSubscription;
private final ExecutorService singleThreadedExecutorService;
@Nullable private volatile ExecutorService singleThreadedExecutor;

private ThingSearchSubscription(final String subscriptionId,
final ProtocolAdapter protocolAdapter,
Expand All @@ -65,9 +66,6 @@ private ThingSearchSubscription(final String subscriptionId,
this.subscriber = subscriber;
cancelled = new AtomicBoolean(false);
busSubscription = new AtomicReference<>();

// not shutdown to handle queued messages; will be shutdown by garbage collector
singleThreadedExecutorService = Executors.newSingleThreadExecutor();
}

/**
Expand All @@ -94,7 +92,7 @@ public static void start(final SubscriptionCreated event,
// called by subscriber
@Override
public void request(final long n) {
singleThreadedExecutorService.submit(() -> {
singleThreaded(() -> {
if (n <= 0) {
doCancel();
subscriber.onError(new IllegalArgumentException("Expect positive demand, got: " + n));
Expand All @@ -112,9 +110,26 @@ public void request(final long n) {
// called by subscriber
@Override
public void cancel() {
if (!singleThreadedExecutorService.isShutdown() && !singleThreadedExecutorService.isTerminated()) {
CompletableFuture.runAsync(this::doCancel, singleThreadedExecutorService)
.whenComplete((result, error) -> singleThreadedExecutorService.shutdownNow());
singleThreaded(this::doCancel);
}

/**
* Set the single-threaded executor of this subscription.
* Subscription methods run in the executor in order to maintain element order.
* Creating the executor within this class is not possible because the garbage collector may not stop the executor.
*
* @param singleThreadedExecutor The single-threaded executor.
*/
public void setSingleThreadedExecutor(final ExecutorService singleThreadedExecutor) {
// TODO: After upgrading to Java 9, consider using the Cleaner interface instead.
this.singleThreadedExecutor = singleThreadedExecutor;
}

private void singleThreaded(final Runnable runnable) {
if (singleThreadedExecutor != null) {
Objects.requireNonNull(singleThreadedExecutor).submit(runnable);
} else {
runnable.run();
}
}

Expand All @@ -127,7 +142,7 @@ private void doCancel() {

// called by bus
private void onTimeout(final Throwable timeoutError) {
singleThreadedExecutorService.submit(() -> {
singleThreaded(() -> {
if (!cancelled.getAndSet(true)) {
// bus subscription already cancelled
// trust back-end to free resources on its own
Expand All @@ -138,7 +153,7 @@ private void onTimeout(final Throwable timeoutError) {

// called by bus
private void onNext(final Adaptable adaptable) {
singleThreadedExecutorService.submit(() -> {
singleThreaded(() -> {
LOGGER.trace("Received from bus: <{}>", adaptable);
handleAdaptable(adaptable);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.client.internal.AbstractDittoClientTest;
import org.eclipse.ditto.json.JsonArray;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.thingsearch.model.signals.commands.subscription.CreateSubscription;
import org.eclipse.ditto.thingsearch.model.signals.commands.subscription.RequestFromSubscription;
import org.eclipse.ditto.thingsearch.model.signals.events.SubscriptionComplete;
Expand All @@ -43,9 +44,9 @@ public void run() throws Exception {
final Publisher<SubscriptionHasNextPage> underTest =
ThingSearchPublisher.of(CreateSubscription.of(DittoHeaders.empty()), PROTOCOL_ADAPTER, messaging);
final SpliteratorSubscriber<SubscriptionHasNextPage> subscriber = SpliteratorSubscriber.of();
final ExecutorService executor = Executors.newSingleThreadExecutor();
final CompletableFuture<List<SubscriptionHasNextPage>> subscriberFuture =
CompletableFuture.supplyAsync(() -> subscriber.asStream().collect(Collectors.toList()),
Executors.newSingleThreadExecutor());
CompletableFuture.supplyAsync(() -> subscriber.asStream().collect(Collectors.toList()), executor);
underTest.subscribe(subscriber);
final CreateSubscription createSubscription = expectMsgClass(CreateSubscription.class);
final String subscriptionId = "subscription1234";
Expand All @@ -62,6 +63,7 @@ public void run() throws Exception {
final RequestFromSubscription futileRequest = expectMsgClass(RequestFromSubscription.class);
reply(SubscriptionComplete.of(subscriptionId, futileRequest.getDittoHeaders()));
subscriberFuture.get(1L, TimeUnit.SECONDS);
executor.shutdownNow();
assertThat(subscriberFuture).isCompletedWithValue(expectedResult);
}
}

0 comments on commit 2e9bb2a

Please sign in to comment.