Skip to content

Commit

Permalink
Added ttl handling
Browse files Browse the repository at this point in the history
The returned `ttl` from server is now respected. Whenever we do acquisition we check if
the information is stale, if so we do new `getServers` call.
  • Loading branch information
pontusmelke committed Sep 21, 2016
1 parent d8b6c43 commit b2ab2ed
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 24 deletions.
28 changes: 17 additions & 11 deletions driver/src/main/java/org/neo4j/driver/internal/ClusterDriver.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
/**
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
* <p>
*
* This file is part of Neo4j.
* <p>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -23,11 +23,13 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

import org.neo4j.driver.internal.net.BoltServerAddress;
import org.neo4j.driver.internal.security.SecurityPlan;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.spi.ConnectionPool;
import org.neo4j.driver.internal.util.Clock;
import org.neo4j.driver.internal.util.ConcurrentRoundRobinSet;
import org.neo4j.driver.internal.util.Consumer;
import org.neo4j.driver.v1.AccessRole;
Expand Down Expand Up @@ -65,30 +67,34 @@ public int compare( BoltServerAddress o1, BoltServerAddress o2 )
private static final int MIN_SERVERS = 2;
private final ConnectionPool connections;
private final BiFunction<Connection,Logger,Session> sessionProvider;

private final Clock clock;
private final ConcurrentRoundRobinSet<BoltServerAddress> routingServers =
new ConcurrentRoundRobinSet<>( COMPARATOR );
private final ConcurrentRoundRobinSet<BoltServerAddress> readServers = new ConcurrentRoundRobinSet<>( COMPARATOR );
private final ConcurrentRoundRobinSet<BoltServerAddress> writeServers = new ConcurrentRoundRobinSet<>( COMPARATOR );
private final AtomicLong expires = new AtomicLong( 0L );

public ClusterDriver( BoltServerAddress seedAddress,
ConnectionPool connections,
SecurityPlan securityPlan,
BiFunction<Connection,Logger,Session> sessionProvider,
Clock clock,
Logging logging )
{
super( securityPlan, logging );
routingServers.add( seedAddress );
this.connections = connections;
this.sessionProvider = sessionProvider;
this.clock = clock;
checkServers();
}

private void checkServers()
{
synchronized ( routingServers )
{
if ( routingServers.size() < MIN_SERVERS ||
if ( expires.get() < clock.millis() ||
routingServers.size() < MIN_SERVERS ||
readServers.isEmpty() ||
writeServers.isEmpty() )
{
Expand All @@ -99,7 +105,7 @@ private void checkServers()

private Set<BoltServerAddress> forgetAllServers()
{
final Set<BoltServerAddress> seen = new HashSet<>( );
final Set<BoltServerAddress> seen = new HashSet<>();
seen.addAll( routingServers );
seen.addAll( readServers );
seen.addAll( writeServers );
Expand Down Expand Up @@ -127,7 +133,7 @@ private void getServers()
@Override
public void accept( Record record )
{
long ttl = record.get( "ttl" ).asLong();
expires.set( clock.millis() + record.get( "ttl" ).asLong() * 1000L);
List<ServerInfo> servers = servers( record );
for ( ServerInfo server : servers )
{
Expand Down Expand Up @@ -206,14 +212,14 @@ private List<ServerInfo> servers( Record record )
@Override
public ServerInfo apply( Value value )
{
return new ServerInfo( value.get("addresses").asList( new Function<Value,BoltServerAddress>()
return new ServerInfo( value.get( "addresses" ).asList( new Function<Value,BoltServerAddress>()
{
@Override
public BoltServerAddress apply( Value value )
{
return new BoltServerAddress( value.asString() );
}
} ), value.get("role").asString() );
} ), value.get( "role" ).asString() );
}
} );
}
Expand Down
3 changes: 2 additions & 1 deletion driver/src/main/java/org/neo4j/driver/v1/GraphDatabase.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.neo4j.driver.internal.security.SecurityPlan;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.spi.ConnectionPool;
import org.neo4j.driver.internal.util.Clock;
import org.neo4j.driver.v1.exceptions.ClientException;
import org.neo4j.driver.v1.util.BiFunction;

Expand Down Expand Up @@ -189,7 +190,7 @@ public static Driver driver( URI uri, AuthToken authToken, Config config )
case "bolt":
return new DirectDriver( address, connectionPool, securityPlan, config.logging() );
case "bolt+routing":
return new ClusterDriver( address, connectionPool, securityPlan, SESSION_PROVIDER, config.logging() );
return new ClusterDriver( address, connectionPool, securityPlan, SESSION_PROVIDER, Clock.SYSTEM, config.logging() );
default:
throw new ClientException( format( "Unsupported URI scheme: %s", scheme ) );
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
/**
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
* <p>
*
* This file is part of Neo4j.
* <p>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
/**
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
* <p>
*
* This file is part of Neo4j.
* <p>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -31,6 +31,7 @@
import org.neo4j.driver.internal.net.BoltServerAddress;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.spi.ConnectionPool;
import org.neo4j.driver.internal.util.Clock;
import org.neo4j.driver.v1.AccessRole;
import org.neo4j.driver.v1.Logger;
import org.neo4j.driver.v1.Logging;
Expand Down Expand Up @@ -191,7 +192,63 @@ public void shouldForgetAboutServersOnRerouting()
verify( pool ).purge( boltAddress( "localhost", 1111 ) );
}

@Test
public void shouldRediscoverOnTimeout()
{
// Given
final Session session = mock( Session.class );
Clock clock = mock( Clock.class );
when(clock.millis()).thenReturn( 0L, 11000L, 22000L );
when( session.run( GET_SERVERS ) )
.thenReturn(
getServers( asList( "localhost:1111", "localhost:1112", "localhost:1113" ),
singletonList( "localhost:2222" ),
singletonList( "localhost:3333" ), 10L/*seconds*/ ) )
.thenReturn(
getServers( singletonList( "localhost:5555" ), singletonList( "localhost:5555" ), singletonList( "localhost:5555" ) ) );

ClusterDriver clusterDriver = forSession( session, clock );

// When
clusterDriver.session( AccessRole.WRITE );

// Then
assertThat( clusterDriver.routingServers(), containsInAnyOrder( boltAddress( "localhost", 5555 ) ) );
assertThat( clusterDriver.readServers(), containsInAnyOrder( boltAddress( "localhost", 5555 ) ) );
assertThat( clusterDriver.writeServers(), containsInAnyOrder( boltAddress( "localhost", 5555 ) ) );
}

@Test
public void shouldNotRediscoverWheNoTimeout()
{
// Given
final Session session = mock( Session.class );
Clock clock = mock( Clock.class );
when(clock.millis()).thenReturn( 0L, 9900L, 18800L );
when( session.run( GET_SERVERS ) )
.thenReturn(
getServers( asList( "localhost:1111", "localhost:1112", "localhost:1113" ),
singletonList( "localhost:2222" ),
singletonList( "localhost:3333" ), 10L/*seconds*/ ) )
.thenReturn(
getServers( singletonList( "localhost:5555" ), singletonList( "localhost:5555" ), singletonList( "localhost:5555" ) ) );

ClusterDriver clusterDriver = forSession( session, clock );

// When
clusterDriver.session( AccessRole.WRITE );

// Then
assertThat( clusterDriver.routingServers(), containsInAnyOrder( boltAddress( "localhost", 1111 ), boltAddress( "localhost", 1112 ), boltAddress( "localhost", 1113 ) ) );
assertThat( clusterDriver.readServers(), containsInAnyOrder( boltAddress( "localhost", 2222 ) ) );
assertThat( clusterDriver.writeServers(), containsInAnyOrder( boltAddress( "localhost", 3333 ) ) );
}

private ClusterDriver forSession( final Session session )
{
return forSession( session, Clock.SYSTEM );
}
private ClusterDriver forSession( final Session session, Clock clock )
{
return new ClusterDriver( SEED, pool, insecure(),
new BiFunction<Connection,Logger,Session>()
Expand All @@ -201,16 +258,23 @@ public Session apply( Connection connection, Logger ignore )
{
return session;
}
}, logging() );
}, clock, logging() );
}

private BoltServerAddress boltAddress( String host, int port )
{
return new BoltServerAddress( host, port );
}


StatementResult getServers( final List<String> routers, final List<String> readers,
final List<String> writers )
{
return getServers( routers,readers, writers, Long.MAX_VALUE );
}

StatementResult getServers( final List<String> routers, final List<String> readers,
final List<String> writers, final long ttl )
{
return new StatementResult()
{
Expand All @@ -233,7 +297,7 @@ public Record next()
{
return new InternalRecord( asList( "ttl", "servers" ),
new Value[]{
value( Long.MAX_VALUE ),
value( ttl ),
value( asList( serverInfo( "ROUTE", routers ), serverInfo( "WRITE", writers ),
serverInfo( "READ", readers ) ) )
} );
Expand Down

0 comments on commit b2ab2ed

Please sign in to comment.