Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure that Mongo DNS lookup does not happen on the event loop #27896

Merged
merged 1 commit into from
Sep 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import io.quarkus.mongodb.runtime.dns.MongoDnsClientProvider;
import io.quarkus.runtime.metrics.MetricsFactory;
import io.quarkus.smallrye.health.deployment.spi.HealthBuildItem;
import io.quarkus.vertx.deployment.VertxBuildItem;

public class MongoClientProcessor {
private static final String MONGODB_TRACING_COMMANDLISTENER_CLASSNAME = "io.quarkus.mongodb.tracing.MongoTracingCommandListener";
Expand Down Expand Up @@ -281,7 +282,8 @@ void generateClientBeans(MongoClientRecorder recorder,
MongoClientBuildTimeConfig mongoClientBuildTimeConfig,
MongodbConfig mongodbConfig,
List<MongoUnremovableClientsBuildItem> mongoUnremovableClientsBuildItem,
BuildProducer<SyntheticBeanBuildItem> syntheticBeanBuildItemBuildProducer) {
BuildProducer<SyntheticBeanBuildItem> syntheticBeanBuildItemBuildProducer,
VertxBuildItem vertxBuildItem) {

boolean makeUnremovable = !mongoUnremovableClientsBuildItem.isEmpty();

Expand Down Expand Up @@ -328,6 +330,8 @@ void generateClientBeans(MongoClientRecorder recorder,
.produce(createReactiveSyntheticBean(recorder, mongodbConfig, makeUnremovable, mongoClientName.getName(),
mongoClientName.isAddQualifier()));
}

recorder.performInitialization(mongodbConfig, vertxBuildItem.getVertx());
}

private SyntheticBeanBuildItem createBlockingSyntheticBean(MongoClientRecorder recorder, MongodbConfig mongodbConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,18 @@
import javax.enterprise.inject.literal.NamedLiteral;
import javax.enterprise.util.AnnotationLiteral;

import com.mongodb.ConnectionString;
import com.mongodb.client.MongoClient;
import com.mongodb.event.ConnectionPoolListener;

import io.quarkus.arc.Arc;
import io.quarkus.mongodb.metrics.MicrometerConnectionPoolListener;
import io.quarkus.mongodb.metrics.MongoMetricsConnectionPoolListener;
import io.quarkus.mongodb.reactive.ReactiveMongoClient;
import io.quarkus.mongodb.runtime.dns.MongoDnsClientProvider;
import io.quarkus.runtime.RuntimeValue;
import io.quarkus.runtime.annotations.Recorder;
import io.vertx.core.Vertx;

@Recorder
public class MongoClientRecorder {
Expand Down Expand Up @@ -104,4 +107,25 @@ public ConnectionPoolListener get() {
}
};
}

/**
* We need to perform some initialization work on the main thread to ensure that reactive operations (such as DNS
* resolution)
* don't end up being performed on the event loop
*/
public void performInitialization(MongodbConfig config, RuntimeValue<Vertx> vertx) {
MongoDnsClientProvider.vertx = vertx.getValue();
initializeDNSLookup(config.defaultMongoClientConfig);
for (MongoClientConfig mongoClientConfig : config.mongoClientConfigs.values()) {
initializeDNSLookup(mongoClientConfig);
}
}

private void initializeDNSLookup(MongoClientConfig mongoClientConfig) {
if (mongoClientConfig.connectionString.isEmpty()) {
return;
}
// this ensures that DNS resolution will take place if necessary
new ConnectionString(mongoClientConfig.connectionString.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand All @@ -20,7 +23,6 @@
import com.mongodb.spi.dns.DnsClient;
import com.mongodb.spi.dns.DnsException;

import io.quarkus.arc.Arc;
import io.quarkus.mongodb.runtime.MongodbConfig;
import io.quarkus.runtime.annotations.RegisterForReflection;
import io.vertx.core.dns.DnsClientOptions;
Expand All @@ -46,8 +48,14 @@ public class MongoDnsClient implements DnsClient {

private final io.vertx.mutiny.core.dns.DnsClient dnsClient;

MongoDnsClient() {
Vertx vertx = Arc.container().instance(Vertx.class).get();
// the static fields are used in order to hold DNS resolution result that has been performed on the main thread
// at application startup
// the reason we need this is to ensure that no blocking of event loop threads will occur due to DNS resolution
private static final Map<String, List<SrvRecord>> SRV_CACHE = new ConcurrentHashMap<>();
private static final Map<String, List<String>> TXT_CACHE = new ConcurrentHashMap<>();

MongoDnsClient(io.vertx.core.Vertx vertx) {
Vertx mutinyVertx = new io.vertx.mutiny.core.Vertx(vertx);

boolean activity = config.getOptionalValue(DNS_LOG_ACTIVITY, Boolean.class).orElse(false);

Expand All @@ -69,7 +77,7 @@ public class MongoDnsClient implements DnsClient {
.setHost(server)
.setPort(port);
}
dnsClient = vertx.createDnsClient(dnsClientOptions);
dnsClient = mutinyVertx.createDnsClient(dnsClientOptions);
}

private static List<String> nameServers() {
Expand Down Expand Up @@ -118,7 +126,17 @@ private List<String> resolveSrvRequest(final String srvHost) {
.orElse(Duration.ofSeconds(5));

try {
List<SrvRecord> srvRecords = dnsClient.resolveSRV(srvHost).await().atMost(timeout);
List<SrvRecord> srvRecords;
if (SRV_CACHE.containsKey(srvHost)) {
srvRecords = SRV_CACHE.get(srvHost);
} else {
srvRecords = dnsClient.resolveSRV(srvHost).invoke(new Consumer<>() {
@Override
public void accept(List<SrvRecord> srvRecords) {
SRV_CACHE.put(srvHost, srvRecords);
}
}).await().atMost(timeout);
}

if (srvRecords.isEmpty()) {
throw new MongoConfigurationException("No SRV records available for host " + srvHost);
Expand All @@ -143,11 +161,18 @@ private List<String> resolveSrvRequest(final String srvHost) {
* Here we concatenate TXT records together with a '&' separator as required by connection strings
*/
public List<String> resolveTxtRequest(final String host) {
if (TXT_CACHE.containsKey(host)) {
return TXT_CACHE.get(host);
}
try {
Duration timeout = config.getOptionalValue(DNS_LOOKUP_TIMEOUT, Duration.class)
.orElse(Duration.ofSeconds(5));

return dnsClient.resolveTXT(host).await().atMost(timeout);
return dnsClient.resolveTXT(host).invoke(new Consumer<>() {
@Override
public void accept(List<String> strings) {
TXT_CACHE.put(host, strings);
}
}).await().atMost(timeout);
} catch (Throwable e) {
throw new MongoConfigurationException("Unable to look up TXT record for host " + host, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@
import com.mongodb.spi.dns.DnsClientProvider;

import io.quarkus.runtime.annotations.RegisterForReflection;
import io.vertx.core.Vertx;

@RegisterForReflection
public class MongoDnsClientProvider implements DnsClientProvider {

public static volatile Vertx vertx;

@Override
public DnsClient create() {
return new MongoDnsClient();
return new MongoDnsClient(vertx);
}
}