Skip to content

Commit

Permalink
Make AddressSet retain resolved addresses (#1034)
Browse files Browse the repository at this point in the history
Routing table address set update may override resolved router address. This leads to routing connection pool closures. This update aims to optimise this.
  • Loading branch information
injectives authored Oct 15, 2021
1 parent 9034eb9 commit e7b8c21
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,23 @@ public int size()
return addresses.length;
}

/**
* Updates addresses using the provided set.
* <p>
* It aims to retain existing addresses by checking if they are present in the new set. To benefit from this, the provided set MUST contain specifically
* {@link BoltServerAddress} instances with equal host and connection host values.
*
* @param newAddresses the new address set.
*/
public synchronized void retainAllAndAdd( Set<BoltServerAddress> newAddresses )
{
BoltServerAddress[] addressesArr = new BoltServerAddress[newAddresses.size()];
int insertionIdx = 0;
for ( BoltServerAddress address : addresses )
{
if ( newAddresses.remove( address ) )
BoltServerAddress lookupAddress =
BoltServerAddress.class.equals( address.getClass() ) ? address : new BoltServerAddress( address.host(), address.port() );
if ( newAddresses.remove( lookupAddress ) )
{
addressesArr[insertionIdx] = address;
insertionIdx++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ private synchronized void freshClusterCompositionFetched( ClusterCompositionLook
{
try
{
log.debug( "Fetched cluster composition for database '%s'. %s", databaseName.description(), compositionLookupResult.getClusterComposition() );
routingTable.update( compositionLookupResult.getClusterComposition() );
routingTableRegistry.removeAged();

Expand Down Expand Up @@ -166,7 +167,8 @@ private synchronized void freshClusterCompositionFetched( ClusterCompositionLook

private synchronized void clusterCompositionLookupFailed( Throwable error )
{
log.error( String.format( "Failed to update routing table for database '%s'. Current routing table: %s.", databaseName.description(), routingTable ), error );
log.error( String.format( "Failed to update routing table for database '%s'. Current routing table: %s.", databaseName.description(), routingTable ),
error );
routingTableRegistry.remove( databaseName );
CompletableFuture<RoutingTable> routingTableFuture = refreshRoutingTableFuture;
refreshRoutingTableFuture = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,18 @@

import org.junit.jupiter.api.Test;

import java.net.InetAddress;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.ResolvedBoltServerAddress;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;

class AddressSetTest
{
Expand Down Expand Up @@ -142,6 +147,32 @@ void shouldHaveCorrectSize()
assertEquals( 2, addressSet.size() );
}

@Test
void shouldRetainExistingAddresses()
{
AddressSet addressSet = new AddressSet();
BoltServerAddress address0 = new BoltServerAddress( "node0", 7687 );
BoltServerAddress address1 = new ResolvedBoltServerAddress( "node1", 7687, new InetAddress[]{InetAddress.getLoopbackAddress()} );
BoltServerAddress address2 = new BoltServerAddress( "node2", 7687 );
BoltServerAddress address3 = new BoltServerAddress( "node3", 7687 );
BoltServerAddress address4 = new BoltServerAddress( "node4", 7687 );
addressSet.retainAllAndAdd( new HashSet<>( Arrays.asList( address0, address1, address2, address3, address4 ) ) );

BoltServerAddress sameAddress0 = new BoltServerAddress( "node0", 7687 );
BoltServerAddress sameAddress1 = new BoltServerAddress( "node1", 7687 );
BoltServerAddress differentAddress2 = new BoltServerAddress( "different-node2", 7687 );
BoltServerAddress sameAddress3 = new BoltServerAddress( "node3", 7687 );
BoltServerAddress sameAddress4 = new BoltServerAddress( "node4", 7687 );
addressSet.retainAllAndAdd( new HashSet<>( Arrays.asList( sameAddress0, sameAddress1, differentAddress2, sameAddress3, sameAddress4 ) ) );

assertEquals( 5, addressSet.size() );
assertSame( addressSet.toArray()[0], address0 );
assertSame( addressSet.toArray()[1], address1 );
assertSame( addressSet.toArray()[2], address3 );
assertSame( addressSet.toArray()[3], address4 );
assertSame( addressSet.toArray()[4], differentAddress2 );
}

private static Set<BoltServerAddress> addresses( String... strings )
{
Set<BoltServerAddress> set = new LinkedHashSet<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.DatabaseName;
import org.neo4j.driver.internal.DatabaseNameUtil;
import org.neo4j.driver.internal.cluster.AddressSet;
Expand All @@ -43,6 +42,11 @@
@Getter
public class GetRoutingTable implements TestkitRequest
{
private static final Function<AddressSet,List<String>> ADDRESSES_TO_STRINGS =
( addresses ) -> Arrays.stream( addresses.toArray() )
.map( address -> String.format( "%s:%d", address.host(), address.port() ) )
.collect( Collectors.toList() );

private GetRoutingTableBody data;

@Override
Expand All @@ -61,17 +65,15 @@ public TestkitResponse process( TestkitState testkitState )
String.format( "There is no routing table handler for the '%s' database.", databaseName.databaseName().orElse( "null" ) ) ) );

org.neo4j.driver.internal.cluster.RoutingTable routingTable = routingTableHandler.routingTable();
Function<AddressSet,List<String>> addressesToStrings = ( addresses ) -> Arrays.stream( addresses.toArray() )
.map( BoltServerAddress::toString ).collect( Collectors.toList() );

return RoutingTable
.builder()
.data( RoutingTable.RoutingTableBody
.builder()
.database( databaseName.databaseName().orElse( null ) )
.routers( addressesToStrings.apply( routingTable.routers() ) )
.readers( addressesToStrings.apply( routingTable.readers() ) )
.writers( addressesToStrings.apply( routingTable.writers() ) )
.routers( ADDRESSES_TO_STRINGS.apply( routingTable.routers() ) )
.readers( ADDRESSES_TO_STRINGS.apply( routingTable.readers() ) )
.writers( ADDRESSES_TO_STRINGS.apply( routingTable.writers() ) )
.build()
).build();
}
Expand Down

0 comments on commit e7b8c21

Please sign in to comment.