Skip to content

Commit

Permalink
Rediscovery no longer additive
Browse files Browse the repository at this point in the history
Whenever we go down to the database to call `getServers` we should use the
values returned from the call and forget about any servers returned in previous
calls.
  • Loading branch information
pontusmelke committed Sep 21, 2016
1 parent 6eb2dd7 commit d8b6c43
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 62 deletions.
27 changes: 25 additions & 2 deletions driver/src/main/java/org/neo4j/driver/internal/ClusterDriver.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

Expand Down Expand Up @@ -96,16 +97,31 @@ private void checkServers()
}
}

private Set<BoltServerAddress> forgetAllServers()
{
final Set<BoltServerAddress> seen = new HashSet<>( );
seen.addAll( routingServers );
seen.addAll( readServers );
seen.addAll( writeServers );
routingServers.clear();
readServers.clear();
writeServers.clear();
return seen;
}

//must be called from a synchronized block
private void getServers()
{
BoltServerAddress address = null;
try
{
boolean success = false;
while ( !routingServers.isEmpty() && !success )

ConcurrentRoundRobinSet<BoltServerAddress> routers = new ConcurrentRoundRobinSet<>( routingServers );
final Set<BoltServerAddress> seen = forgetAllServers();
while ( !routers.isEmpty() && !success )
{
address = routingServers.hop();
address = routers.hop();
success = call( address, GET_SERVERS, new Consumer<Record>()
{
@Override
Expand All @@ -115,6 +131,7 @@ public void accept( Record record )
List<ServerInfo> servers = servers( record );
for ( ServerInfo server : servers )
{
seen.removeAll( server.addresses() );
switch ( server.role() )
{
case "READ":
Expand All @@ -135,6 +152,12 @@ public void accept( Record record )
{
throw new ServiceUnavailableException( "Run out of servers" );
}

//the server no longer think we should care about these
for ( BoltServerAddress remove : seen )
{
connections.purge( remove );
}
}
catch ( ClientException ex )
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ public ConcurrentRoundRobinSet( Comparator<T> comparator )
set = new ConcurrentSkipListSet<>( comparator );
}

public ConcurrentRoundRobinSet(ConcurrentRoundRobinSet<T> original)
{
set = new ConcurrentSkipListSet<>( original.set.comparator() );
set.addAll( original );
}

public T hop()
{
if ( current == null )
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
/**
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* <p>
* This file is part of Neo4j.
*
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down Expand Up @@ -46,13 +46,16 @@
import org.neo4j.driver.v1.util.Function;
import org.neo4j.driver.v1.util.StubServer;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.hamcrest.core.IsNot.not;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

@Ignore
public class ClusterDriverStubTest
{
Expand All @@ -73,10 +76,7 @@ public void shouldDiscoverServers() throws IOException, InterruptedException, St
{
// Then
Set<BoltServerAddress> addresses = driver.routingServers();
assertThat( addresses, hasSize( 3 ) );
assertThat( addresses, hasItem( new BoltServerAddress( "127.0.0.1", 9001 ) ) );
assertThat( addresses, hasItem( new BoltServerAddress( "127.0.0.1", 9002 ) ) );
assertThat( addresses, hasItem( new BoltServerAddress( "127.0.0.1", 9003 ) ) );
assertThat( addresses, containsInAnyOrder( address(9001), address( 9002 ), address( 9003 ) ) );
}

// Finally
Expand All @@ -89,23 +89,21 @@ public void shouldDiscoverNewServers() throws IOException, InterruptedException,
// Given
StubServer server = StubServer.start( resource( "discover_new_servers.script" ), 9001 );
URI uri = URI.create( "bolt+routing://127.0.0.1:9001" );
BoltServerAddress seed = address( 9001 );

// When
try ( ClusterDriver driver = (ClusterDriver) GraphDatabase.driver( uri, config ) )
{
// Then
Set<BoltServerAddress> addresses = driver.routingServers();
assertThat( addresses, hasSize( 4 ) );
assertThat( addresses, hasItem( new BoltServerAddress( "127.0.0.1", 9001 ) ) );
assertThat( addresses, hasItem( new BoltServerAddress( "127.0.0.1", 9002 ) ) );
assertThat( addresses, hasItem( new BoltServerAddress( "127.0.0.1", 9003 ) ) );
assertThat( addresses, hasItem( new BoltServerAddress( "127.0.0.1", 9004 ) ) );
assertThat( addresses, containsInAnyOrder( address(9002), address( 9003 ), address( 9004 ) ) );
}

// Finally
assertThat( server.exitStatus(), equalTo( 0 ) );
}


@Test
public void shouldHandleEmptyResponse() throws IOException, InterruptedException, StubServer.ForceKilled
{
Expand All @@ -115,8 +113,8 @@ public void shouldHandleEmptyResponse() throws IOException, InterruptedException
try ( ClusterDriver driver = (ClusterDriver) GraphDatabase.driver( uri, config ) )
{
Set<BoltServerAddress> servers = driver.routingServers();
assertThat( servers, hasSize( 1 ) );
assertThat( servers, hasItem( new BoltServerAddress( "127.0.0.1", 9001 ) ) );
assertThat( servers, hasSize( 0 ) );
assertFalse( driver.connectionPool().hasAddress( address( 9001 ) ) );
}

// Finally
Expand Down Expand Up @@ -265,7 +263,7 @@ public void shouldRoundRobinWriteSessions() throws IOException, InterruptedExcep
{
for ( int i = 0; i < 2; i++ )
{
try(Session session = driver.session() )
try ( Session session = driver.session() )
{
session.run( "CREATE (n {name:'Bob'})" );
}
Expand All @@ -291,15 +289,9 @@ public void shouldRememberEndpoints() throws IOException, InterruptedException,
{
session.run( "MATCH (n) RETURN n.name" ).consume();

assertThat( driver.readServers(), hasSize( 2 ));
assertThat( driver.readServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9005 ) ) );
assertThat( driver.readServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9006 ) ) );
assertThat( driver.writeServers(), hasSize( 2 ));
assertThat( driver.writeServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9007 ) ) );
assertThat( driver.writeServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9008 ) ) );
//Make sure we don't cache acquired servers as discovery servers
assertThat( driver.routingServers(), not(hasItem( new BoltServerAddress( "127.0.0.1", 9005 ))));
assertThat( driver.routingServers(), not(hasItem( new BoltServerAddress( "127.0.0.1", 9006 ))));
assertThat( driver.readServers(), containsInAnyOrder( address( 9005 ), address( 9006 ) ) );
assertThat( driver.writeServers(), containsInAnyOrder( address( 9007 ), address( 9008 ) ) );
assertThat( driver.routingServers(), containsInAnyOrder( address( 9001 ), address( 9002 ), address( 9003 ) ) );
}
// Finally
assertThat( server.exitStatus(), equalTo( 0 ) );
Expand All @@ -316,29 +308,30 @@ public void shouldForgetEndpointsOnFailure() throws IOException, InterruptedExce
StubServer.start( resource( "dead_server.script" ), 9005 );
URI uri = URI.create( "bolt+routing://127.0.0.1:9001" );
ClusterDriver driver = (ClusterDriver) GraphDatabase.driver( uri, config );
boolean failed = false;
try
{
Session session = driver.session( AccessRole.READ );
session.run( "MATCH (n) RETURN n.name" ).consume();
session.close();
fail();
}
catch ( SessionExpiredException e )
{
failed = true;
//ignore
}

assertTrue( failed );
assertThat( driver.readServers(), not(hasItem( new BoltServerAddress( "127.0.0.1", 9005 ) ) ));
assertThat( driver.readServers(), not( hasItem( address( 9005 ) ) ) );
assertThat( driver.writeServers(), hasSize( 2 ) );
assertFalse( driver.connectionPool().hasAddress( address( 9005 ) ) );
driver.close();

// Finally
assertThat( server.exitStatus(), equalTo( 0 ) );
}

@Test
public void shouldRediscoverIfNecessaryOnSessionAcquisition() throws IOException, InterruptedException, StubServer.ForceKilled
public void shouldRediscoverIfNecessaryOnSessionAcquisition()
throws IOException, InterruptedException, StubServer.ForceKilled
{
// Given
StubServer server = StubServer.start( resource( "rediscover.script" ), 9001 );
Expand All @@ -349,17 +342,15 @@ public void shouldRediscoverIfNecessaryOnSessionAcquisition() throws IOException

//On creation we only find ourselves
ClusterDriver driver = (ClusterDriver) GraphDatabase.driver( uri, config );
assertThat( driver.routingServers(), hasSize( 1 ) );
assertThat( driver.routingServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9001 ) ));
assertThat( driver.routingServers(), containsInAnyOrder( address( 9001 ) ) );
assertTrue( driver.connectionPool().hasAddress( address( 9001 ) ) );

//since we know about less than three servers a rediscover should be triggered
//since we have no write nor read servers we must rediscover
Session session = driver.session( AccessRole.READ );
assertThat( driver.routingServers(), hasSize( 4 ) );
assertThat( driver.routingServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9001 ) ));
assertThat( driver.routingServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9002 ) ));
assertThat( driver.routingServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9003 ) ));
assertThat( driver.routingServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9004 ) ));

assertThat( driver.routingServers(), containsInAnyOrder(address( 9002 ),
address( 9003 ), address( 9004 ) ) );
//server told os to forget 9001
assertFalse( driver.connectionPool().hasAddress( address( 9001 ) ) );
session.close();
driver.close();

Expand All @@ -379,8 +370,7 @@ public void shouldOnlyGetServersOnce() throws IOException, InterruptedException,

//On creation we only find ourselves
final ClusterDriver driver = (ClusterDriver) GraphDatabase.driver( uri, config );
assertThat( driver.routingServers(), hasSize( 1 ) );
assertThat( driver.routingServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9001 ) ));
assertThat( driver.routingServers(), containsInAnyOrder( address( 9001 ) ) );

ExecutorService runner = Executors.newFixedThreadPool( 10 );
for ( int i = 0; i < 10; i++ )
Expand All @@ -391,7 +381,7 @@ public void shouldOnlyGetServersOnce() throws IOException, InterruptedException,
public void run()
{
//noinspection EmptyTryBlock
try(Session ignore = driver.session( AccessRole.READ ))
try ( Session ignore = driver.session( AccessRole.READ ) )
{
//empty
}
Expand All @@ -401,11 +391,7 @@ public void run()
}
runner.awaitTermination( 10, TimeUnit.SECONDS );
//since we know about less than three servers a rediscover should be triggered
assertThat( driver.routingServers(), hasSize( 4 ) );
assertThat( driver.routingServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9001 ) ));
assertThat( driver.routingServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9002 ) ));
assertThat( driver.routingServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9003 ) ));
assertThat( driver.routingServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9004 ) ));
assertThat( driver.routingServers(), containsInAnyOrder( address( 9002 ), address( 9003 ), address( 9004 ) ) );

driver.close();

Expand Down Expand Up @@ -450,19 +436,19 @@ public void shouldHandleLeaderSwitchWhenWriting()
boolean failed = false;
try ( Session session = driver.session( AccessRole.WRITE ) )
{
assertThat(driver.writeServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9007 ) ));
assertThat(driver.writeServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9008 ) ));
assertThat( driver.writeServers(), hasItem(address( 9007 ) ) );
assertThat( driver.writeServers(), hasItem( address( 9008 ) ) );
session.run( "CREATE ()" ).consume();
}
catch (SessionExpiredException e)
catch ( SessionExpiredException e )
{
failed = true;
assertThat(e.getMessage(), equalTo( "Server at 127.0.0.1:9007 no longer accepts writes" ));
assertThat( e.getMessage(), equalTo( "Server at 127.0.0.1:9007 no longer accepts writes" ) );
}
assertTrue( failed );
assertThat( driver.writeServers(), not( hasItem( new BoltServerAddress( "127.0.0.1", 9007 ) ) ) );
assertThat( driver.writeServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9008 ) ) );
assertTrue( driver.connectionPool().hasAddress( new BoltServerAddress( "127.0.0.1", 9007 ) ) );
assertThat( driver.writeServers(), not( hasItem( address( 9007 ) ) ) );
assertThat( driver.writeServers(), hasItem( address( 9008 ) ) );
assertTrue( driver.connectionPool().hasAddress( address( 9007 ) ) );

driver.close();
// Finally
Expand All @@ -478,4 +464,9 @@ String resource( String fileName )
}
return resource.getFile();
}

private BoltServerAddress address( int port )
{
return new BoltServerAddress( "127.0.0.1", port );
}
}
Loading

0 comments on commit d8b6c43

Please sign in to comment.