Skip to content

Commit

Permalink
Merge pull request #241 from pontusmelke/1.1-remove-address-on-connec…
Browse files Browse the repository at this point in the history
…tion-failure

Remove server when failing on session acquisition
  • Loading branch information
pontusmelke authored Oct 5, 2016
2 parents 9fc758f + b4885f6 commit a81d882
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 11 deletions.
47 changes: 36 additions & 11 deletions driver/src/main/java/org/neo4j/driver/internal/RoutingDriver.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public int compare( BoltServerAddress o1, BoltServerAddress o2 )
}
};
private static final int MIN_SERVERS = 1;
private static final int CONNECTION_RETRIES = 3;
private final ConnectionPool connections;
private final Function<Connection,Session> sessionProvider;
private final Clock clock;
Expand Down Expand Up @@ -108,7 +109,6 @@ private Set<BoltServerAddress> forgetAllServers()
seen.addAll( routingServers );
seen.addAll( readServers );
seen.addAll( writeServers );
routingServers.clear();
readServers.clear();
writeServers.clear();
return seen;
Expand Down Expand Up @@ -136,11 +136,11 @@ private void getServers()
{
boolean success = false;

ConcurrentRoundRobinSet<BoltServerAddress> routers = new ConcurrentRoundRobinSet<>( routingServers );
final Set<BoltServerAddress> newRouters = new HashSet<>( );
final Set<BoltServerAddress> seen = forgetAllServers();
while ( !routers.isEmpty() && !success )
while ( !routingServers.isEmpty() && !success )
{
address = routers.hop();
address = routingServers.hop();
success = call( address, GET_SERVERS, new Consumer<Record>()
{
@Override
Expand All @@ -160,12 +160,19 @@ public void accept( Record record )
writeServers.addAll( server.addresses() );
break;
case "ROUTE":
routingServers.addAll( server.addresses() );
newRouters.addAll( server.addresses() );
break;
}
}
}
} );
//We got trough but server gave us an empty list of routers
if (success && newRouters.isEmpty()) {
success = false;
} else if (success) {
routingServers.clear();
routingServers.addAll( newRouters );
}
}
if ( !success )
{
Expand Down Expand Up @@ -247,7 +254,7 @@ private boolean call( BoltServerAddress address, String procedureName, Consumer<
recorder.accept( records.next() );
}
}
catch ( ConnectionFailureException e )
catch ( Throwable e )
{
forget( address );
return false;
Expand Down Expand Up @@ -303,18 +310,36 @@ public void onWriteFailure( BoltServerAddress address )

private Connection acquireConnection( AccessMode role )
{
//Potentially rediscover servers if we are not happy with our current knowledge
checkServers();

ConcurrentRoundRobinSet<BoltServerAddress> servers;
switch ( role )
{
case READ:
return connections.acquire( readServers.hop() );
servers = readServers;
break;
case WRITE:
return connections.acquire( writeServers.hop() );
servers = writeServers;
break;
default:
throw new ClientException( role + " is not supported for creating new sessions" );
}

//Potentially rediscover servers if we are not happy with our current knowledge
checkServers();
int numberOfServers = servers.size();
for ( int i = 0; i < numberOfServers; i++ )
{
BoltServerAddress address = servers.hop();
try
{
return connections.acquire( address );
}
catch ( ConnectionFailureException e )
{
forget( address );
}
}

throw new ConnectionFailureException( "Failed to connect to any servers" );
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ public void close()
}
}

public BoltServerAddress address()
{
return connection.address();
}

static Neo4jException filterFailureToWrite( ClientException e, AccessMode mode, RoutingErrorHandler onError,
BoltServerAddress address )
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,4 +192,8 @@ public ResultSummary consume()
}
}

public BoltServerAddress address()
{
return address;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,15 @@
import org.neo4j.driver.v1.GraphDatabase;
import org.neo4j.driver.v1.Record;
import org.neo4j.driver.v1.Session;
import org.neo4j.driver.v1.exceptions.ConnectionFailureException;
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
import org.neo4j.driver.v1.exceptions.SessionExpiredException;
import org.neo4j.driver.v1.util.Function;
import org.neo4j.driver.v1.util.StubServer;

import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.core.IsEqual.equalTo;
Expand Down Expand Up @@ -329,6 +331,37 @@ public void shouldForgetEndpointsOnFailure() throws IOException, InterruptedExce
assertThat( server.exitStatus(), equalTo( 0 ) );
}

@Test
public void shouldForgetEndpointsOnFailedSessionAcquisition() throws IOException, InterruptedException, StubServer.ForceKilled
{
// Given
StubServer server = StubServer.start( resource( "acquire_endpoints.script" ), 9001 );

//no read servers

URI uri = URI.create( "bolt+routing://127.0.0.1:9001" );
RoutingDriver driver = (RoutingDriver) GraphDatabase.driver( uri, config );
try
{
driver.session( AccessMode.READ );
fail();
}
catch ( ConnectionFailureException e )
{
//ignore
}

assertThat( driver.readServers(), empty() );
assertThat( driver.writeServers(), hasSize( 2 ) );
assertFalse( driver.connectionPool().hasAddress( address( 9005 ) ) );
assertFalse( driver.connectionPool().hasAddress( address( 9006 ) ) );
driver.close();

// Finally
assertThat( server.exitStatus(), equalTo( 0 ) );
}


@Test
public void shouldRediscoverIfNecessaryOnSessionAcquisition()
throws IOException, InterruptedException, StubServer.ForceKilled
Expand Down

0 comments on commit a81d882

Please sign in to comment.