diff --git a/driver/src/main/java/org/neo4j/driver/internal/RoutingDriver.java b/driver/src/main/java/org/neo4j/driver/internal/RoutingDriver.java index 5256f56fa6..7c5bfaa04b 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/RoutingDriver.java +++ b/driver/src/main/java/org/neo4j/driver/internal/RoutingDriver.java @@ -66,6 +66,7 @@ public int compare( BoltServerAddress o1, BoltServerAddress o2 ) } }; private static final int MIN_SERVERS = 1; + private static final int CONNECTION_RETRIES = 3; private final ConnectionPool connections; private final BiFunction sessionProvider; private final Clock clock; @@ -138,11 +139,11 @@ private void getServers() { boolean success = false; - ConcurrentRoundRobinSet routers = new ConcurrentRoundRobinSet<>( routingServers ); + final ConcurrentRoundRobinSet newRouters = new ConcurrentRoundRobinSet<>( ); final Set seen = forgetAllServers(); - while ( !routers.isEmpty() && !success ) + while ( !routingServers.isEmpty() && !success ) { - address = routers.hop(); + address = routingServers.hop(); success = call( address, GET_SERVERS, new Consumer() { @Override @@ -162,12 +163,19 @@ public void accept( Record record ) writeServers.addAll( server.addresses() ); break; case "ROUTE": - routingServers.addAll( server.addresses() ); + newRouters.addAll( server.addresses() ); break; } } } } ); + //We got trough but server gave us an empty list of routers + if (success && newRouters.isEmpty()) { + success = false; + } else if (success) { + routingServers.clear(); + routingServers.addAll( newRouters ); + } } if ( !success ) { @@ -249,7 +257,7 @@ private boolean call( BoltServerAddress address, String procedureName, Consumer< recorder.accept( records.next() ); } } - catch ( ConnectionFailureException e ) + catch ( Throwable e ) { forget( address ); return false; @@ -306,18 +314,36 @@ public void onWriteFailure( BoltServerAddress address ) private Connection acquireConnection( AccessMode role ) { - //Potentially rediscover servers if we are not happy with our current knowledge - checkServers(); - + ConcurrentRoundRobinSet servers; switch ( role ) { case READ: - return connections.acquire( readServers.hop() ); + servers = readServers; + break; case WRITE: - return connections.acquire( writeServers.hop() ); + servers = writeServers; + break; default: throw new ClientException( role + " is not supported for creating new sessions" ); } + + //Potentially rediscover servers if we are not happy with our current knowledge + checkServers(); + int numberOfServers = servers.size(); + for ( int i = 0; i < numberOfServers; i++ ) + { + BoltServerAddress address = servers.hop(); + try + { + return connections.acquire( address ); + } + catch ( ConnectionFailureException e ) + { + forget( address ); + } + } + + throw new ConnectionFailureException( "Failed to connect to any servers" ); } @Override diff --git a/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverStubTest.java b/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverStubTest.java index 2c0a69a2fc..5bcbb0c938 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverStubTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverStubTest.java @@ -42,6 +42,7 @@ import org.neo4j.driver.v1.GraphDatabase; import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.Session; +import org.neo4j.driver.v1.exceptions.ConnectionFailureException; import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; import org.neo4j.driver.v1.exceptions.SessionExpiredException; import org.neo4j.driver.v1.util.Function; @@ -49,6 +50,7 @@ import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.core.IsEqual.equalTo; @@ -331,6 +333,37 @@ public void shouldForgetEndpointsOnFailure() throws IOException, InterruptedExce assertThat( server.exitStatus(), equalTo( 0 ) ); } + @Test + public void shouldForgetEndpointsOnFailedSessionAcquisition() throws IOException, InterruptedException, StubServer.ForceKilled + { + // Given + StubServer server = StubServer.start( resource( "acquire_endpoints.script" ), 9001 ); + + //no read servers + + URI uri = URI.create( "bolt+routing://127.0.0.1:9001" ); + RoutingDriver driver = (RoutingDriver) GraphDatabase.driver( uri, config ); + try + { + driver.session( AccessMode.READ ); + fail(); + } + catch ( ConnectionFailureException e ) + { + //ignore + } + + assertThat( driver.readServers(), empty() ); + assertThat( driver.writeServers(), hasSize( 2 ) ); + assertFalse( driver.connectionPool().hasAddress( address( 9005 ) ) ); + assertFalse( driver.connectionPool().hasAddress( address( 9006 ) ) ); + driver.close(); + + // Finally + assertThat( server.exitStatus(), equalTo( 0 ) ); + } + + @Test public void shouldRediscoverIfNecessaryOnSessionAcquisition() throws IOException, InterruptedException, StubServer.ForceKilled