Skip to content

Commit

Permalink
Add support for multiple tx result streams in reactive backend (#1085)
Browse files Browse the repository at this point in the history
Prior to this update only a single result stream could exist at a time because publishing thread (event loop) used to be blocked for iterative consumption and, hence, could not be used for additional result streams. Now, the publishing thread will not be blocked and will be available for other result streams too.

Skip reasons have been clarified for tests that required investigation.

Testkit configs have been updated.
  • Loading branch information
injectives authored Nov 22, 2021
1 parent ddb3959 commit 4366ce0
Show file tree
Hide file tree
Showing 8 changed files with 268 additions and 289 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package neo4j.org.testkit.backend;

import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

/**
* Buffered subscriber for testing purposes.
* <p>
* It consumes incoming signals as soon as they arrive and prevents publishing thread from getting blocked.
* <p>
* The consumed signals can be retrieved one-by-one using {@link #next()}. It calls upstream {@link org.reactivestreams.Subscription#request(long)} with
* configured fetch size only when next signal is requested and no signals are expected to be emitted either because they have not been requested yet or the
* previous demand has been satisfied.
*
* @param <T>
*/
public class RxBufferedSubscriber<T> extends BaseSubscriber<T>
{
private final Lock lock = new ReentrantLock();
private final long fetchSize;
private final CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();
private final FluxSink<T> itemsSink;
private final OneSignalSubscriber<T> itemsSubscriber;
private long pendingItems;
private boolean nextInProgress;

public RxBufferedSubscriber( long fetchSize )
{
this.fetchSize = fetchSize;
AtomicReference<FluxSink<T>> sinkRef = new AtomicReference<>();
itemsSubscriber = new OneSignalSubscriber<>();
Flux.<T>create( fluxSink ->
{
sinkRef.set( fluxSink );
fluxSink.onRequest( ignored -> requestFromUpstream() );
} ).subscribe( itemsSubscriber );
itemsSink = sinkRef.get();
}

/**
* Returns a {@link Mono} of next signal from this subscription.
* <p>
* If necessary, a request with configured fetch size is made for more signals to be published.
* <p>
* <b>Only a single in progress request is supported at a time.</b> The returned {@link Mono} must succeed or error before next call is permitted.
* <p>
* Both empty successful completion and error completion indicate the completion of the subscribed publisher. This method must not be called after this.
*
* @return the {@link Mono} of next signal.
*/
public Mono<T> next()
{
executeWithLock( lock, () ->
{
if ( nextInProgress )
{
throw new IllegalStateException( "Only one in progress next is allowed at a time" );
}
return nextInProgress = true;
} );
return Mono.fromCompletionStage( subscriptionFuture )
.then( Mono.create( itemsSubscriber::requestNext ) )
.doOnSuccess( ignored -> executeWithLock( lock, () -> nextInProgress = false ) )
.doOnError( ignored -> executeWithLock( lock, () -> nextInProgress = false ) );
}

@Override
protected void hookOnSubscribe( Subscription subscription )
{
subscriptionFuture.complete( subscription );
}

@Override
protected void hookOnNext( T value )
{
executeWithLock( lock, () -> pendingItems-- );
itemsSink.next( value );
}

@Override
protected void hookOnComplete()
{
itemsSink.complete();
}

@Override
protected void hookOnError( Throwable throwable )
{
itemsSink.error( throwable );
}

private void requestFromUpstream()
{
boolean moreItemsPending = executeWithLock( lock, () ->
{
boolean morePending;
if ( pendingItems > 0 )
{
morePending = true;
}
else
{
pendingItems = fetchSize;
morePending = false;
}
return morePending;
} );
if ( moreItemsPending )
{
return;
}
Subscription subscription = subscriptionFuture.getNow( null );
if ( subscription == null )
{
throw new IllegalStateException( "Upstream subscription must not be null at this stage" );
}
subscription.request( fetchSize );
}

public static <T> T executeWithLock( Lock lock, Supplier<T> supplier )
{
lock.lock();
try
{
return supplier.get();
}
finally
{
lock.unlock();
}
}

private static class OneSignalSubscriber<T> extends BaseSubscriber<T>
{
private final Lock lock = new ReentrantLock();
private MonoSink<T> sink;
private boolean emitted;
private boolean done;
private Throwable throwable;

public void requestNext( MonoSink<T> sink )
{
boolean done = executeWithLock( lock, () ->
{
this.sink = sink;
emitted = false;
return this.done;
} );

if ( done )
{
if ( throwable != null )
{
this.sink.error( throwable );
}
else
{
this.sink.success();
}
}
else
{
upstream().request( 1 );
}
}

@Override
protected void hookOnSubscribe( Subscription subscription )
{
// left empty to prevent requesting signals immediately
}

@Override
protected void hookOnNext( T value )
{
MonoSink<T> sink = executeWithLock( lock, () ->
{
emitted = true;
return this.sink;
} );
sink.success( value );
}

@Override
protected void hookOnComplete()
{
MonoSink<T> sink = executeWithLock( lock, () ->
{
done = true;
return !emitted ? this.sink : null;
} );
if ( sink != null )
{
sink.success();
}
}

@Override
protected void hookOnError( Throwable throwable )
{
MonoSink<T> sink = executeWithLock( lock, () ->
{
done = true;
this.throwable = throwable;
return !emitted ? this.sink : null;
} );
if ( sink != null )
{
sink.error( throwable );
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import lombok.Getter;
import lombok.Setter;
import neo4j.org.testkit.backend.RxBlockingSubscriber;
import neo4j.org.testkit.backend.RxBufferedSubscriber;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -31,23 +31,21 @@
public class RxResultHolder extends AbstractResultHolder<RxSessionHolder,RxTransactionHolder,RxResult>
{
@Setter
private RxBlockingSubscriber<Record> subscriber;
private RxBufferedSubscriber<Record> subscriber;
@Getter
private final AtomicLong requestedRecordsCounter = new AtomicLong();

public RxResultHolder( RxSessionHolder sessionHolder, RxResult result )
{
super( sessionHolder, result );
sessionHolder.setResultHolder( this );
}

public RxResultHolder( RxTransactionHolder transactionHolder, RxResult result )
{
super( transactionHolder, result );
transactionHolder.getSessionHolder().setResultHolder( this );
}

public Optional<RxBlockingSubscriber<Record>> getSubscriber()
public Optional<RxBufferedSubscriber<Record>> getSubscriber()
{
return Optional.ofNullable( subscriber );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,13 @@
*/
package neo4j.org.testkit.backend.holder;

import lombok.Setter;

import java.util.Optional;

import org.neo4j.driver.SessionConfig;
import org.neo4j.driver.reactive.RxSession;

public class RxSessionHolder extends AbstractSessionHolder<RxSession>
{
@Setter
private RxResultHolder resultHolder;

public RxSessionHolder( DriverHolder driverHolder, RxSession session, SessionConfig config )
{
super( driverHolder, session, config );
}

public Optional<RxResultHolder> getResultHolder()
{
return Optional.ofNullable( resultHolder );
}
}
Loading

0 comments on commit 4366ce0

Please sign in to comment.