diff --git a/driver/src/main/java/org/neo4j/driver/internal/BoltServerAddress.java b/driver/src/main/java/org/neo4j/driver/internal/BoltServerAddress.java index e851ba20eb..8d8b4b23af 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/BoltServerAddress.java +++ b/driver/src/main/java/org/neo4j/driver/internal/BoltServerAddress.java @@ -22,15 +22,14 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.URI; -import java.net.UnknownHostException; -import java.util.List; +import java.util.Collections; +import java.util.LinkedHashSet; import java.util.Objects; -import java.util.stream.Stream; +import java.util.Set; import org.neo4j.driver.net.ServerAddress; import static java.util.Objects.requireNonNull; -import static java.util.stream.Collectors.toList; /** * Holds a host and port pair that denotes a Bolt server address. @@ -43,8 +42,8 @@ public class BoltServerAddress implements ServerAddress private final String host; // This could either be the same as originalHost or it is an IP address resolved from the original host. private final int port; private final String stringValue; - - private InetAddress resolved; + + private final Set resolved; public BoltServerAddress( String address ) { @@ -58,15 +57,15 @@ public BoltServerAddress( URI uri ) public BoltServerAddress( String host, int port ) { - this( host, null, port ); + this( host, port, Collections.emptySet() ); } - private BoltServerAddress( String host, InetAddress resolved, int port ) + public BoltServerAddress( String host, int port, Set resolved ) { this.host = requireNonNull( host, "host" ); - this.resolved = resolved; this.port = requireValidPort( port ); - this.stringValue = resolved != null ? String.format( "%s(%s):%d", host, resolved.getHostAddress(), port ) : String.format( "%s:%d", host, port ); + this.stringValue = String.format( "%s:%d", host, port ); + this.resolved = Collections.unmodifiableSet( new LinkedHashSet<>( resolved ) ); } public static BoltServerAddress from( ServerAddress address ) @@ -112,33 +111,7 @@ public String toString() */ public SocketAddress toSocketAddress() { - return resolved == null ? new InetSocketAddress( host, port ) : new InetSocketAddress( resolved, port ); - } - - /** - * Resolve the host name down to an IP address - * - * @return a new address instance - * @throws UnknownHostException if no IP address for the host could be found - * @see InetAddress#getByName(String) - */ - public BoltServerAddress resolve() throws UnknownHostException - { - return new BoltServerAddress( host, InetAddress.getByName( host ), port ); - } - - /** - * Resolve the host name down to all IP addresses that can be resolved to - * - * @return an array of new address instances that holds resolved addresses - * @throws UnknownHostException if no IP address for the host could be found - * @see InetAddress#getAllByName(String) - */ - public List resolveAll() throws UnknownHostException - { - return Stream.of( InetAddress.getAllByName( host ) ) - .map( address -> new BoltServerAddress( host, address, port ) ) - .collect( toList() ); + return new InetSocketAddress( host, port ); } @Override @@ -153,9 +126,9 @@ public int port() return port; } - public boolean isResolved() + public Set resolved() { - return resolved != null; + return this.resolved; } private static String hostFrom( URI uri ) diff --git a/driver/src/main/java/org/neo4j/driver/internal/DefaultDomainNameResolver.java b/driver/src/main/java/org/neo4j/driver/internal/DefaultDomainNameResolver.java new file mode 100644 index 0000000000..fc94c3d51d --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/DefaultDomainNameResolver.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal; + +import java.net.InetAddress; +import java.net.UnknownHostException; + +public class DefaultDomainNameResolver implements DomainNameResolver +{ + private static final DefaultDomainNameResolver INSTANCE = new DefaultDomainNameResolver(); + + public static DefaultDomainNameResolver getInstance() + { + return INSTANCE; + } + + private DefaultDomainNameResolver() + { + } + + @Override + public InetAddress[] resolve( String name ) throws UnknownHostException + { + return InetAddress.getAllByName( name ); + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/DomainNameResolver.java b/driver/src/main/java/org/neo4j/driver/internal/DomainNameResolver.java new file mode 100644 index 0000000000..94d6f613b2 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/DomainNameResolver.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal; + +import java.net.InetAddress; +import java.net.UnknownHostException; + +/** + * A resolver function used by the driver to resolve domain names. + */ +@FunctionalInterface +public interface DomainNameResolver +{ + /** + * Resolve the given domain name to a set of addresses. + * + * @param name the name to resolve. + * @return the resolved addresses. + * @throws UnknownHostException must be thrown if the given name can not be resolved to at least one address. + */ + InetAddress[] resolve( String name ) throws UnknownHostException; +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java b/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java index df6f919b52..0d2bde136f 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java +++ b/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java @@ -127,7 +127,7 @@ protected static MetricsProvider createDriverMetrics( Config config, Clock clock protected ChannelConnector createConnector( ConnectionSettings settings, SecurityPlan securityPlan, Config config, Clock clock, RoutingContext routingContext ) { - return new ChannelConnectorImpl( settings, securityPlan, config.logging(), clock, routingContext ); + return new ChannelConnectorImpl( settings, securityPlan, config.logging(), clock, routingContext, getDomainNameResolver() ); } private InternalDriver createDriver( URI uri, SecurityPlan securityPlan, BoltServerAddress address, ConnectionPool connectionPool, @@ -210,7 +210,7 @@ protected LoadBalancer createLoadBalancer( BoltServerAddress address, Connection LoadBalancingStrategy loadBalancingStrategy = new LeastConnectedLoadBalancingStrategy( connectionPool, config.logging() ); ServerAddressResolver resolver = createResolver( config ); return new LoadBalancer( address, routingSettings, connectionPool, eventExecutorGroup, createClock(), - config.logging(), loadBalancingStrategy, resolver ); + config.logging(), loadBalancingStrategy, resolver, getDomainNameResolver() ); } private static ServerAddressResolver createResolver( Config config ) @@ -271,6 +271,17 @@ protected Bootstrap createBootstrap( EventLoopGroup eventLoopGroup ) return BootstrapFactory.newBootstrap( eventLoopGroup ); } + /** + * Provides an instance of {@link DomainNameResolver} that is used for domain name resolution. + *

+ * This method is protected only for testing + * + * @return the instance of {@link DomainNameResolver}. + */ + protected DomainNameResolver getDomainNameResolver() + { + return DefaultDomainNameResolver.getInstance(); + } private static void assertNoRoutingContext( URI uri, RoutingSettings routingSettings ) { diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/connection/ChannelConnectorImpl.java b/driver/src/main/java/org/neo4j/driver/internal/async/connection/ChannelConnectorImpl.java index d927b8c9ec..96fd19fb33 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/connection/ChannelConnectorImpl.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/connection/ChannelConnectorImpl.java @@ -24,21 +24,22 @@ import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; +import io.netty.resolver.AddressResolverGroup; -import java.util.Map; +import java.net.InetSocketAddress; +import org.neo4j.driver.AuthToken; +import org.neo4j.driver.AuthTokens; +import org.neo4j.driver.Logging; +import org.neo4j.driver.exceptions.ClientException; import org.neo4j.driver.internal.BoltServerAddress; import org.neo4j.driver.internal.ConnectionSettings; +import org.neo4j.driver.internal.DomainNameResolver; import org.neo4j.driver.internal.async.inbound.ConnectTimeoutHandler; import org.neo4j.driver.internal.cluster.RoutingContext; import org.neo4j.driver.internal.security.InternalAuthToken; import org.neo4j.driver.internal.security.SecurityPlan; import org.neo4j.driver.internal.util.Clock; -import org.neo4j.driver.AuthToken; -import org.neo4j.driver.AuthTokens; -import org.neo4j.driver.Logging; -import org.neo4j.driver.Value; -import org.neo4j.driver.exceptions.ClientException; import static java.util.Objects.requireNonNull; @@ -52,15 +53,17 @@ public class ChannelConnectorImpl implements ChannelConnector private final int connectTimeoutMillis; private final Logging logging; private final Clock clock; + private final AddressResolverGroup addressResolverGroup; public ChannelConnectorImpl( ConnectionSettings connectionSettings, SecurityPlan securityPlan, Logging logging, - Clock clock, RoutingContext routingContext ) + Clock clock, RoutingContext routingContext, DomainNameResolver domainNameResolver ) { - this( connectionSettings, securityPlan, new ChannelPipelineBuilderImpl(), logging, clock, routingContext ); + this( connectionSettings, securityPlan, new ChannelPipelineBuilderImpl(), logging, clock, routingContext, domainNameResolver ); } public ChannelConnectorImpl( ConnectionSettings connectionSettings, SecurityPlan securityPlan, - ChannelPipelineBuilder pipelineBuilder, Logging logging, Clock clock, RoutingContext routingContext ) + ChannelPipelineBuilder pipelineBuilder, Logging logging, Clock clock, RoutingContext routingContext, + DomainNameResolver domainNameResolver ) { this.userAgent = connectionSettings.userAgent(); this.authToken = requireValidAuthToken( connectionSettings.authToken() ); @@ -70,6 +73,7 @@ public ChannelConnectorImpl( ConnectionSettings connectionSettings, SecurityPlan this.pipelineBuilder = pipelineBuilder; this.logging = requireNonNull( logging ); this.clock = requireNonNull( clock ); + this.addressResolverGroup = new NettyDomainNameResolverGroup( requireNonNull( domainNameResolver ) ); } @Override @@ -77,6 +81,7 @@ public ChannelFuture connect( BoltServerAddress address, Bootstrap bootstrap ) { bootstrap.option( ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMillis ); bootstrap.handler( new NettyChannelInitializer( address, securityPlan, connectTimeoutMillis, clock, logging ) ); + bootstrap.resolver( addressResolverGroup ); ChannelFuture channelConnected = bootstrap.connect( address.toSocketAddress() ); diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/connection/NettyDomainNameResolver.java b/driver/src/main/java/org/neo4j/driver/internal/async/connection/NettyDomainNameResolver.java new file mode 100644 index 0000000000..87351e252b --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/async/connection/NettyDomainNameResolver.java @@ -0,0 +1,67 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.async.connection; + +import io.netty.resolver.InetNameResolver; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.Promise; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.List; + +import org.neo4j.driver.internal.DomainNameResolver; + +public class NettyDomainNameResolver extends InetNameResolver +{ + private final DomainNameResolver domainNameResolver; + + public NettyDomainNameResolver( EventExecutor executor, DomainNameResolver domainNameResolver ) + { + super( executor ); + this.domainNameResolver = domainNameResolver; + } + + @Override + protected void doResolve( String inetHost, Promise promise ) + { + try + { + promise.setSuccess( domainNameResolver.resolve( inetHost )[0] ); + } + catch ( UnknownHostException e ) + { + promise.setFailure( e ); + } + } + + @Override + protected void doResolveAll( String inetHost, Promise> promise ) + { + try + { + promise.setSuccess( Arrays.asList( domainNameResolver.resolve( inetHost ) ) ); + } + catch ( UnknownHostException e ) + { + promise.setFailure( e ); + } + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/connection/NettyDomainNameResolverGroup.java b/driver/src/main/java/org/neo4j/driver/internal/async/connection/NettyDomainNameResolverGroup.java new file mode 100644 index 0000000000..720e213be6 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/async/connection/NettyDomainNameResolverGroup.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.async.connection; + +import io.netty.resolver.AddressResolver; +import io.netty.resolver.AddressResolverGroup; +import io.netty.util.concurrent.EventExecutor; + +import java.net.InetSocketAddress; + +import org.neo4j.driver.internal.DomainNameResolver; + +public class NettyDomainNameResolverGroup extends AddressResolverGroup +{ + private final DomainNameResolver domainNameResolver; + + public NettyDomainNameResolverGroup( DomainNameResolver domainNameResolver ) + { + this.domainNameResolver = domainNameResolver; + } + + @Override + protected AddressResolver newResolver( EventExecutor executor ) throws Exception + { + return new NettyDomainNameResolver( executor, domainNameResolver ).asAddressResolver(); + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/AddressSet.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/AddressSet.java index b4ab6c731d..c4cc3f2b20 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/AddressSet.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/AddressSet.java @@ -19,6 +19,7 @@ package org.neo4j.driver.internal.cluster; import java.util.Arrays; +import java.util.Iterator; import java.util.Set; import org.neo4j.driver.internal.BoltServerAddress; @@ -39,9 +40,35 @@ public int size() return addresses.length; } - public synchronized void update( Set addresses ) + public synchronized void retainAllAndAdd( Set newAddresses ) { - this.addresses = addresses.toArray( NONE ); + BoltServerAddress[] addressesArr = new BoltServerAddress[newAddresses.size()]; + int insertionIdx = 0; + for ( BoltServerAddress address : addresses ) + { + if ( newAddresses.remove( address ) ) + { + addressesArr[insertionIdx] = address; + insertionIdx++; + } + } + Iterator addressIterator = newAddresses.iterator(); + for ( ; insertionIdx < addressesArr.length && addressIterator.hasNext(); insertionIdx++ ) + { + addressesArr[insertionIdx] = addressIterator.next(); + } + addresses = addressesArr; + } + + public synchronized void replaceIfPresent( BoltServerAddress oldAddress, BoltServerAddress newAddress ) + { + for ( int i = 0; i < addresses.length; i++ ) + { + if ( addresses[i].equals( oldAddress ) ) + { + addresses[i] = newAddress; + } + } } public synchronized void remove( BoltServerAddress address ) diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterComposition.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterComposition.java index 7d9f378499..7bddb70c21 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterComposition.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterComposition.java @@ -45,7 +45,9 @@ private ClusterComposition( long expirationTimestamp ) this.expirationTimestamp = expirationTimestamp; } - /** For testing */ + /** + * For testing + */ public ClusterComposition( long expirationTimestamp, Set readers, @@ -83,7 +85,8 @@ public Set routers() return new LinkedHashSet<>( routers ); } - public long expirationTimestamp() { + public long expirationTimestamp() + { return this.expirationTimestamp; } diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterCompositionLookupResult.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterCompositionLookupResult.java new file mode 100644 index 0000000000..a374918089 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterCompositionLookupResult.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.cluster; + +import java.util.Optional; +import java.util.Set; + +import org.neo4j.driver.internal.BoltServerAddress; + +public class ClusterCompositionLookupResult +{ + private final ClusterComposition composition; + + private final Set resolvedInitialRouters; + + public ClusterCompositionLookupResult( ClusterComposition composition ) + { + this( composition, null ); + } + + public ClusterCompositionLookupResult( ClusterComposition composition, Set resolvedInitialRouters ) + { + this.composition = composition; + this.resolvedInitialRouters = resolvedInitialRouters; + } + + public ClusterComposition getClusterComposition() + { + return composition; + } + + public Optional> getResolvedInitialRouters() + { + return Optional.ofNullable( resolvedInitialRouters ); + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java index 6cb925a3ff..3604a5ffc9 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java @@ -47,7 +47,7 @@ public class ClusterRoutingTable implements RoutingTable public ClusterRoutingTable( DatabaseName ofDatabase, Clock clock, BoltServerAddress... routingAddresses ) { this( ofDatabase, clock ); - routers.update( new LinkedHashSet<>( asList( routingAddresses ) ) ); + routers.retainAllAndAdd( new LinkedHashSet<>( asList( routingAddresses ) ) ); } private ClusterRoutingTable( DatabaseName ofDatabase, Clock clock ) @@ -86,9 +86,9 @@ public boolean hasBeenStaleFor( long extraTime ) public synchronized void update( ClusterComposition cluster ) { expirationTimestamp = cluster.expirationTimestamp(); - readers.update( cluster.readers() ); - writers.update( cluster.writers() ); - routers.update( cluster.routers() ); + readers.retainAllAndAdd( cluster.readers() ); + writers.retainAllAndAdd( cluster.writers() ); + routers.retainAllAndAdd( cluster.routers() ); preferInitialRouter = !cluster.hasWriters(); } @@ -140,6 +140,12 @@ public void forgetWriter( BoltServerAddress toRemove ) writers.remove( toRemove ); } + @Override + public void replaceRouterIfPresent( BoltServerAddress oldRouter, BoltServerAddress newRouter ) + { + routers.replaceIfPresent( oldRouter, newRouter ); + } + @Override public boolean preferInitialRouter() { diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/Rediscovery.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/Rediscovery.java index c9288c54cd..5faea2186b 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/Rediscovery.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/Rediscovery.java @@ -18,6 +18,7 @@ */ package org.neo4j.driver.internal.cluster; +import java.net.UnknownHostException; import java.util.List; import java.util.concurrent.CompletionStage; @@ -27,7 +28,7 @@ public interface Rediscovery { - CompletionStage lookupClusterComposition( RoutingTable routingTable, ConnectionPool connectionPool, Bookmark bookmark ); + CompletionStage lookupClusterComposition( RoutingTable routingTable, ConnectionPool connectionPool, Bookmark bookmark ); - List resolve(); + List resolve() throws UnknownHostException; } diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/RediscoveryImpl.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/RediscoveryImpl.java index 777d97c213..e73d9b74ad 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/RediscoveryImpl.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/RediscoveryImpl.java @@ -21,14 +21,18 @@ import io.netty.util.concurrent.EventExecutorGroup; import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.Collection; import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.LinkedList; import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; -import java.util.stream.Stream; +import java.util.stream.Collectors; import org.neo4j.driver.Bookmark; import org.neo4j.driver.Logger; @@ -37,15 +41,16 @@ import org.neo4j.driver.exceptions.SecurityException; import org.neo4j.driver.exceptions.ServiceUnavailableException; import org.neo4j.driver.internal.BoltServerAddress; -import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.internal.DomainNameResolver; import org.neo4j.driver.internal.spi.ConnectionPool; import org.neo4j.driver.internal.util.Futures; +import org.neo4j.driver.net.ServerAddress; import org.neo4j.driver.net.ServerAddressResolver; import static java.lang.String.format; import static java.util.Collections.emptySet; +import static java.util.Objects.requireNonNull; import static java.util.concurrent.CompletableFuture.completedFuture; -import static java.util.stream.Collectors.toList; import static org.neo4j.driver.internal.util.Futures.completedWithNull; import static org.neo4j.driver.internal.util.Futures.failedFuture; @@ -64,9 +69,10 @@ public class RediscoveryImpl implements Rediscovery private final ClusterCompositionProvider provider; private final ServerAddressResolver resolver; private final EventExecutorGroup eventExecutorGroup; + private final DomainNameResolver domainNameResolver; public RediscoveryImpl( BoltServerAddress initialRouter, RoutingSettings settings, ClusterCompositionProvider provider, - EventExecutorGroup eventExecutorGroup, ServerAddressResolver resolver, Logger logger ) + EventExecutorGroup eventExecutorGroup, ServerAddressResolver resolver, Logger logger, DomainNameResolver domainNameResolver ) { this.initialRouter = initialRouter; this.settings = settings; @@ -74,20 +80,22 @@ public RediscoveryImpl( BoltServerAddress initialRouter, RoutingSettings setting this.provider = provider; this.resolver = resolver; this.eventExecutorGroup = eventExecutorGroup; + this.domainNameResolver = requireNonNull( domainNameResolver ); } /** - * Given a database and its current routing table, and the global connection pool, use the global cluster composition provider to fetch a new - * cluster composition, which would be used to update the routing table of the given database and global connection pool. + * Given a database and its current routing table, and the global connection pool, use the global cluster composition provider to fetch a new cluster + * composition, which would be used to update the routing table of the given database and global connection pool. * - * @param routingTable current routing table of the given database. + * @param routingTable current routing table of the given database. * @param connectionPool connection pool. - * @return new cluster composition. + * @return new cluster composition and an optional set of resolved initial router addresses. */ @Override - public CompletionStage lookupClusterComposition( RoutingTable routingTable, ConnectionPool connectionPool, Bookmark bookmark ) + public CompletionStage lookupClusterComposition( RoutingTable routingTable, ConnectionPool connectionPool, + Bookmark bookmark ) { - CompletableFuture result = new CompletableFuture<>(); + CompletableFuture result = new CompletableFuture<>(); // if we failed discovery, we will chain all errors into this one. ServiceUnavailableException baseError = new ServiceUnavailableException( String.format( NO_ROUTERS_AVAILABLE, routingTable.database().description() ) ); lookupClusterComposition( routingTable, connectionPool, 0, 0, result, bookmark, baseError ); @@ -95,43 +103,47 @@ public CompletionStage lookupClusterComposition( RoutingTabl } private void lookupClusterComposition( RoutingTable routingTable, ConnectionPool pool, - int failures, long previousDelay, CompletableFuture result, Bookmark bookmark, Throwable baseError ) + int failures, long previousDelay, CompletableFuture result, Bookmark bookmark, + Throwable baseError ) { - lookup( routingTable, pool, bookmark, baseError ).whenComplete( ( composition, completionError ) -> - { - Throwable error = Futures.completionExceptionCause( completionError ); - if ( error != null ) - { - result.completeExceptionally( error ); - } - else if ( composition != null ) - { - result.complete( composition ); - } - else - { - int newFailures = failures + 1; - if ( newFailures >= settings.maxRoutingFailures() ) - { - // now we throw our saved error out - result.completeExceptionally( baseError ); - } - else - { - long nextDelay = Math.max( settings.retryTimeoutDelay(), previousDelay * 2 ); - logger.info( "Unable to fetch new routing table, will try again in " + nextDelay + "ms" ); - eventExecutorGroup.next().schedule( - () -> lookupClusterComposition( routingTable, pool, newFailures, nextDelay, result, bookmark, baseError ), - nextDelay, TimeUnit.MILLISECONDS - ); - } - } - } ); + lookup( routingTable, pool, bookmark, baseError ) + .whenComplete( + ( compositionLookupResult, completionError ) -> + { + Throwable error = Futures.completionExceptionCause( completionError ); + if ( error != null ) + { + result.completeExceptionally( error ); + } + else if ( compositionLookupResult != null ) + { + result.complete( compositionLookupResult ); + } + else + { + int newFailures = failures + 1; + if ( newFailures >= settings.maxRoutingFailures() ) + { + // now we throw our saved error out + result.completeExceptionally( baseError ); + } + else + { + long nextDelay = Math.max( settings.retryTimeoutDelay(), previousDelay * 2 ); + logger.info( "Unable to fetch new routing table, will try again in " + nextDelay + "ms" ); + eventExecutorGroup.next().schedule( + () -> lookupClusterComposition( routingTable, pool, newFailures, nextDelay, result, bookmark, baseError ), + nextDelay, TimeUnit.MILLISECONDS + ); + } + } + } ); } - private CompletionStage lookup( RoutingTable routingTable, ConnectionPool connectionPool, Bookmark bookmark, Throwable baseError ) + private CompletionStage lookup( RoutingTable routingTable, ConnectionPool connectionPool, Bookmark bookmark, + Throwable baseError ) { - CompletionStage compositionStage; + CompletionStage compositionStage; if ( routingTable.preferInitialRouter() ) { @@ -145,109 +157,132 @@ private CompletionStage lookup( RoutingTable routingTable, C return compositionStage; } - private CompletionStage lookupOnKnownRoutersThenOnInitialRouter( RoutingTable routingTable, ConnectionPool connectionPool, - Bookmark bookmark, Throwable baseError ) + private CompletionStage lookupOnKnownRoutersThenOnInitialRouter( RoutingTable routingTable, ConnectionPool connectionPool, + Bookmark bookmark, Throwable baseError ) { Set seenServers = new HashSet<>(); - return lookupOnKnownRouters( routingTable, connectionPool, seenServers, bookmark, baseError ).thenCompose( composition -> - { - if ( composition != null ) - { - return completedFuture( composition ); - } - return lookupOnInitialRouter( routingTable, connectionPool, seenServers, bookmark, baseError ); - } ); + return lookupOnKnownRouters( routingTable, connectionPool, seenServers, bookmark, baseError ) + .thenCompose( + compositionLookupResult -> + { + if ( compositionLookupResult != null ) + { + return completedFuture( + compositionLookupResult ); + } + return lookupOnInitialRouter( + routingTable, connectionPool, + seenServers, bookmark, + baseError ); + } ); } - private CompletionStage lookupOnInitialRouterThenOnKnownRouters( RoutingTable routingTable, - ConnectionPool connectionPool, Bookmark bookmark, Throwable baseError ) + private CompletionStage lookupOnInitialRouterThenOnKnownRouters( RoutingTable routingTable, + ConnectionPool connectionPool, Bookmark bookmark, + Throwable baseError ) { Set seenServers = emptySet(); - return lookupOnInitialRouter( routingTable, connectionPool, seenServers, bookmark, baseError ).thenCompose( composition -> - { - if ( composition != null ) - { - return completedFuture( composition ); - } - return lookupOnKnownRouters( routingTable, connectionPool, new HashSet<>(), bookmark, baseError ); - } ); + return lookupOnInitialRouter( routingTable, connectionPool, seenServers, bookmark, baseError ) + .thenCompose( + compositionLookupResult -> + { + if ( compositionLookupResult != null ) + { + return completedFuture( + compositionLookupResult ); + } + return lookupOnKnownRouters( + routingTable, connectionPool, + new HashSet<>(), bookmark, + baseError ); + } ); } - private CompletionStage lookupOnKnownRouters( RoutingTable routingTable, ConnectionPool connectionPool, Set seenServers, Bookmark bookmark, - Throwable baseError ) + private CompletionStage lookupOnKnownRouters( RoutingTable routingTable, ConnectionPool connectionPool, + Set seenServers, Bookmark bookmark, + Throwable baseError ) { BoltServerAddress[] addresses = routingTable.routers().toArray(); CompletableFuture result = completedWithNull(); for ( BoltServerAddress address : addresses ) { - result = result.thenCompose( composition -> - { - if ( composition != null ) - { - return completedFuture( composition ); - } - else - { - return lookupOnRouter( address, routingTable, connectionPool, bookmark, baseError ) - .whenComplete( ( ignore, error ) -> seenServers.add( address ) ); - } - } ); + result = result + .thenCompose( + composition -> + { + if ( composition != null ) + { + return completedFuture( composition ); + } + else + { + return lookupOnRouter( address, true, routingTable, connectionPool, seenServers, bookmark, baseError ); + } + } ); } - return result; + return result.thenApply( composition -> composition != null ? new ClusterCompositionLookupResult( composition ) : null ); } - private CompletionStage lookupOnInitialRouter( RoutingTable routingTable, ConnectionPool connectionPool, Set seenServers, Bookmark bookmark, - Throwable baseError ) + private CompletionStage lookupOnInitialRouter( RoutingTable routingTable, ConnectionPool connectionPool, + Set seenServers, Bookmark bookmark, + Throwable baseError ) { - List addresses; + List resolvedRouters; try { - addresses = resolve(); + resolvedRouters = resolve(); } catch ( Throwable error ) { return failedFuture( error ); } - addresses.removeAll( seenServers ); + Set resolvedRouterSet = new HashSet<>( resolvedRouters ); + resolvedRouters.removeAll( seenServers ); CompletableFuture result = completedWithNull(); - for ( BoltServerAddress address : addresses ) + for ( BoltServerAddress address : resolvedRouters ) { - result = result.thenCompose( composition -> - { - if ( composition != null ) - { - return completedFuture( composition ); - } - return lookupOnRouter( address, routingTable, connectionPool, bookmark, baseError ); - } ); + result = result.thenCompose( + composition -> + { + if ( composition != null ) + { + return completedFuture( composition ); + } + return lookupOnRouter( address, false, routingTable, connectionPool, null, bookmark, baseError ); + } ); } - return result; + return result.thenApply( composition -> composition != null ? new ClusterCompositionLookupResult( composition, resolvedRouterSet ) : null ); } - private CompletionStage lookupOnRouter( BoltServerAddress routerAddress, - RoutingTable routingTable, ConnectionPool connectionPool, Bookmark bookmark, Throwable baseError ) + private CompletionStage lookupOnRouter( BoltServerAddress routerAddress, boolean resolveAddress, + RoutingTable routingTable, ConnectionPool connectionPool, + Set seenServers, Bookmark bookmark, Throwable baseError ) { - CompletionStage connectionStage = connectionPool.acquire( routerAddress ); + CompletableFuture addressFuture = CompletableFuture.completedFuture( routerAddress ); - return connectionStage + return addressFuture + .thenApply( address -> resolveAddress ? resolveByDomainNameOrThrowCompletionException( address, routingTable ) : address ) + .thenApply( address -> addAndReturn( seenServers, address ) ) + .thenCompose( connectionPool::acquire ) .thenCompose( connection -> provider.getClusterComposition( connection, routingTable.database(), bookmark ) ) - .handle( ( response, error ) -> { - Throwable cause = Futures.completionExceptionCause( error ); - if ( cause != null ) - { - return handleRoutingProcedureError( cause, routingTable, routerAddress, baseError ); - } - else - { - return response; - } - } ); + .handle( ( response, error ) -> + { + Throwable cause = Futures.completionExceptionCause( error ); + if ( cause != null ) + { + return handleRoutingProcedureError( cause, routingTable, routerAddress, baseError ); + } + else + { + return response; + } + } ); } private ClusterComposition handleRoutingProcedureError( Throwable error, RoutingTable routingTable, - BoltServerAddress routerAddress, Throwable baseError ) + BoltServerAddress routerAddress, Throwable baseError ) { if ( error instanceof SecurityException || error instanceof FatalDiscoveryException ) { @@ -265,24 +300,66 @@ private ClusterComposition handleRoutingProcedureError( Throwable error, Routing } @Override - public List resolve() + public List resolve() throws UnknownHostException + { + List resolvedAddresses = new LinkedList<>(); + UnknownHostException exception = null; + for ( ServerAddress serverAddress : resolver.resolve( initialRouter ) ) + { + try + { + resolvedAddresses.addAll( resolveAllByDomainName( BoltServerAddress.from( serverAddress ) ) ); + } + catch ( UnknownHostException e ) + { + if ( exception == null ) + { + exception = e; + } + else + { + exception.addSuppressed( e ); + } + } + } + + // give up only if there are no addresses to work with at all + if ( resolvedAddresses.isEmpty() && exception != null ) + { + throw exception; + } + + return resolvedAddresses; + } + + private T addAndReturn( Collection collection, T element ) { - return resolver.resolve( initialRouter ) - .stream() - .map( BoltServerAddress::from ) - .collect( toList() ); // collect to list to preserve the order + if ( collection != null ) + { + collection.add( element ); + } + return element; } - private Stream resolveAll( BoltServerAddress address ) + private BoltServerAddress resolveByDomainNameOrThrowCompletionException( BoltServerAddress address, RoutingTable routingTable ) { try { - return address.resolveAll().stream(); + Set resolvedAddresses = resolveAllByDomainName( address ); + routingTable.replaceRouterIfPresent( address, new BoltServerAddress( address.host(), address.port(), resolvedAddresses ) ); + return resolvedAddresses.stream().findFirst().orElseThrow( + () -> new IllegalStateException( "Domain name resolution returned empty result set and has not thrown an exception" ) ); } - catch ( UnknownHostException e ) + catch ( Throwable e ) { - logger.error( "Failed to resolve address `" + address + "` to IPs due to error: " + e.getMessage(), e ); - return Stream.of( address ); + throw new CompletionException( e ); } } + + private Set resolveAllByDomainName( BoltServerAddress address ) throws UnknownHostException + { + return Arrays.stream( domainNameResolver.resolve( address.host() ) ) + .map( inetAddress -> new BoltServerAddress( inetAddress.getHostAddress(), address.port() ) ) + .collect( Collectors.toCollection( LinkedHashSet::new ) ); + } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java index 77e4f8ea49..7fa7000bda 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java @@ -20,8 +20,8 @@ import java.util.Set; -import org.neo4j.driver.internal.BoltServerAddress; import org.neo4j.driver.AccessMode; +import org.neo4j.driver.internal.BoltServerAddress; import org.neo4j.driver.internal.DatabaseName; public interface RoutingTable @@ -46,5 +46,7 @@ public interface RoutingTable void forgetWriter( BoltServerAddress toRemove ); + void replaceRouterIfPresent( BoltServerAddress oldRouter, BoltServerAddress newRouter ); + boolean preferInitialRouter(); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTableHandlerImpl.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTableHandlerImpl.java index 27cd03202c..7226ce6685 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTableHandlerImpl.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTableHandlerImpl.java @@ -18,6 +18,8 @@ */ package org.neo4j.driver.internal.cluster; +import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -41,6 +43,7 @@ public class RoutingTableHandlerImpl implements RoutingTableHandler private final Rediscovery rediscovery; private final Logger log; private final long routingTablePurgeDelayMs; + private final Set resolvedInitialRouters = new HashSet<>(); public RoutingTableHandlerImpl( RoutingTable routingTable, Rediscovery rediscovery, ConnectionPool connectionPool, RoutingTableRegistry routingTableRegistry, Logger log, long routingTablePurgeDelayMs ) @@ -105,13 +108,27 @@ else if ( routingTable.isStaleFor( context.mode() ) ) } } - private synchronized void freshClusterCompositionFetched( ClusterComposition composition ) + private synchronized void freshClusterCompositionFetched( ClusterCompositionLookupResult composition ) { try { - routingTable.update( composition ); + routingTable.update( composition.getClusterComposition() ); routingTableRegistry.removeAged(); - connectionPool.retainAll( routingTableRegistry.allServers() ); + + Set addressesToRetain = new LinkedHashSet<>(); + for ( BoltServerAddress address : routingTableRegistry.allServers() ) + { + addressesToRetain.add( address ); + addressesToRetain.addAll( address.resolved() ); + } + composition.getResolvedInitialRouters().ifPresent( + addresses -> + { + resolvedInitialRouters.clear(); + resolvedInitialRouters.addAll( addresses ); + } ); + addressesToRetain.addAll( resolvedInitialRouters ); + connectionPool.retainAll( addressesToRetain ); log.debug( "Updated routing table for database '%s'. %s", databaseName.description(), routingTable ); diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java index 34b4dce032..3a6bd6683f 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java @@ -31,6 +31,7 @@ import org.neo4j.driver.exceptions.ServiceUnavailableException; import org.neo4j.driver.exceptions.SessionExpiredException; import org.neo4j.driver.internal.BoltServerAddress; +import org.neo4j.driver.internal.DomainNameResolver; import org.neo4j.driver.internal.async.ConnectionContext; import org.neo4j.driver.internal.async.connection.RoutingConnection; import org.neo4j.driver.internal.cluster.AddressSet; @@ -50,6 +51,7 @@ import org.neo4j.driver.net.ServerAddressResolver; import static java.lang.String.format; +import static java.util.Objects.requireNonNull; import static org.neo4j.driver.internal.async.ImmutableConnectionContext.simple; import static org.neo4j.driver.internal.messaging.request.MultiDatabaseUtil.supportsMultiDatabase; import static org.neo4j.driver.internal.util.Futures.completedWithNull; @@ -68,22 +70,24 @@ public class LoadBalancer implements ConnectionProvider private final Rediscovery rediscovery; public LoadBalancer( BoltServerAddress initialRouter, RoutingSettings settings, ConnectionPool connectionPool, - EventExecutorGroup eventExecutorGroup, Clock clock, Logging logging, - LoadBalancingStrategy loadBalancingStrategy, ServerAddressResolver resolver ) + EventExecutorGroup eventExecutorGroup, Clock clock, Logging logging, + LoadBalancingStrategy loadBalancingStrategy, ServerAddressResolver resolver, DomainNameResolver domainNameResolver ) { - this( connectionPool, createRediscovery( eventExecutorGroup, initialRouter, resolver, settings, clock, logging ), settings, loadBalancingStrategy, - eventExecutorGroup, clock, loadBalancerLogger( logging ) ); + this( connectionPool, createRediscovery( eventExecutorGroup, initialRouter, resolver, settings, clock, logging, requireNonNull( domainNameResolver ) ), + settings, + loadBalancingStrategy, + eventExecutorGroup, clock, loadBalancerLogger( logging ) ); } private LoadBalancer( ConnectionPool connectionPool, Rediscovery rediscovery, RoutingSettings settings, LoadBalancingStrategy loadBalancingStrategy, - EventExecutorGroup eventExecutorGroup, Clock clock, Logger log ) + EventExecutorGroup eventExecutorGroup, Clock clock, Logger log ) { this( connectionPool, createRoutingTables( connectionPool, rediscovery, settings, clock, log ), rediscovery, loadBalancingStrategy, eventExecutorGroup, - log ); + log ); } LoadBalancer( ConnectionPool connectionPool, RoutingTableRegistry routingTables, Rediscovery rediscovery, LoadBalancingStrategy loadBalancingStrategy, - EventExecutorGroup eventExecutorGroup, Logger log ) + EventExecutorGroup eventExecutorGroup, Logger log ) { this.connectionPool = connectionPool; this.routingTables = routingTables; @@ -252,11 +256,11 @@ private static RoutingTableRegistry createRoutingTables( ConnectionPool connecti } private static Rediscovery createRediscovery( EventExecutorGroup eventExecutorGroup, BoltServerAddress initialRouter, ServerAddressResolver resolver, - RoutingSettings settings, Clock clock, Logging logging ) + RoutingSettings settings, Clock clock, Logging logging, DomainNameResolver domainNameResolver ) { Logger log = loadBalancerLogger( logging ); ClusterCompositionProvider clusterCompositionProvider = new RoutingProcedureClusterCompositionProvider( clock, settings.routingContext() ); - return new RediscoveryImpl( initialRouter, settings, clusterCompositionProvider, eventExecutorGroup, resolver, log ); + return new RediscoveryImpl( initialRouter, settings, clusterCompositionProvider, eventExecutorGroup, resolver, log, domainNameResolver ); } private static Logger loadBalancerLogger( Logging logging ) diff --git a/driver/src/test/java/org/neo4j/driver/integration/ChannelConnectorImplIT.java b/driver/src/test/java/org/neo4j/driver/integration/ChannelConnectorImplIT.java index a8b6c8051b..a939ebebde 100644 --- a/driver/src/test/java/org/neo4j/driver/integration/ChannelConnectorImplIT.java +++ b/driver/src/test/java/org/neo4j/driver/integration/ChannelConnectorImplIT.java @@ -42,14 +42,15 @@ import org.neo4j.driver.exceptions.ServiceUnavailableException; import org.neo4j.driver.internal.BoltServerAddress; import org.neo4j.driver.internal.ConnectionSettings; +import org.neo4j.driver.internal.DefaultDomainNameResolver; import org.neo4j.driver.internal.RevocationStrategy; import org.neo4j.driver.internal.async.connection.BootstrapFactory; import org.neo4j.driver.internal.async.connection.ChannelConnector; import org.neo4j.driver.internal.async.connection.ChannelConnectorImpl; import org.neo4j.driver.internal.async.inbound.ConnectTimeoutHandler; import org.neo4j.driver.internal.cluster.RoutingContext; -import org.neo4j.driver.internal.security.SecurityPlanImpl; import org.neo4j.driver.internal.security.SecurityPlan; +import org.neo4j.driver.internal.security.SecurityPlanImpl; import org.neo4j.driver.internal.util.FakeClock; import org.neo4j.driver.util.DatabaseExtension; import org.neo4j.driver.util.ParallelizableIT; @@ -233,7 +234,8 @@ private ChannelConnectorImpl newConnector( AuthToken authToken, SecurityPlan sec int connectTimeoutMillis ) { ConnectionSettings settings = new ConnectionSettings( authToken, "test", connectTimeoutMillis ); - return new ChannelConnectorImpl( settings, securityPlan, DEV_NULL_LOGGING, new FakeClock(), RoutingContext.EMPTY ); + return new ChannelConnectorImpl( settings, securityPlan, DEV_NULL_LOGGING, new FakeClock(), RoutingContext.EMPTY, + DefaultDomainNameResolver.getInstance() ); } private static SecurityPlan trustAllCertificates() throws GeneralSecurityException diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImplIT.java b/driver/src/test/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImplIT.java index 12b1e493f8..5c219e33fd 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImplIT.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImplIT.java @@ -30,6 +30,7 @@ import org.neo4j.driver.exceptions.ServiceUnavailableException; import org.neo4j.driver.internal.BoltServerAddress; import org.neo4j.driver.internal.ConnectionSettings; +import org.neo4j.driver.internal.DefaultDomainNameResolver; import org.neo4j.driver.internal.async.connection.BootstrapFactory; import org.neo4j.driver.internal.async.connection.ChannelConnector; import org.neo4j.driver.internal.async.connection.ChannelConnectorImpl; @@ -147,7 +148,7 @@ private ConnectionPoolImpl newPool() throws Exception FakeClock clock = new FakeClock(); ConnectionSettings connectionSettings = new ConnectionSettings( neo4j.authToken(), "test", 5000 ); ChannelConnector connector = new ChannelConnectorImpl( connectionSettings, SecurityPlanImpl.insecure(), - DEV_NULL_LOGGING, clock, RoutingContext.EMPTY ); + DEV_NULL_LOGGING, clock, RoutingContext.EMPTY, DefaultDomainNameResolver.getInstance() ); PoolSettings poolSettings = newSettings(); Bootstrap bootstrap = BootstrapFactory.newBootstrap( 1 ); return new ConnectionPoolImpl( connector, bootstrap, poolSettings, DEV_NULL_METRICS, DEV_NULL_LOGGING, clock, true ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelPoolIT.java b/driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelPoolIT.java index 693738af70..52d8d935d6 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelPoolIT.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/pool/NettyChannelPoolIT.java @@ -35,11 +35,12 @@ import org.neo4j.driver.Value; import org.neo4j.driver.exceptions.AuthenticationException; import org.neo4j.driver.internal.ConnectionSettings; +import org.neo4j.driver.internal.DefaultDomainNameResolver; import org.neo4j.driver.internal.async.connection.BootstrapFactory; import org.neo4j.driver.internal.async.connection.ChannelConnectorImpl; import org.neo4j.driver.internal.cluster.RoutingContext; -import org.neo4j.driver.internal.security.SecurityPlanImpl; import org.neo4j.driver.internal.security.InternalAuthToken; +import org.neo4j.driver.internal.security.SecurityPlanImpl; import org.neo4j.driver.internal.util.FakeClock; import org.neo4j.driver.internal.util.ImmediateSchedulingEventExecutor; import org.neo4j.driver.util.DatabaseExtension; @@ -184,7 +185,7 @@ private NettyChannelPool newPool( AuthToken authToken, int maxConnections ) { ConnectionSettings settings = new ConnectionSettings( authToken, "test", 5_000 ); ChannelConnectorImpl connector = new ChannelConnectorImpl( settings, SecurityPlanImpl.insecure(), DEV_NULL_LOGGING, - new FakeClock(), RoutingContext.EMPTY ); + new FakeClock(), RoutingContext.EMPTY, DefaultDomainNameResolver.getInstance() ); return new NettyChannelPool( neo4j.address(), connector, bootstrap, poolHandler, ChannelHealthChecker.ACTIVE, 1_000, maxConnections ); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/AddressSetTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/AddressSetTest.java index e078b0b8f5..57b80fac8a 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/AddressSetTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/AddressSetTest.java @@ -37,7 +37,7 @@ void shouldPreserveOrderWhenAdding() throws Exception Set servers = addresses( "one", "two", "tre" ); AddressSet set = new AddressSet(); - set.update( servers ); + set.retainAllAndAdd( servers ); assertArrayEquals( new BoltServerAddress[]{ new BoltServerAddress( "one" ), @@ -46,7 +46,7 @@ void shouldPreserveOrderWhenAdding() throws Exception // when servers.add( new BoltServerAddress( "fyr" ) ); - set.update( servers ); + set.retainAllAndAdd( servers ); // then assertArrayEquals( new BoltServerAddress[]{ @@ -62,7 +62,7 @@ void shouldPreserveOrderWhenRemoving() throws Exception // given Set servers = addresses( "one", "two", "tre" ); AddressSet set = new AddressSet(); - set.update( servers ); + set.retainAllAndAdd( servers ); assertArrayEquals( new BoltServerAddress[]{ new BoltServerAddress( "one" ), @@ -84,7 +84,7 @@ void shouldPreserveOrderWhenRemovingThroughUpdate() throws Exception // given Set servers = addresses( "one", "two", "tre" ); AddressSet set = new AddressSet(); - set.update( servers ); + set.retainAllAndAdd( servers ); assertArrayEquals( new BoltServerAddress[]{ new BoltServerAddress( "one" ), @@ -93,7 +93,7 @@ void shouldPreserveOrderWhenRemovingThroughUpdate() throws Exception // when servers.remove( new BoltServerAddress( "one" ) ); - set.update( servers ); + set.retainAllAndAdd( servers ); // then assertArrayEquals( new BoltServerAddress[]{ @@ -115,7 +115,7 @@ void shouldExposeEmptyArrayWhenEmpty() void shouldExposeCorrectArray() { AddressSet addressSet = new AddressSet(); - addressSet.update( addresses( "one", "two", "tre" ) ); + addressSet.retainAllAndAdd( addresses( "one", "two", "tre" ) ); BoltServerAddress[] addresses = addressSet.toArray(); @@ -137,7 +137,7 @@ void shouldHaveSizeZeroWhenEmpty() void shouldHaveCorrectSize() { AddressSet addressSet = new AddressSet(); - addressSet.update( addresses( "one", "two" ) ); + addressSet.retainAllAndAdd( addresses( "one", "two" ) ); assertEquals( 2, addressSet.size() ); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/RediscoveryTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/RediscoveryTest.java index 19b1174909..80f20af27b 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/RediscoveryTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/RediscoveryTest.java @@ -23,6 +23,8 @@ import org.mockito.ArgumentCaptor; import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -35,6 +37,8 @@ import org.neo4j.driver.exceptions.SessionExpiredException; import org.neo4j.driver.internal.BoltServerAddress; import org.neo4j.driver.internal.DatabaseName; +import org.neo4j.driver.internal.DefaultDomainNameResolver; +import org.neo4j.driver.internal.DomainNameResolver; import org.neo4j.driver.internal.InternalBookmark; import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ConnectionPool; @@ -50,7 +54,6 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.junit.Assert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; @@ -90,7 +93,7 @@ void shouldUseFirstRouterInTable() Rediscovery rediscovery = newRediscovery( A, compositionProvider, mock( ServerAddressResolver.class ) ); RoutingTable table = routingTableMock( B ); - ClusterComposition actualComposition = await( rediscovery.lookupClusterComposition( table, pool, empty() ) ); + ClusterComposition actualComposition = await( rediscovery.lookupClusterComposition( table, pool, empty() ) ).getClusterComposition(); assertEquals( expectedComposition, actualComposition ); verify( table, never() ).forget( B ); @@ -111,7 +114,7 @@ void shouldSkipFailingRouters() Rediscovery rediscovery = newRediscovery( A, compositionProvider, mock( ServerAddressResolver.class ) ); RoutingTable table = routingTableMock( A, B, C ); - ClusterComposition actualComposition = await( rediscovery.lookupClusterComposition( table, pool, empty() ) ); + ClusterComposition actualComposition = await( rediscovery.lookupClusterComposition( table, pool, empty() ) ).getClusterComposition(); assertEquals( expectedComposition, actualComposition ); verify( table ).forget( A ); @@ -156,7 +159,7 @@ void shouldFallbackToInitialRouterWhenKnownRoutersFail() Rediscovery rediscovery = newRediscovery( initialRouter, compositionProvider, resolver ); RoutingTable table = routingTableMock( B, C ); - ClusterComposition actualComposition = await( rediscovery.lookupClusterComposition( table, pool, empty() ) ); + ClusterComposition actualComposition = await( rediscovery.lookupClusterComposition( table, pool, empty() ) ).getClusterComposition(); assertEquals( expectedComposition, actualComposition ); verify( table ).forget( B ); @@ -181,7 +184,7 @@ void shouldFailImmediatelyWhenClusterCompositionProviderReturnsFailure() RoutingTable table = routingTableMock( B, C ); // When - ClusterComposition composition = await( rediscovery.lookupClusterComposition( table, pool, empty() ) ); + ClusterComposition composition = await( rediscovery.lookupClusterComposition( table, pool, empty() ) ).getClusterComposition(); assertEquals( validComposition, composition ); ArgumentCaptor argument = ArgumentCaptor.forClass( DiscoveryException.class ); @@ -208,7 +211,7 @@ void shouldResolveInitialRouterAddress() Rediscovery rediscovery = newRediscovery( initialRouter, compositionProvider, resolver ); RoutingTable table = routingTableMock( B, C ); - ClusterComposition actualComposition = await( rediscovery.lookupClusterComposition( table, pool, empty() ) ); + ClusterComposition actualComposition = await( rediscovery.lookupClusterComposition( table, pool, empty() ) ).getClusterComposition(); assertEquals( expectedComposition, actualComposition ); verify( table ).forget( B ); @@ -237,7 +240,7 @@ void shouldResolveInitialRouterAddressUsingCustomResolver() Rediscovery rediscovery = newRediscovery( A, compositionProvider, resolver ); RoutingTable table = routingTableMock( B, C ); - ClusterComposition actualComposition = await( rediscovery.lookupClusterComposition( table, pool, empty() ) ); + ClusterComposition actualComposition = await( rediscovery.lookupClusterComposition( table, pool, empty() ) ).getClusterComposition(); assertEquals( expectedComposition, actualComposition ); verify( table ).forget( B ); @@ -308,7 +311,7 @@ void shouldUseInitialRouterAfterDiscoveryReturnsNoWriters() RoutingTable table = new ClusterRoutingTable( defaultDatabase(), new FakeClock() ); table.update( noWritersComposition ); - ClusterComposition composition2 = await( rediscovery.lookupClusterComposition( table, pool, empty() ) ); + ClusterComposition composition2 = await( rediscovery.lookupClusterComposition( table, pool, empty() ) ).getClusterComposition(); assertEquals( validComposition, composition2 ); } @@ -327,7 +330,7 @@ void shouldUseInitialRouterToStartWith() Rediscovery rediscovery = newRediscovery( initialRouter, compositionProvider, resolver ); RoutingTable table = routingTableMock( true, B, C, D ); - ClusterComposition composition = await( rediscovery.lookupClusterComposition( table, pool, empty() ) ); + ClusterComposition composition = await( rediscovery.lookupClusterComposition( table, pool, empty() ) ).getClusterComposition(); assertEquals( validComposition, composition ); } @@ -348,7 +351,7 @@ void shouldUseKnownRoutersWhenInitialRouterFails() Rediscovery rediscovery = newRediscovery( initialRouter, compositionProvider, resolver ); RoutingTable table = routingTableMock( true, D, E ); - ClusterComposition composition = await( rediscovery.lookupClusterComposition( table, pool, empty() ) ); + ClusterComposition composition = await( rediscovery.lookupClusterComposition( table, pool, empty() ) ).getClusterComposition(); assertEquals( validComposition, composition ); verify( table ).forget( initialRouter ); verify( table ).forget( D ); @@ -375,10 +378,11 @@ void shouldRetryConfiguredNumberOfTimesWithDelay() ImmediateSchedulingEventExecutor eventExecutor = new ImmediateSchedulingEventExecutor(); RoutingSettings settings = new RoutingSettings( maxRoutingFailures, retryTimeoutDelay, 0 ); - Rediscovery rediscovery = new RediscoveryImpl( A, settings, compositionProvider, eventExecutor, resolver, DEV_NULL_LOGGER ); - RoutingTable table = routingTableMock(A, B ); + Rediscovery rediscovery = + new RediscoveryImpl( A, settings, compositionProvider, eventExecutor, resolver, DEV_NULL_LOGGER, DefaultDomainNameResolver.getInstance() ); + RoutingTable table = routingTableMock( A, B ); - ClusterComposition actualComposition = await( rediscovery.lookupClusterComposition( table, pool, empty() ) ); + ClusterComposition actualComposition = await( rediscovery.lookupClusterComposition( table, pool, empty() ) ).getClusterComposition(); assertEquals( expectedComposition, actualComposition ); verify( table, times( maxRoutingFailures ) ).forget( A ); @@ -399,7 +403,8 @@ void shouldNotLogWhenSingleRetryAttemptFails() ImmediateSchedulingEventExecutor eventExecutor = new ImmediateSchedulingEventExecutor(); RoutingSettings settings = new RoutingSettings( maxRoutingFailures, retryTimeoutDelay, 0 ); Logger logger = mock( Logger.class ); - Rediscovery rediscovery = new RediscoveryImpl( A, settings, compositionProvider, eventExecutor, resolver, logger ); + Rediscovery rediscovery = + new RediscoveryImpl( A, settings, compositionProvider, eventExecutor, resolver, logger, DefaultDomainNameResolver.getInstance() ); RoutingTable table = routingTableMock( A ); ServiceUnavailableException e = @@ -412,16 +417,20 @@ void shouldNotLogWhenSingleRetryAttemptFails() } @Test - void shouldNotResolveToIPs() + void shouldResolveToIP() throws UnknownHostException { ServerAddressResolver resolver = resolverMock( A, A ); - Rediscovery rediscovery = new RediscoveryImpl( A, null, null, null, resolver, null ); + DomainNameResolver domainNameResolver = mock( DomainNameResolver.class ); + InetAddress localhost = InetAddress.getLocalHost(); + when( domainNameResolver.resolve( A.host() ) ).thenReturn( new InetAddress[]{localhost} ); + Rediscovery rediscovery = new RediscoveryImpl( A, null, null, null, resolver, null, domainNameResolver ); List addresses = rediscovery.resolve(); verify( resolver, times( 1 ) ).resolve( A ); + verify( domainNameResolver, times( 1 ) ).resolve( A.host() ); assertEquals( 1, addresses.size() ); - assertFalse( addresses.get( 0 ).isResolved() ); + assertEquals( addresses.get( 0 ), new BoltServerAddress( localhost.getHostAddress(), A.port() ) ); } private Rediscovery newRediscovery( BoltServerAddress initialRouter, ClusterCompositionProvider compositionProvider, @@ -434,7 +443,8 @@ private Rediscovery newRediscovery( BoltServerAddress initialRouter, ClusterComp ServerAddressResolver resolver, Logger logger ) { RoutingSettings settings = new RoutingSettings( 1, 0, 0 ); - return new RediscoveryImpl( initialRouter, settings, compositionProvider, GlobalEventExecutor.INSTANCE, resolver, logger ); + return new RediscoveryImpl( initialRouter, settings, compositionProvider, GlobalEventExecutor.INSTANCE, resolver, logger, + DefaultDomainNameResolver.getInstance() ); } @SuppressWarnings( "unchecked" ) @@ -494,7 +504,7 @@ private static RoutingTable routingTableMock( boolean preferInitialRouter, BoltS { RoutingTable routingTable = mock( RoutingTable.class ); AddressSet addressSet = new AddressSet(); - addressSet.update( asOrderedSet( routers ) ); + addressSet.retainAllAndAdd( asOrderedSet( routers ) ); when( routingTable.routers() ).thenReturn( addressSet ); when( routingTable.database() ).thenReturn( defaultDatabase() ); when( routingTable.preferInitialRouter() ).thenReturn( preferInitialRouter ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingTableHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingTableHandlerTest.java index e7763d74e2..85cd88be53 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingTableHandlerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingTableHandlerTest.java @@ -111,7 +111,7 @@ void acquireShouldUpdateRoutingTableWhenKnownRoutingTableIsStale() ClusterComposition clusterComposition = new ClusterComposition( 42, readers, writers, routers ); Rediscovery rediscovery = mock( RediscoveryImpl.class ); when( rediscovery.lookupClusterComposition( eq( routingTable ), eq( connectionPool ), any() ) ) - .thenReturn( completedFuture( clusterComposition ) ); + .thenReturn( completedFuture( new ClusterCompositionLookupResult( clusterComposition ) ) ); RoutingTableHandler handler = newRoutingTableHandler( routingTable, rediscovery, connectionPool ); @@ -158,7 +158,7 @@ void shouldRetainAllFetchedAddressesInConnectionPoolAfterFetchingOfRoutingTable( Rediscovery rediscovery = newRediscoveryMock(); when( rediscovery.lookupClusterComposition( any(), any(), any() ) ).thenReturn( completedFuture( - new ClusterComposition( 42, asOrderedSet( A, B ), asOrderedSet( B, C ), asOrderedSet( A, C ) ) ) ); + new ClusterCompositionLookupResult( new ClusterComposition( 42, asOrderedSet( A, B ), asOrderedSet( B, C ), asOrderedSet( A, C ) ) ) ) ); RoutingTableRegistry registry = new RoutingTableRegistry() { @@ -253,7 +253,7 @@ private static RoutingTable newStaleRoutingTableMock( AccessMode mode ) when( routingTable.isStaleFor( mode ) ).thenReturn( true ); AddressSet addresses = new AddressSet(); - addresses.update( new HashSet<>( singletonList( LOCAL_DEFAULT ) ) ); + addresses.retainAllAndAdd( new HashSet<>( singletonList( LOCAL_DEFAULT ) ) ); when( routingTable.readers() ).thenReturn( addresses ); when( routingTable.writers() ).thenReturn( addresses ); when( routingTable.database() ).thenReturn( defaultDatabase() ); @@ -272,7 +272,7 @@ private static Rediscovery newRediscoveryMock() Set noServers = Collections.emptySet(); ClusterComposition clusterComposition = new ClusterComposition( 1, noServers, noServers, noServers ); when( rediscovery.lookupClusterComposition( any( RoutingTable.class ), any( ConnectionPool.class ), any( InternalBookmark.class ) ) ) - .thenReturn( completedFuture( clusterComposition ) ); + .thenReturn( completedFuture( new ClusterCompositionLookupResult( clusterComposition ) ) ); return rediscovery; } diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java index be9e1449e5..520c81946c 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java @@ -414,7 +414,7 @@ private static LoadBalancer newLoadBalancer( ConnectionPool connectionPool, Rout when( routingTables.ensureRoutingTable( any( ConnectionContext.class ) ) ).thenReturn( CompletableFuture.completedFuture( handler ) ); Rediscovery rediscovery = mock( Rediscovery.class ); return new LoadBalancer( connectionPool, routingTables, rediscovery, new LeastConnectedLoadBalancingStrategy( connectionPool, DEV_NULL_LOGGING ), - GlobalEventExecutor.INSTANCE, DEV_NULL_LOGGER ); + GlobalEventExecutor.INSTANCE, DEV_NULL_LOGGER ); } private static LoadBalancer newLoadBalancer( ConnectionPool connectionPool, Rediscovery rediscovery ) @@ -428,6 +428,6 @@ private static LoadBalancer newLoadBalancer( ConnectionPool connectionPool, Rout { // Used only in testing return new LoadBalancer( connectionPool, routingTables, rediscovery, new LeastConnectedLoadBalancingStrategy( connectionPool, DEV_NULL_LOGGING ), - GlobalEventExecutor.INSTANCE, DEV_NULL_LOGGER ); + GlobalEventExecutor.INSTANCE, DEV_NULL_LOGGER ); } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/RoutingTableAndConnectionPoolTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/RoutingTableAndConnectionPoolTest.java index 61172275c0..4f235b5535 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/RoutingTableAndConnectionPoolTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/RoutingTableAndConnectionPoolTest.java @@ -48,6 +48,7 @@ import org.neo4j.driver.internal.async.pool.PoolSettings; import org.neo4j.driver.internal.async.pool.TestConnectionPool; import org.neo4j.driver.internal.cluster.ClusterComposition; +import org.neo4j.driver.internal.cluster.ClusterCompositionLookupResult; import org.neo4j.driver.internal.cluster.Rediscovery; import org.neo4j.driver.internal.cluster.RoutingTable; import org.neo4j.driver.internal.cluster.RoutingTableRegistry; @@ -326,30 +327,31 @@ private LoadBalancer newLoadBalancer( ConnectionPool connectionPool, RoutingTabl { Rediscovery rediscovery = mock( Rediscovery.class ); return new LoadBalancer( connectionPool, routingTables, rediscovery, new LeastConnectedLoadBalancingStrategy( connectionPool, logging ), - GlobalEventExecutor.INSTANCE, logging.getLog( "LB" ) ); + GlobalEventExecutor.INSTANCE, logging.getLog( "LB" ) ); } - private CompletableFuture clusterComposition( BoltServerAddress... addresses ) + private CompletableFuture clusterComposition( BoltServerAddress... addresses ) { return clusterComposition( Duration.ofSeconds( 30 ).toMillis(), addresses ); } - private CompletableFuture expiredClusterComposition( BoltServerAddress... addresses ) + private CompletableFuture expiredClusterComposition( BoltServerAddress... addresses ) { return clusterComposition( -STALE_ROUTING_TABLE_PURGE_DELAY_MS - 1, addresses ); } - private CompletableFuture clusterComposition( long expireAfterMs, BoltServerAddress... addresses ) + private CompletableFuture clusterComposition( long expireAfterMs, BoltServerAddress... addresses ) { HashSet servers = new HashSet<>( Arrays.asList( addresses ) ); ClusterComposition composition = new ClusterComposition( clock.millis() + expireAfterMs, servers, servers, servers ); - return CompletableFuture.completedFuture( composition ); + return CompletableFuture.completedFuture( new ClusterCompositionLookupResult( composition ) ); } private class RandomizedRediscovery implements Rediscovery { @Override - public CompletionStage lookupClusterComposition( RoutingTable routingTable, ConnectionPool connectionPool, Bookmark bookmark ) + public CompletionStage lookupClusterComposition( RoutingTable routingTable, ConnectionPool connectionPool, + Bookmark bookmark ) { // when looking up a new routing table, we return a valid random routing table back Set servers = new HashSet<>(); @@ -367,7 +369,7 @@ public CompletionStage lookupClusterComposition( RoutingTabl servers.add( A ); } ClusterComposition composition = new ClusterComposition( clock.millis() + 1, servers, servers, servers ); - return CompletableFuture.completedFuture( composition ); + return CompletableFuture.completedFuture( new ClusterCompositionLookupResult( composition ) ); } @Override diff --git a/driver/src/test/java/org/neo4j/driver/internal/net/BoltServerAddressTest.java b/driver/src/test/java/org/neo4j/driver/internal/net/BoltServerAddressTest.java index 8b92004c29..5f3af8eede 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/net/BoltServerAddressTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/net/BoltServerAddressTest.java @@ -22,20 +22,11 @@ import java.net.SocketAddress; import java.net.URI; -import java.util.List; import org.neo4j.driver.internal.BoltServerAddress; import org.neo4j.driver.net.ServerAddress; import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.Matchers.anyOf; -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.endsWith; -import static org.hamcrest.Matchers.everyItem; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.startsWith; import static org.hamcrest.junit.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotSame; @@ -146,58 +137,10 @@ void shouldUseUriWithHostAndPort() assertEquals( 12345, address.port() ); } - @Test - void shouldResolveDNSToIPs() throws Exception - { - BoltServerAddress address = new BoltServerAddress( "google.com", 80 ); - List resolved = address.resolveAll(); - assertThat( resolved, hasSize( greaterThanOrEqualTo( 1 ) ) ); - assertThat( resolved, everyItem( equalTo( address ) ) ); - } - - @Test - void shouldResolveLocalhostIPDNSToIPs() throws Exception - { - BoltServerAddress address = new BoltServerAddress( "127.0.0.1", 80 ); - List resolved = address.resolveAll(); - assertThat( resolved, hasSize( 1 ) ); - assertThat( resolved, everyItem( equalTo( address ) ) ); - } - - @Test - void shouldResolveLocalhostDNSToIPs() throws Exception - { - BoltServerAddress address = new BoltServerAddress( "localhost", 80 ); - List resolved = address.resolveAll(); - assertThat( resolved, hasSize( greaterThanOrEqualTo( 1 ) ) ); - assertThat( resolved, everyItem( equalTo( address ) ) ); - } - - @Test - void shouldResolveIPv6LocalhostDNSToIPs() throws Exception - { - BoltServerAddress address = new BoltServerAddress( "[::1]", 80 ); - List resolved = address.resolveAll(); - assertThat( resolved, hasSize( greaterThanOrEqualTo( 1 ) ) ); - assertThat( resolved, everyItem( equalTo( address ) ) ); - } - @Test void shouldIncludeHostAndPortInToString() { BoltServerAddress address = new BoltServerAddress( "localhost", 8081 ); assertThat( address.toString(), equalTo( "localhost:8081" ) ); } - - @Test - void shouldIncludeHostResolvedIPAndPortInToStringWhenResolved() throws Exception - { - BoltServerAddress address = new BoltServerAddress( "localhost", 8081 ); - BoltServerAddress resolved = address.resolve(); - - assertThat( resolved.toString(), not( equalTo( "localhost:8081" ) ) ); - assertThat( resolved.toString(), anyOf( containsString( "(127.0.0.1)" ), containsString( "(::1)" ) ) ); - assertThat( resolved.toString(), startsWith( "localhost" ) ); - assertThat( resolved.toString(), endsWith( "8081" ) ); - } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/util/ClusterCompositionUtil.java b/driver/src/test/java/org/neo4j/driver/internal/util/ClusterCompositionUtil.java index 53804f82b1..86698f0bc6 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/util/ClusterCompositionUtil.java +++ b/driver/src/test/java/org/neo4j/driver/internal/util/ClusterCompositionUtil.java @@ -29,16 +29,18 @@ public final class ClusterCompositionUtil { - private ClusterCompositionUtil() {} + private ClusterCompositionUtil() + { + } public static final long NEVER_EXPIRE = System.currentTimeMillis() + TimeUnit.HOURS.toMillis( 1 ); - public static final BoltServerAddress A = new BoltServerAddress( "1111:11" ); - public static final BoltServerAddress B = new BoltServerAddress( "2222:22" ); - public static final BoltServerAddress C = new BoltServerAddress( "3333:33" ); - public static final BoltServerAddress D = new BoltServerAddress( "4444:44" ); - public static final BoltServerAddress E = new BoltServerAddress( "5555:55" ); - public static final BoltServerAddress F = new BoltServerAddress( "6666:66" ); + public static final BoltServerAddress A = new BoltServerAddress( "192.168.100.100:11" ); + public static final BoltServerAddress B = new BoltServerAddress( "192.168.100.101:22" ); + public static final BoltServerAddress C = new BoltServerAddress( "192.168.100.102:33" ); + public static final BoltServerAddress D = new BoltServerAddress( "192.168.100.103:44" ); + public static final BoltServerAddress E = new BoltServerAddress( "192.168.100.104:55" ); + public static final BoltServerAddress F = new BoltServerAddress( "192.168.100.105:66" ); public static final List EMPTY = new ArrayList<>(); diff --git a/driver/src/test/java/org/neo4j/driver/internal/util/MessageRecordingDriverFactory.java b/driver/src/test/java/org/neo4j/driver/internal/util/MessageRecordingDriverFactory.java index fc650673ca..9be58dea1b 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/util/MessageRecordingDriverFactory.java +++ b/driver/src/test/java/org/neo4j/driver/internal/util/MessageRecordingDriverFactory.java @@ -28,7 +28,10 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import org.neo4j.driver.Config; +import org.neo4j.driver.Logging; import org.neo4j.driver.internal.ConnectionSettings; +import org.neo4j.driver.internal.DefaultDomainNameResolver; import org.neo4j.driver.internal.DriverFactory; import org.neo4j.driver.internal.async.connection.ChannelConnector; import org.neo4j.driver.internal.async.connection.ChannelConnectorImpl; @@ -39,8 +42,6 @@ import org.neo4j.driver.internal.messaging.Message; import org.neo4j.driver.internal.messaging.MessageFormat; import org.neo4j.driver.internal.security.SecurityPlan; -import org.neo4j.driver.Config; -import org.neo4j.driver.Logging; public class MessageRecordingDriverFactory extends DriverFactory { @@ -56,7 +57,8 @@ protected ChannelConnector createConnector( ConnectionSettings settings, Securit RoutingContext routingContext ) { ChannelPipelineBuilder pipelineBuilder = new MessageRecordingChannelPipelineBuilder(); - return new ChannelConnectorImpl( settings, securityPlan, pipelineBuilder, config.logging(), clock, routingContext ); + return new ChannelConnectorImpl( settings, securityPlan, pipelineBuilder, config.logging(), clock, routingContext, + DefaultDomainNameResolver.getInstance() ); } private class MessageRecordingChannelPipelineBuilder extends ChannelPipelineBuilderImpl diff --git a/driver/src/test/java/org/neo4j/driver/internal/util/io/ChannelTrackingDriverFactoryWithFailingMessageFormat.java b/driver/src/test/java/org/neo4j/driver/internal/util/io/ChannelTrackingDriverFactoryWithFailingMessageFormat.java index e36720ffbd..7bc9faa59a 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/util/io/ChannelTrackingDriverFactoryWithFailingMessageFormat.java +++ b/driver/src/test/java/org/neo4j/driver/internal/util/io/ChannelTrackingDriverFactoryWithFailingMessageFormat.java @@ -18,14 +18,15 @@ */ package org.neo4j.driver.internal.util.io; +import org.neo4j.driver.Config; import org.neo4j.driver.internal.ConnectionSettings; +import org.neo4j.driver.internal.DefaultDomainNameResolver; import org.neo4j.driver.internal.async.connection.ChannelConnector; import org.neo4j.driver.internal.async.connection.ChannelConnectorImpl; import org.neo4j.driver.internal.cluster.RoutingContext; import org.neo4j.driver.internal.security.SecurityPlan; import org.neo4j.driver.internal.util.Clock; import org.neo4j.driver.internal.util.FailingMessageFormat; -import org.neo4j.driver.Config; public class ChannelTrackingDriverFactoryWithFailingMessageFormat extends ChannelTrackingDriverFactory { @@ -40,7 +41,8 @@ public ChannelTrackingDriverFactoryWithFailingMessageFormat( Clock clock ) protected ChannelConnector createRealConnector( ConnectionSettings settings, SecurityPlan securityPlan, Config config, Clock clock, RoutingContext routingContext ) { - return new ChannelConnectorImpl( settings, securityPlan, pipelineBuilder, config.logging(), clock, routingContext ); + return new ChannelConnectorImpl( settings, securityPlan, pipelineBuilder, config.logging(), clock, routingContext, + DefaultDomainNameResolver.getInstance() ); } public FailingMessageFormat getFailingMessageFormat() diff --git a/driver/src/test/java/org/neo4j/driver/util/cc/Cluster.java b/driver/src/test/java/org/neo4j/driver/util/cc/Cluster.java index 8f02f3207d..9fdd6cdbb6 100644 --- a/driver/src/test/java/org/neo4j/driver/util/cc/Cluster.java +++ b/driver/src/test/java/org/neo4j/driver/util/cc/Cluster.java @@ -19,6 +19,7 @@ package org.neo4j.driver.util.cc; import java.io.FileNotFoundException; +import java.net.InetAddress; import java.net.URI; import java.net.UnknownHostException; import java.nio.file.Path; @@ -29,10 +30,10 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import org.neo4j.driver.Bookmark; import org.neo4j.driver.Driver; import org.neo4j.driver.Record; import org.neo4j.driver.internal.BoltServerAddress; -import org.neo4j.driver.Bookmark; import org.neo4j.driver.util.TestUtil; import org.neo4j.driver.util.cc.ClusterMemberRoleDiscoveryFactory.ClusterMemberRoleDiscovery; @@ -400,7 +401,7 @@ private static BoltServerAddress newBoltServerAddress( URI uri ) { try { - return new BoltServerAddress( uri ).resolve(); + return new BoltServerAddress( InetAddress.getByName( uri.getHost() ).getHostAddress(), uri.getPort() ); } catch ( UnknownHostException e ) { diff --git a/driver/src/test/java/org/neo4j/driver/util/cc/ClusterMember.java b/driver/src/test/java/org/neo4j/driver/util/cc/ClusterMember.java index e64b2d551f..68ec80aa1f 100644 --- a/driver/src/test/java/org/neo4j/driver/util/cc/ClusterMember.java +++ b/driver/src/test/java/org/neo4j/driver/util/cc/ClusterMember.java @@ -20,6 +20,7 @@ import java.io.File; import java.io.FileNotFoundException; +import java.net.InetAddress; import java.net.URI; import java.net.UnknownHostException; import java.nio.file.Path; @@ -111,7 +112,7 @@ private static BoltServerAddress newBoltServerAddress( URI uri ) { try { - return new BoltServerAddress( uri ).resolve(); + return new BoltServerAddress( InetAddress.getByName( uri.getHost() ).getHostAddress(), uri.getPort() ); } catch ( UnknownHostException e ) { diff --git a/driver/src/test/java/org/neo4j/driver/util/cc/ClusterMemberRoleDiscoveryFactory.java b/driver/src/test/java/org/neo4j/driver/util/cc/ClusterMemberRoleDiscoveryFactory.java index 6803df576a..0b48f2a299 100644 --- a/driver/src/test/java/org/neo4j/driver/util/cc/ClusterMemberRoleDiscoveryFactory.java +++ b/driver/src/test/java/org/neo4j/driver/util/cc/ClusterMemberRoleDiscoveryFactory.java @@ -18,6 +18,7 @@ */ package org.neo4j.driver.util.cc; +import java.net.InetAddress; import java.net.URI; import java.net.UnknownHostException; import java.util.HashMap; @@ -27,14 +28,14 @@ import org.neo4j.driver.AccessMode; import org.neo4j.driver.Driver; import org.neo4j.driver.Record; -import org.neo4j.driver.Session; import org.neo4j.driver.Result; +import org.neo4j.driver.Session; import org.neo4j.driver.Values; import org.neo4j.driver.internal.BoltServerAddress; import org.neo4j.driver.internal.util.ServerVersion; -import static org.neo4j.driver.Values.parameters; import static org.neo4j.driver.SessionConfig.builder; +import static org.neo4j.driver.Values.parameters; import static org.neo4j.driver.internal.util.Iterables.single; public class ClusterMemberRoleDiscoveryFactory @@ -145,7 +146,7 @@ private static BoltServerAddress newBoltServerAddress( URI uri ) { try { - return new BoltServerAddress( uri ).resolve(); + return new BoltServerAddress( InetAddress.getByName( uri.getHost() ).getHostAddress(), uri.getPort() ); } catch ( UnknownHostException e ) { diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/TestkitState.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/TestkitState.java index 1d966da508..a13bbb7460 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/TestkitState.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/TestkitState.java @@ -21,6 +21,7 @@ import lombok.Getter; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; +import java.net.InetAddress; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -45,6 +46,7 @@ public class TestkitState private final Consumer responseWriter; private final Supplier processor; private final Map> idToServerAddresses = new HashMap<>(); + private final Map idToResolvedAddresses = new HashMap<>(); public TestkitState( Consumer responseWriter, Supplier processor ) { diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/DomainNameResolutionCompleted.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/DomainNameResolutionCompleted.java new file mode 100644 index 0000000000..b5605a64d4 --- /dev/null +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/DomainNameResolutionCompleted.java @@ -0,0 +1,69 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package neo4j.org.testkit.backend.messages.requests; + +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import neo4j.org.testkit.backend.TestkitState; +import neo4j.org.testkit.backend.messages.responses.TestkitResponse; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.List; + +@Setter +@Getter +@NoArgsConstructor +public class DomainNameResolutionCompleted implements TestkitRequest +{ + private DomainNameResolutionCompletedBody data; + + @Override + public TestkitResponse process( TestkitState testkitState ) + { + testkitState.getIdToResolvedAddresses().put( + data.getRequestId(), + data.getAddresses() + .stream() + .map( + addr -> + { + try + { + return InetAddress.getByName( addr ); + } + catch ( UnknownHostException e ) + { + throw new RuntimeException( e ); + } + } ) + .toArray( InetAddress[]::new ) ); + return null; + } + + @Setter + @Getter + @NoArgsConstructor + private static class DomainNameResolutionCompletedBody + { + private String requestId; + private List addresses; + } +} diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java index c34bafc40d..258dfe2da3 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java @@ -20,19 +20,28 @@ import lombok.Getter; import lombok.NoArgsConstructor; +import lombok.RequiredArgsConstructor; import lombok.Setter; import neo4j.org.testkit.backend.TestkitState; +import neo4j.org.testkit.backend.messages.responses.DomainNameResolutionRequired; import neo4j.org.testkit.backend.messages.responses.Driver; import neo4j.org.testkit.backend.messages.responses.ResolverResolutionRequired; import neo4j.org.testkit.backend.messages.responses.TestkitErrorResponse; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; +import java.net.URI; import java.util.Optional; +import java.util.concurrent.TimeUnit; import org.neo4j.driver.AuthToken; import org.neo4j.driver.AuthTokens; import org.neo4j.driver.Config; -import org.neo4j.driver.GraphDatabase; +import org.neo4j.driver.internal.DefaultDomainNameResolver; +import org.neo4j.driver.internal.DomainNameResolver; +import org.neo4j.driver.internal.DriverFactory; +import org.neo4j.driver.internal.cluster.RoutingSettings; +import org.neo4j.driver.internal.retry.RetrySettings; +import org.neo4j.driver.internal.security.SecurityPlanImpl; import org.neo4j.driver.net.ServerAddressResolver; @Setter @@ -65,8 +74,14 @@ public TestkitResponse process( TestkitState testkitState ) { configBuilder.withResolver( callbackResolver( testkitState ) ); } + DomainNameResolver domainNameResolver = DefaultDomainNameResolver.getInstance(); + if ( data.isDomainNameResolverRegistered() ) + { + domainNameResolver = callbackDomainNameResolver( testkitState ); + } Optional.ofNullable( data.userAgent ).ifPresent( configBuilder::withUserAgent ); - testkitState.getDrivers().putIfAbsent( id, GraphDatabase.driver( data.uri, authToken, configBuilder.build() ) ); + Optional.ofNullable( data.connectionTimeoutMs ).ifPresent( timeout -> configBuilder.withConnectionTimeout( timeout, TimeUnit.MILLISECONDS ) ); + testkitState.getDrivers().putIfAbsent( id, driver( URI.create( data.uri ), authToken, configBuilder.build(), domainNameResolver ) ); return Driver.builder().data( Driver.DriverBody.builder().id( id ).build() ).build(); } @@ -90,6 +105,34 @@ private ServerAddressResolver callbackResolver( TestkitState testkitState ) }; } + private DomainNameResolver callbackDomainNameResolver( TestkitState testkitState ) + { + return address -> + { + String callbackId = testkitState.newId(); + DomainNameResolutionRequired.DomainNameResolutionRequiredBody body = + DomainNameResolutionRequired.DomainNameResolutionRequiredBody.builder() + .id( callbackId ) + .name( address ) + .build(); + DomainNameResolutionRequired response = + DomainNameResolutionRequired.builder() + .data( body ) + .build(); + testkitState.getResponseWriter().accept( response ); + testkitState.getProcessor().get(); + return testkitState.getIdToResolvedAddresses().remove( callbackId ); + }; + } + + private org.neo4j.driver.Driver driver( URI uri, AuthToken authToken, Config config, DomainNameResolver domainNameResolver ) + { + RoutingSettings routingSettings = RoutingSettings.DEFAULT; + RetrySettings retrySettings = RetrySettings.DEFAULT; + return new DriverFactoryWithDomainNameResolver( domainNameResolver ) + .newInstance( uri, authToken, routingSettings, retrySettings, config, SecurityPlanImpl.insecure() ); + } + @Setter @Getter @NoArgsConstructor @@ -99,5 +142,19 @@ public static class NewDriverBody private AuthorizationToken authorizationToken; private String userAgent; private boolean resolverRegistered; + private boolean domainNameResolverRegistered; + private Long connectionTimeoutMs; + } + + @RequiredArgsConstructor + private static class DriverFactoryWithDomainNameResolver extends DriverFactory + { + private final DomainNameResolver domainNameResolver; + + @Override + protected DomainNameResolver getDomainNameResolver() + { + return domainNameResolver; + } } } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitRequest.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitRequest.java index 7d74966fa5..4f7d053948 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitRequest.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitRequest.java @@ -33,7 +33,8 @@ @JsonSubTypes.Type( TransactionRun.class ), @JsonSubTypes.Type( RetryablePositive.class ), @JsonSubTypes.Type( SessionBeginTransaction.class ), @JsonSubTypes.Type( TransactionCommit.class ), @JsonSubTypes.Type( SessionLastBookmarks.class ), @JsonSubTypes.Type( SessionWriteTransaction.class ), - @JsonSubTypes.Type( ResolverResolutionCompleted.class ), @JsonSubTypes.Type( CheckMultiDBSupport.class ) + @JsonSubTypes.Type( ResolverResolutionCompleted.class ), @JsonSubTypes.Type( CheckMultiDBSupport.class ), + @JsonSubTypes.Type( DomainNameResolutionCompleted.class ) } ) public interface TestkitRequest { diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/responses/DomainNameResolutionRequired.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/responses/DomainNameResolutionRequired.java new file mode 100644 index 0000000000..3f803cc021 --- /dev/null +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/responses/DomainNameResolutionRequired.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package neo4j.org.testkit.backend.messages.responses; + +import lombok.Builder; +import lombok.Getter; +import lombok.Setter; + +@Setter +@Getter +@Builder +public class DomainNameResolutionRequired implements TestkitResponse +{ + private DomainNameResolutionRequiredBody data; + + @Override + public String testkitName() + { + return "DomainNameResolutionRequired"; + } + + @Setter + @Getter + @Builder + public static class DomainNameResolutionRequiredBody + { + private String id; + + private String name; + } +}