Skip to content

Commit

Permalink
Prevent duplicate summary error reporting on session closure in react…
Browse files Browse the repository at this point in the history
…ive (#1015) (#1067)

Reactive driver might emit the same error both on unconsumed result stream disposal and session closure, this update is intended to fix this.
  • Loading branch information
injectives authored Nov 11, 2021
1 parent 7ff129f commit 87cf5de
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,22 @@

public class RxResultCursorImpl implements RxResultCursor
{
static final BiConsumer<Record,Throwable> DISCARD_RECORD_CONSUMER = ( record, throwable ) -> {/*do nothing*/};
static final BiConsumer<Record,Throwable> DISCARD_RECORD_CONSUMER = ( record, throwable ) ->
{/*do nothing*/};
private final RunResponseHandler runHandler;
private final PullResponseHandler pullHandler;
private final Throwable runResponseError;
private final CompletableFuture<ResultSummary> summaryFuture = new CompletableFuture<>();
private boolean summaryFutureExposed;
private boolean resultConsumed;
private RecordConsumerStatus consumerStatus = NOT_INSTALLED;

public RxResultCursorImpl(RunResponseHandler runHandler, PullResponseHandler pullHandler )
public RxResultCursorImpl( RunResponseHandler runHandler, PullResponseHandler pullHandler )
{
this( null, runHandler, pullHandler );
}

public RxResultCursorImpl(Throwable runError, RunResponseHandler runHandler, PullResponseHandler pullHandler )
public RxResultCursorImpl( Throwable runError, RunResponseHandler runHandler, PullResponseHandler pullHandler )
{
Objects.requireNonNull( runHandler );
Objects.requireNonNull( pullHandler );
Expand Down Expand Up @@ -105,7 +107,8 @@ public void cancel()
public CompletionStage<Throwable> discardAllFailureAsync()
{
// calling this method will enforce discarding record stream and finish running cypher query
return summaryAsync().thenApply( summary -> (Throwable) null ).exceptionally( error -> error );
return summaryStage().thenApply( summary -> (Throwable) null )
.exceptionally( throwable -> summaryFutureExposed ? null : throwable );
}

@Override
Expand All @@ -122,6 +125,18 @@ public CompletionStage<Throwable> pullAllFailureAsync()

@Override
public CompletionStage<ResultSummary> summaryAsync()
{
summaryFutureExposed = true;
return summaryStage();
}

@Override
public boolean isDone()
{
return summaryFuture.isDone();
}

public CompletionStage<ResultSummary> summaryStage()
{
if ( !isDone() && !resultConsumed ) // the summary is called before record streaming
{
Expand All @@ -132,12 +147,6 @@ public CompletionStage<ResultSummary> summaryAsync()
return this.summaryFuture;
}

@Override
public boolean isDone()
{
return summaryFuture.isDone();
}

private void assertRunCompletedSuccessfully()
{
if ( runResponseError != null )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,32 @@
package org.neo4j.driver.internal.cursor;

import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;

import org.neo4j.driver.exceptions.ResultConsumedException;
import org.neo4j.driver.internal.handlers.RunResponseHandler;
import org.neo4j.driver.internal.handlers.pulln.PullResponseHandler;
import org.neo4j.driver.internal.reactive.util.ListBasedPullHandler;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.summary.ResultSummary;

import static java.util.Arrays.asList;
import static junit.framework.TestCase.assertTrue;
import static org.junit.Assert.assertFalse;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.neo4j.driver.Values.value;
Expand Down Expand Up @@ -251,6 +258,55 @@ void shouldCancelIfNotPulled()
assertFalse( cursor.isDone() );
}

@Test
void shouldPropagateSummaryErrorViaSummaryStageWhenItIsRetrievedExternally() throws ExecutionException, InterruptedException
{
// Given
RunResponseHandler runHandler = mock( RunResponseHandler.class );
PullResponseHandler pullHandler = mock( PullResponseHandler.class );
@SuppressWarnings( "unchecked" )
ArgumentCaptor<BiConsumer<ResultSummary,Throwable>> summaryConsumerCaptor = ArgumentCaptor.forClass( BiConsumer.class );
RxResultCursor cursor = new RxResultCursorImpl( runHandler, pullHandler );
verify( pullHandler, times( 1 ) ).installSummaryConsumer( summaryConsumerCaptor.capture() );
BiConsumer<ResultSummary,Throwable> summaryConsumer = summaryConsumerCaptor.getValue();
RuntimeException exception = mock( RuntimeException.class );

// When
CompletionStage<ResultSummary> summaryStage = cursor.summaryAsync();
CompletionStage<Throwable> discardStage = cursor.discardAllFailureAsync();
summaryConsumer.accept( null, exception );

// Then
verify( pullHandler ).installRecordConsumer( DISCARD_RECORD_CONSUMER );
verify( pullHandler ).cancel();
ExecutionException actualException = assertThrows( ExecutionException.class, () -> summaryStage.toCompletableFuture().get() );
assertSame( exception, actualException.getCause() );
assertNull( discardStage.toCompletableFuture().get() );
}

@Test
void shouldPropagateSummaryErrorViaDiscardStageWhenSummaryStageIsNotRetrievedExternally() throws ExecutionException, InterruptedException
{
// Given
RunResponseHandler runHandler = mock( RunResponseHandler.class );
PullResponseHandler pullHandler = mock( PullResponseHandler.class );
@SuppressWarnings( "unchecked" )
ArgumentCaptor<BiConsumer<ResultSummary,Throwable>> summaryConsumerCaptor = ArgumentCaptor.forClass( BiConsumer.class );
RxResultCursor cursor = new RxResultCursorImpl( runHandler, pullHandler );
verify( pullHandler, times( 1 ) ).installSummaryConsumer( summaryConsumerCaptor.capture() );
BiConsumer<ResultSummary,Throwable> summaryConsumer = summaryConsumerCaptor.getValue();
RuntimeException exception = mock( RuntimeException.class );

// When
CompletionStage<Throwable> discardStage = cursor.discardAllFailureAsync();
summaryConsumer.accept( null, exception );

// Then
verify( pullHandler ).installRecordConsumer( DISCARD_RECORD_CONSUMER );
verify( pullHandler ).cancel();
assertSame( exception, discardStage.toCompletableFuture().get().getCause() );
}

private static RunResponseHandler newRunResponseHandler( CompletableFuture<Void> runFuture )
{
return new RunResponseHandler( runFuture, METADATA_EXTRACTOR, mock( Connection.class ), null );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ public class StartTest implements TestkitRequest
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestDisconnects\\.test_disconnect_after_hello$", skipMessage );
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestDisconnects\\.test_disconnect_session_on_run$", skipMessage );
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestDisconnects\\.test_disconnect_on_tx_run$", skipMessage );
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.NoRouting[^.]+\\.test_should_error_on_database_shutdown_using_tx_run$", "Session close throws error" );
skipMessage = "Requires investigation";
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestProtocolVersions\\.test_server_agent", skipMessage );
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestProtocolVersions\\.test_server_version", skipMessage );
Expand All @@ -77,12 +76,8 @@ public class StartTest implements TestkitRequest
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestDisconnects\\.test_disconnect_session_on_tx_commit$", skipMessage );
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestDisconnects\\.test_fail_on_reset$", skipMessage );
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRunParameters\\..*$", skipMessage );
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRun\\.test_autocommit_transactions_should_support_timeout$", skipMessage );
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRun\\.test_fails_on_bad_syntax$", skipMessage );
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRun\\.test_fails_on_missing_parameter$", skipMessage );
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRun\\.test_iteration_nested$", skipMessage );
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestTxFuncRun\\.test_iteration_nested$", skipMessage );
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestTxRun\\.test_should_not_run_valid_query_in_invalid_tx$", skipMessage );
}

private StartTestBody data;
Expand Down

0 comments on commit 87cf5de

Please sign in to comment.