Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent duplicate summary error reporting on session closure in reactive #1067

Merged
merged 1 commit into from
Nov 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,22 @@

public class RxResultCursorImpl implements RxResultCursor
{
static final BiConsumer<Record,Throwable> DISCARD_RECORD_CONSUMER = ( record, throwable ) -> {/*do nothing*/};
static final BiConsumer<Record,Throwable> DISCARD_RECORD_CONSUMER = ( record, throwable ) ->
{/*do nothing*/};
private final RunResponseHandler runHandler;
private final PullResponseHandler pullHandler;
private final Throwable runResponseError;
private final CompletableFuture<ResultSummary> summaryFuture = new CompletableFuture<>();
private boolean summaryFutureExposed;
private boolean resultConsumed;
private RecordConsumerStatus consumerStatus = NOT_INSTALLED;

public RxResultCursorImpl(RunResponseHandler runHandler, PullResponseHandler pullHandler )
public RxResultCursorImpl( RunResponseHandler runHandler, PullResponseHandler pullHandler )
{
this( null, runHandler, pullHandler );
}

public RxResultCursorImpl(Throwable runError, RunResponseHandler runHandler, PullResponseHandler pullHandler )
public RxResultCursorImpl( Throwable runError, RunResponseHandler runHandler, PullResponseHandler pullHandler )
{
Objects.requireNonNull( runHandler );
Objects.requireNonNull( pullHandler );
Expand Down Expand Up @@ -105,7 +107,8 @@ public void cancel()
public CompletionStage<Throwable> discardAllFailureAsync()
{
// calling this method will enforce discarding record stream and finish running cypher query
return summaryAsync().thenApply( summary -> (Throwable) null ).exceptionally( error -> error );
return summaryStage().thenApply( summary -> (Throwable) null )
.exceptionally( throwable -> summaryFutureExposed ? null : throwable );
}

@Override
Expand All @@ -122,6 +125,18 @@ public CompletionStage<Throwable> pullAllFailureAsync()

@Override
public CompletionStage<ResultSummary> summaryAsync()
{
summaryFutureExposed = true;
return summaryStage();
}

@Override
public boolean isDone()
{
return summaryFuture.isDone();
}

public CompletionStage<ResultSummary> summaryStage()
{
if ( !isDone() && !resultConsumed ) // the summary is called before record streaming
{
Expand All @@ -132,12 +147,6 @@ public CompletionStage<ResultSummary> summaryAsync()
return this.summaryFuture;
}

@Override
public boolean isDone()
{
return summaryFuture.isDone();
}

private void assertRunCompletedSuccessfully()
{
if ( runResponseError != null )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,32 @@
package org.neo4j.driver.internal.cursor;

import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;

import org.neo4j.driver.exceptions.ResultConsumedException;
import org.neo4j.driver.internal.handlers.RunResponseHandler;
import org.neo4j.driver.internal.handlers.pulln.PullResponseHandler;
import org.neo4j.driver.internal.reactive.util.ListBasedPullHandler;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.summary.ResultSummary;

import static java.util.Arrays.asList;
import static junit.framework.TestCase.assertTrue;
import static org.junit.Assert.assertFalse;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.neo4j.driver.Values.value;
Expand Down Expand Up @@ -251,6 +258,55 @@ void shouldCancelIfNotPulled()
assertFalse( cursor.isDone() );
}

@Test
void shouldPropagateSummaryErrorViaSummaryStageWhenItIsRetrievedExternally() throws ExecutionException, InterruptedException
{
// Given
RunResponseHandler runHandler = mock( RunResponseHandler.class );
PullResponseHandler pullHandler = mock( PullResponseHandler.class );
@SuppressWarnings( "unchecked" )
ArgumentCaptor<BiConsumer<ResultSummary,Throwable>> summaryConsumerCaptor = ArgumentCaptor.forClass( BiConsumer.class );
RxResultCursor cursor = new RxResultCursorImpl( runHandler, pullHandler );
verify( pullHandler, times( 1 ) ).installSummaryConsumer( summaryConsumerCaptor.capture() );
BiConsumer<ResultSummary,Throwable> summaryConsumer = summaryConsumerCaptor.getValue();
RuntimeException exception = mock( RuntimeException.class );

// When
CompletionStage<ResultSummary> summaryStage = cursor.summaryAsync();
CompletionStage<Throwable> discardStage = cursor.discardAllFailureAsync();
summaryConsumer.accept( null, exception );

// Then
verify( pullHandler ).installRecordConsumer( DISCARD_RECORD_CONSUMER );
verify( pullHandler ).cancel();
ExecutionException actualException = assertThrows( ExecutionException.class, () -> summaryStage.toCompletableFuture().get() );
assertSame( exception, actualException.getCause() );
assertNull( discardStage.toCompletableFuture().get() );
}

@Test
void shouldPropagateSummaryErrorViaDiscardStageWhenSummaryStageIsNotRetrievedExternally() throws ExecutionException, InterruptedException
{
// Given
RunResponseHandler runHandler = mock( RunResponseHandler.class );
PullResponseHandler pullHandler = mock( PullResponseHandler.class );
@SuppressWarnings( "unchecked" )
ArgumentCaptor<BiConsumer<ResultSummary,Throwable>> summaryConsumerCaptor = ArgumentCaptor.forClass( BiConsumer.class );
RxResultCursor cursor = new RxResultCursorImpl( runHandler, pullHandler );
verify( pullHandler, times( 1 ) ).installSummaryConsumer( summaryConsumerCaptor.capture() );
BiConsumer<ResultSummary,Throwable> summaryConsumer = summaryConsumerCaptor.getValue();
RuntimeException exception = mock( RuntimeException.class );

// When
CompletionStage<Throwable> discardStage = cursor.discardAllFailureAsync();
summaryConsumer.accept( null, exception );

// Then
verify( pullHandler ).installRecordConsumer( DISCARD_RECORD_CONSUMER );
verify( pullHandler ).cancel();
assertSame( exception, discardStage.toCompletableFuture().get().getCause() );
}

private static RunResponseHandler newRunResponseHandler( CompletableFuture<Void> runFuture )
{
return new RunResponseHandler( runFuture, METADATA_EXTRACTOR, mock( Connection.class ), null );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ public class StartTest implements TestkitRequest
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestDisconnects\\.test_disconnect_after_hello$", skipMessage );
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestDisconnects\\.test_disconnect_session_on_run$", skipMessage );
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestDisconnects\\.test_disconnect_on_tx_run$", skipMessage );
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.NoRouting[^.]+\\.test_should_error_on_database_shutdown_using_tx_run$", "Session close throws error" );
skipMessage = "Requires investigation";
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestProtocolVersions\\.test_server_agent", skipMessage );
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestProtocolVersions\\.test_server_version", skipMessage );
Expand All @@ -77,12 +76,8 @@ public class StartTest implements TestkitRequest
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestDisconnects\\.test_disconnect_session_on_tx_commit$", skipMessage );
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestDisconnects\\.test_fail_on_reset$", skipMessage );
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRunParameters\\..*$", skipMessage );
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRun\\.test_autocommit_transactions_should_support_timeout$", skipMessage );
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRun\\.test_fails_on_bad_syntax$", skipMessage );
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRun\\.test_fails_on_missing_parameter$", skipMessage );
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRun\\.test_iteration_nested$", skipMessage );
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestTxFuncRun\\.test_iteration_nested$", skipMessage );
REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestTxRun\\.test_should_not_run_valid_query_in_invalid_tx$", skipMessage );
}

private StartTestBody data;
Expand Down