Skip to content

Commit

Permalink
fix: Reuse KsqlClient instance for inter node requests (#5742) (#5844)
Browse files Browse the repository at this point in the history
* Reuse ksqlClient between requests
  • Loading branch information
AlanConfluent authored Jul 17, 2020
1 parent b3db23c commit 7645abd
Show file tree
Hide file tree
Showing 11 changed files with 225 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class DefaultServiceContext implements ServiceContext {

private final KafkaClientSupplier kafkaClientSupplier;
private final MemoizedSupplier<Admin> adminClientSupplier;
private final MemoizedSupplier<KafkaTopicClient> topicClientSupplier;
private final MemoizedSupplier<KafkaTopicClient> topicClientSupplier;
private final Supplier<SchemaRegistryClient> srClientFactorySupplier;
private final MemoizedSupplier<SchemaRegistryClient> srClient;
private final MemoizedSupplier<ConnectClient> connectClientSupplier;
Expand Down Expand Up @@ -148,7 +148,6 @@ public void close() {
}
}


static final class MemoizedSupplier<T> implements Supplier<T> {

private final Supplier<T> supplier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public static ServiceContext create(
ksqlConfig,
Collections.emptyMap())::get,
() -> new DefaultConnectClient(ksqlConfig.getString(KsqlConfig.CONNECT_URL_PROPERTY),
Optional.empty()),
Optional.empty()),
ksqlClientSupplier
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.api.auth.ApiSecurityContext;
import io.confluent.ksql.rest.client.KsqlClient;
import io.confluent.ksql.rest.server.services.RestServiceContextFactory.DefaultServiceContextFactory;
import io.confluent.ksql.rest.server.services.RestServiceContextFactory.UserServiceContextFactory;
import io.confluent.ksql.security.KsqlSecurityContext;
Expand All @@ -33,18 +34,21 @@ public class DefaultKsqlSecurityContextProvider implements KsqlSecurityContextPr
private final UserServiceContextFactory userServiceContextFactory;
private final KsqlConfig ksqlConfig;
private final Supplier<SchemaRegistryClient> schemaRegistryClientFactory;
private final KsqlClient sharedClient;

public DefaultKsqlSecurityContextProvider(
final KsqlSecurityExtension securityExtension,
final DefaultServiceContextFactory defaultServiceContextFactory,
final UserServiceContextFactory userServiceContextFactory,
final KsqlConfig ksqlConfig,
final Supplier<SchemaRegistryClient> schemaRegistryClientFactory) {
final Supplier<SchemaRegistryClient> schemaRegistryClientFactory,
final KsqlClient sharedClient) {
this.securityExtension = securityExtension;
this.defaultServiceContextFactory = defaultServiceContextFactory;
this.userServiceContextFactory = userServiceContextFactory;
this.ksqlConfig = ksqlConfig;
this.schemaRegistryClientFactory = schemaRegistryClientFactory;
this.sharedClient = sharedClient;
}

@Override
Expand All @@ -56,7 +60,8 @@ public KsqlSecurityContext provide(final ApiSecurityContext apiSecurityContext)
if (securityExtension == null || !securityExtension.getUserContextProvider().isPresent()) {
return new KsqlSecurityContext(
principal,
defaultServiceContextFactory.create(ksqlConfig, authHeader, schemaRegistryClientFactory)
defaultServiceContextFactory.create(ksqlConfig, authHeader, schemaRegistryClientFactory,
sharedClient)
);
}

Expand All @@ -67,7 +72,8 @@ public KsqlSecurityContext provide(final ApiSecurityContext apiSecurityContext)
ksqlConfig,
authHeader,
provider.getKafkaClientSupplier(principal.orElse(null)),
provider.getSchemaRegistryClientFactory(principal.orElse(null)))))
provider.getSchemaRegistryClientFactory(principal.orElse(null)),
sharedClient)))
.get();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import io.confluent.ksql.query.id.SpecificQueryIdGenerator;
import io.confluent.ksql.rest.ErrorMessages;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.client.KsqlClient;
import io.confluent.ksql.rest.client.RestResponse;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
Expand All @@ -72,6 +73,7 @@
import io.confluent.ksql.rest.server.resources.StatusResource;
import io.confluent.ksql.rest.server.resources.streaming.StreamedQueryResource;
import io.confluent.ksql.rest.server.resources.streaming.WSQueryEndpoint;
import io.confluent.ksql.rest.server.services.InternalKsqlClientFactory;
import io.confluent.ksql.rest.server.services.RestServiceContextFactory;
import io.confluent.ksql.rest.server.services.ServerInternalKsqlClient;
import io.confluent.ksql.rest.server.state.ServerState;
Expand Down Expand Up @@ -102,6 +104,7 @@
import io.confluent.ksql.version.metrics.collector.KsqlModuleType;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.net.SocketAddress;
import io.vertx.ext.dropwizard.DropwizardMetricsOptions;
import io.vertx.ext.dropwizard.Match;
import java.io.Console;
Expand All @@ -113,6 +116,7 @@
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -211,7 +215,8 @@ public static SourceName getCommandsStreamName() {
final Consumer<KsqlConfig> rocksDBConfigSetterHandler,
final PullQueryExecutor pullQueryExecutor,
final Optional<HeartbeatAgent> heartbeatAgent,
final Optional<LagReportingAgent> lagReportingAgent
final Optional<LagReportingAgent> lagReportingAgent,
final Vertx vertx
) {
log.debug("Creating instance of ksqlDB API server");
this.serviceContext = requireNonNull(serviceContext, "serviceContext");
Expand All @@ -237,12 +242,7 @@ public static SourceName getCommandsStreamName() {
this.pullQueryExecutor = requireNonNull(pullQueryExecutor, "pullQueryExecutor");
this.heartbeatAgent = requireNonNull(heartbeatAgent, "heartbeatAgent");
this.lagReportingAgent = requireNonNull(lagReportingAgent, "lagReportingAgent");
this.vertx = Vertx.vertx(
new VertxOptions()
.setMaxWorkerExecuteTimeUnit(TimeUnit.MILLISECONDS)
.setMaxWorkerExecuteTime(Long.MAX_VALUE)
.setMetricsOptions(setUpHttpMetrics(ksqlConfig)));
this.vertx.exceptionHandler(t -> log.error("Unhandled exception in Vert.x", t));
this.vertx = requireNonNull(vertx, "vertx");

this.serverInfoResource = new ServerInfoResource(serviceContext, ksqlConfigNoPort);
if (heartbeatAgent.isPresent()) {
Expand Down Expand Up @@ -552,12 +552,23 @@ Optional<URL> getInternalListener() {
public static KsqlRestApplication buildApplication(final KsqlRestConfig restConfig) {
final Map<String, Object> updatedRestProps = restConfig.getOriginals();
final KsqlConfig ksqlConfig = new KsqlConfig(restConfig.getKsqlConfigProperties());
final Vertx vertx = Vertx.vertx(
new VertxOptions()
.setMaxWorkerExecuteTimeUnit(TimeUnit.MILLISECONDS)
.setMaxWorkerExecuteTime(Long.MAX_VALUE)
.setMetricsOptions(setUpHttpMetrics(ksqlConfig)));
vertx.exceptionHandler(t -> log.error("Unhandled exception in Vert.x", t));
final KsqlClient sharedClient = InternalKsqlClientFactory.createInternalClient(
toClientProps(ksqlConfig.originals()),
SocketAddress::inetSocketAddress,
vertx
);
final Supplier<SchemaRegistryClient> schemaRegistryClientFactory =
new KsqlSchemaRegistryClientFactory(ksqlConfig, Collections.emptyMap())::get;

final ServiceContext tempServiceContext = new LazyServiceContext(() ->
RestServiceContextFactory.create(ksqlConfig, Optional.empty(),
schemaRegistryClientFactory));
schemaRegistryClientFactory, sharedClient));
final String kafkaClusterId = KafkaClusterUtil.getKafkaClusterId(tempServiceContext);
final String ksqlServerId = ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG);
updatedRestProps.putAll(
Expand All @@ -568,15 +579,17 @@ public static KsqlRestApplication buildApplication(final KsqlRestConfig restConf
RestServiceContextFactory.create(
new KsqlConfig(updatedRestConfig.getKsqlConfigProperties()),
Optional.empty(),
schemaRegistryClientFactory));
schemaRegistryClientFactory, sharedClient));

return buildApplication(
"",
updatedRestConfig,
KsqlVersionCheckerAgent::new,
Integer.MAX_VALUE,
serviceContext,
schemaRegistryClientFactory
schemaRegistryClientFactory,
vertx,
sharedClient
);
}

Expand All @@ -587,7 +600,9 @@ static KsqlRestApplication buildApplication(
final Function<Supplier<Boolean>, VersionCheckerAgent> versionCheckerFactory,
final int maxStatementRetries,
final ServiceContext serviceContext,
final Supplier<SchemaRegistryClient> schemaRegistryClientFactory) {
final Supplier<SchemaRegistryClient> schemaRegistryClientFactory,
final Vertx vertx,
final KsqlClient sharedClient) {
final String ksqlInstallDir = restConfig.getString(KsqlRestConfig.INSTALL_DIR_CONFIG);

final KsqlConfig ksqlConfig = new KsqlConfig(restConfig.getKsqlConfigProperties());
Expand Down Expand Up @@ -644,7 +659,8 @@ static KsqlRestApplication buildApplication(
new DefaultKsqlSecurityContextProvider(
securityExtension,
RestServiceContextFactory::create,
RestServiceContextFactory::create, ksqlConfig, schemaRegistryClientFactory);
RestServiceContextFactory::create, ksqlConfig, schemaRegistryClientFactory,
sharedClient);

final Optional<AuthenticationPlugin> securityHandlerPlugin = loadAuthenticationPlugin(
restConfig);
Expand Down Expand Up @@ -741,8 +757,8 @@ static KsqlRestApplication buildApplication(
rocksDBConfigSetterHandler,
pullQueryExecutor,
heartbeatAgent,
lagReportingAgent
);
lagReportingAgent,
vertx);
}

private static Optional<HeartbeatAgent> initializeHeartbeatAgent(
Expand Down Expand Up @@ -1010,4 +1026,13 @@ private static KsqlRestConfig injectPathsWithoutAuthentication(final KsqlRestCon
return new KsqlRestConfig(restConfigs);
}

@VisibleForTesting
static Map<String, String> toClientProps(final Map<String, Object> config) {
final Map<String, String> clientProps = new HashMap<>();
for (Map.Entry<String, Object> entry : config.entrySet()) {
clientProps.put(entry.getKey(), entry.getValue().toString());
}
return clientProps;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@

package io.confluent.ksql.rest.server.services;

import static io.confluent.ksql.rest.server.services.InternalKsqlClientFactory.createInternalClient;
import static java.util.Objects.requireNonNull;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.properties.LocalProperties;
import io.confluent.ksql.rest.client.KsqlClient;
import io.confluent.ksql.rest.client.KsqlTarget;
import io.confluent.ksql.rest.client.RestResponse;
Expand All @@ -30,12 +29,14 @@
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.services.SimpleKsqlClient;
import io.confluent.ksql.util.KsqlHostInfo;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.Vertx;
import io.vertx.core.net.SocketAddress;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -45,26 +46,33 @@ final class DefaultKsqlClient implements SimpleKsqlClient {

private final Optional<String> authHeader;
private final KsqlClient sharedClient;
private final boolean ownSharedClient;

DefaultKsqlClient(final Optional<String> authHeader, final Map<String, Object> clientProps) {
@VisibleForTesting
DefaultKsqlClient(final Optional<String> authHeader, final Map<String, Object> clientProps,
final BiFunction<Integer, String, SocketAddress> socketAddressFactory) {
this(
authHeader,
new KsqlClient(
toClientProps(clientProps),
Optional.empty(),
new LocalProperties(ImmutableMap.of()),
createClientOptions()
)
createInternalClient(toClientProps(clientProps), socketAddressFactory, Vertx.vertx()),
true
);
}

@VisibleForTesting
DefaultKsqlClient(
final Optional<String> authHeader,
final KsqlClient sharedClient
) {
this(authHeader, sharedClient, false);
}

DefaultKsqlClient(
final Optional<String> authHeader,
final KsqlClient sharedClient,
final boolean ownSharedClient
) {
this.authHeader = requireNonNull(authHeader, "authHeader");
this.sharedClient = requireNonNull(sharedClient, "sharedClient");
this.ownSharedClient = ownSharedClient;
}

@Override
Expand Down Expand Up @@ -143,7 +151,9 @@ public void makeAsyncLagReportRequest(

@Override
public void close() {
sharedClient.close();
if (ownSharedClient) {
sharedClient.close();
}
}

private KsqlTarget getTarget(final KsqlTarget target, final Optional<String> authHeader) {
Expand All @@ -152,17 +162,11 @@ private KsqlTarget getTarget(final KsqlTarget target, final Optional<String> aut
.orElse(target);
}

private static HttpClientOptions createClientOptions() {
return new HttpClientOptions().setMaxPoolSize(100);
}

private static Map<String, String> toClientProps(final Map<String, Object> config) {
final Map<String, String> clientProps = new HashMap<>();
for (Map.Entry<String, Object> entry : config.entrySet()) {
clientProps.put(entry.getKey(), entry.getValue().toString());
}
return clientProps;
}


}
Loading

0 comments on commit 7645abd

Please sign in to comment.