Skip to content

Commit

Permalink
perf: Improves pull query performance by making the default schema se…
Browse files Browse the repository at this point in the history
…rvice a singleton (#4216)

* perf: Improves pull query performance by making the schema registry client a singleton.
  • Loading branch information
AlanConfluent authored Jan 9, 2020
1 parent de906c3 commit f991752
Show file tree
Hide file tree
Showing 11 changed files with 102 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package io.confluent.ksql.schema.registry;

import com.google.common.annotations.VisibleForTesting;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.RestService;
Expand Down Expand Up @@ -43,14 +44,21 @@ CachedSchemaRegistryClient create(RestService service,
Map<String, String> httpHeaders);
}

public KsqlSchemaRegistryClientFactory(
final KsqlConfig config,
final Map<String, String> schemaRegistryHttpHeaders
) {
this(config, newSchemaRegistrySslFactory(config), schemaRegistryHttpHeaders);
}

public KsqlSchemaRegistryClientFactory(
final KsqlConfig config,
final SslFactory sslFactory,
final Map<String, String> schemaRegistryHttpHeaders
) {
this(config,
() -> new RestService(config.getString(KsqlConfig.SCHEMA_REGISTRY_URL_PROPERTY)),
new SslFactory(Mode.CLIENT),
sslFactory,
CachedSchemaRegistryClient::new,
schemaRegistryHttpHeaders
);
Expand All @@ -59,6 +67,7 @@ public KsqlSchemaRegistryClientFactory(
config.getString(KsqlConfig.SCHEMA_REGISTRY_URL_PROPERTY);
}

@VisibleForTesting
KsqlSchemaRegistryClientFactory(final KsqlConfig config,
final Supplier<RestService> serviceSupplier,
final SslFactory sslFactory,
Expand All @@ -69,13 +78,25 @@ public KsqlSchemaRegistryClientFactory(
this.schemaRegistryClientConfigs = config.originalsWithPrefix(
KsqlConfig.KSQL_SCHEMA_REGISTRY_PREFIX);

this.sslFactory
.configure(config.valuesWithPrefixOverride(KsqlConfig.KSQL_SCHEMA_REGISTRY_PREFIX));

this.schemaRegistryClientFactory = schemaRegistryClientFactory;
this.httpHeaders = httpHeaders;
}

/**
* Creates an SslFactory configured to be used with the KsqlSchemaRegistryClient.
*/
public static SslFactory newSchemaRegistrySslFactory(final KsqlConfig config) {
final SslFactory sslFactory = new SslFactory(Mode.CLIENT);
configureSslFactory(config, sslFactory);
return sslFactory;
}

@VisibleForTesting
static void configureSslFactory(final KsqlConfig config, final SslFactory sslFactory) {
sslFactory
.configure(config.valuesWithPrefixOverride(KsqlConfig.KSQL_SCHEMA_REGISTRY_PREFIX));
}

public SchemaRegistryClient get() {
final RestService restService = serviceSupplier.get();
final SSLContext sslContext = sslFactory.sslEngineBuilder().sslContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,10 @@ public void shouldSetSocketFactoryWhenNoSpecificSslConfig() {
final Map<String, Object> expectedConfigs = defaultConfigs();

// When:
final SchemaRegistryClient client =
new KsqlSchemaRegistryClientFactory(config, restServiceSupplier, sslFactory,
srClientFactory, Collections.emptyMap()).get();
KsqlSchemaRegistryClientFactory.configureSslFactory(config, sslFactory);

// Then:
assertThat(client, is(notNullValue()));
verify(sslFactory).configure(expectedConfigs);
verify(restService).setSslSocketFactory(isA(SSL_CONTEXT.getSocketFactory().getClass()));
}

@Test
Expand All @@ -109,14 +105,10 @@ public void shouldPickUpNonPrefixedSslConfig() {
expectedConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "SSLv3");

// When:
final SchemaRegistryClient client =
new KsqlSchemaRegistryClientFactory(config, restServiceSupplier, sslFactory,
srClientFactory, Collections.emptyMap()).get();
KsqlSchemaRegistryClientFactory.configureSslFactory(config, sslFactory);

// Then:
assertThat(client, is(notNullValue()));
verify(sslFactory).configure(expectedConfigs);
verify(restService).setSslSocketFactory(isA(SSL_CONTEXT.getSocketFactory().getClass()));
}

@Test
Expand All @@ -130,15 +122,11 @@ public void shouldPickUpPrefixedSslConfig() {
expectedConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "SSLv3");

// When:
final SchemaRegistryClient client =
new KsqlSchemaRegistryClientFactory(config, restServiceSupplier, sslFactory,
srClientFactory, Collections.emptyMap()).get();
KsqlSchemaRegistryClientFactory.configureSslFactory(config, sslFactory);


// Then:
assertThat(client, is(notNullValue()));
verify(sslFactory).configure(expectedConfigs);
verify(restService).setSslSocketFactory(isA(SSL_CONTEXT.getSocketFactory().getClass()));
}

@Test
Expand All @@ -160,6 +148,7 @@ public void shouldPassBasicAuthCredentialsToSchemaRegistryClient() {
config, restServiceSupplier, sslFactory, srClientFactory, Collections.emptyMap()).get();

// Then:
verify(restService).setSslSocketFactory(isA(SSL_CONTEXT.getSocketFactory().getClass()));
srClientFactory.create(same(restService), anyInt(), eq(expectedConfigs), any());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.ServiceInfo;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.function.InternalFunctionRegistry;
Expand Down Expand Up @@ -69,6 +70,7 @@
import io.confluent.ksql.rest.util.KsqlUncaughtExceptionHandler;
import io.confluent.ksql.rest.util.ProcessingLogServerUtils;
import io.confluent.ksql.rest.util.RocksDBConfigSetterHandler;
import io.confluent.ksql.schema.registry.KsqlSchemaRegistryClientFactory;
import io.confluent.ksql.security.KsqlAuthorizationValidator;
import io.confluent.ksql.security.KsqlAuthorizationValidatorFactory;
import io.confluent.ksql.security.KsqlDefaultSecurityExtension;
Expand Down Expand Up @@ -96,6 +98,7 @@
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -444,7 +447,8 @@ public <T> T getEndpointInstance(final Class<T> endpointClass) {
authorizationValidator,
errorHandler,
securityExtension,
serverState
serverState,
serviceContext.getSchemaRegistryClientFactory()
);
}
})
Expand All @@ -460,16 +464,20 @@ static KsqlRestApplication buildApplication(
final Function<Supplier<Boolean>, VersionCheckerAgent> versionCheckerFactory
) {
final KsqlConfig ksqlConfig = new KsqlConfig(restConfig.getKsqlConfigProperties());
final Supplier<SchemaRegistryClient> schemaRegistryClientFactory =
new KsqlSchemaRegistryClientFactory(ksqlConfig, Collections.emptyMap())::get;
final ServiceContext serviceContext = new LazyServiceContext(() ->
RestServiceContextFactory.create(ksqlConfig, Optional.empty()));
RestServiceContextFactory.create(ksqlConfig, Optional.empty(),
schemaRegistryClientFactory));

return buildApplication(
"",
restConfig,
versionCheckerFactory,
Integer.MAX_VALUE,
serviceContext,
KsqlSecurityContextBinder::new);
(config, securityExtension) ->
new KsqlSecurityContextBinder(config, securityExtension, schemaRegistryClientFactory));
}

static KsqlRestApplication buildApplication(
Expand All @@ -478,8 +486,7 @@ static KsqlRestApplication buildApplication(
final Function<Supplier<Boolean>, VersionCheckerAgent> versionCheckerFactory,
final int maxStatementRetries,
final ServiceContext serviceContext,
final BiFunction<KsqlConfig, KsqlSecurityExtension, Binder> serviceContextBinderFactory
) {
final BiFunction<KsqlConfig, KsqlSecurityExtension, Binder> serviceContextBinderFactory) {
final String ksqlInstallDir = restConfig.getString(KsqlRestConfig.INSTALL_DIR_CONFIG);

final KsqlConfig ksqlConfig = new KsqlConfig(restConfig.getKsqlConfigProperties());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@

package io.confluent.ksql.rest.server.context;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.security.KsqlSecurityContext;
import io.confluent.ksql.security.KsqlSecurityExtension;
import io.confluent.ksql.util.KsqlConfig;
import java.util.function.Supplier;
import org.glassfish.hk2.utilities.binding.AbstractBinder;
import org.glassfish.jersey.process.internal.RequestScoped;

Expand All @@ -31,9 +33,11 @@
public class KsqlSecurityContextBinder extends AbstractBinder {
public KsqlSecurityContextBinder(
final KsqlConfig ksqlConfig,
final KsqlSecurityExtension securityExtension
final KsqlSecurityExtension securityExtension,
final Supplier<SchemaRegistryClient> schemaRegistryClientFactory
) {
KsqlSecurityContextBinderFactory.configure(ksqlConfig, securityExtension);
KsqlSecurityContextBinderFactory.configure(ksqlConfig, securityExtension,
schemaRegistryClientFactory);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static java.util.Objects.requireNonNull;

import com.google.common.annotations.VisibleForTesting;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.rest.server.services.RestServiceContextFactory;
import io.confluent.ksql.rest.server.services.RestServiceContextFactory.DefaultServiceContextFactory;
import io.confluent.ksql.rest.server.services.RestServiceContextFactory.UserServiceContextFactory;
Expand All @@ -26,6 +27,7 @@
import io.confluent.ksql.util.KsqlConfig;
import java.security.Principal;
import java.util.Optional;
import java.util.function.Supplier;
import javax.inject.Inject;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.HttpHeaders;
Expand All @@ -39,14 +41,18 @@
public class KsqlSecurityContextBinderFactory implements Factory<KsqlSecurityContext> {
private static KsqlConfig ksqlConfig;
private static KsqlSecurityExtension securityExtension;
private static Supplier<SchemaRegistryClient> schemaRegistryClientFactory;

public static void configure(
final KsqlConfig ksqlConfig,
final KsqlSecurityExtension securityExtension
final KsqlSecurityExtension securityExtension,
final Supplier<SchemaRegistryClient> schemaRegistryClientFactory
) {
KsqlSecurityContextBinderFactory.ksqlConfig = requireNonNull(ksqlConfig, "ksqlConfig");
KsqlSecurityContextBinderFactory.securityExtension
= requireNonNull(securityExtension, "securityExtension");
KsqlSecurityContextBinderFactory.schemaRegistryClientFactory
= requireNonNull(schemaRegistryClientFactory, "schemaRegistryClientFactory");
}

private final SecurityContext securityContext;
Expand Down Expand Up @@ -91,7 +97,7 @@ public KsqlSecurityContext provide() {
if (!securityExtension.getUserContextProvider().isPresent()) {
return new KsqlSecurityContext(
Optional.ofNullable(principal),
defaultServiceContextFactory.create(ksqlConfig, authHeader)
defaultServiceContextFactory.create(ksqlConfig, authHeader, schemaRegistryClientFactory)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.parser.tree.PrintTopic;
Expand Down Expand Up @@ -53,6 +54,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import javax.websocket.CloseReason;
import javax.websocket.CloseReason.CloseCodes;
import javax.websocket.EndpointConfig;
Expand Down Expand Up @@ -97,6 +99,7 @@ public class WSQueryEndpoint {
private final DefaultServiceContextFactory defaultServiceContextFactory;
private final ServerState serverState;
private final Errors errorHandler;
private final Supplier<SchemaRegistryClient> schemaRegistryClientFactory;

private WebSocketSubscriber<?> subscriber;
private KsqlSecurityContext securityContext;
Expand All @@ -115,7 +118,8 @@ public WSQueryEndpoint(
final Optional<KsqlAuthorizationValidator> authorizationValidator,
final Errors errorHandler,
final KsqlSecurityExtension securityExtension,
final ServerState serverState
final ServerState serverState,
final Supplier<SchemaRegistryClient> schemaRegistryClientFactory
) {
this(ksqlConfig,
mapper,
Expand All @@ -133,7 +137,8 @@ public WSQueryEndpoint(
securityExtension,
RestServiceContextFactory::create,
RestServiceContextFactory::create,
serverState);
serverState,
schemaRegistryClientFactory);
}

// CHECKSTYLE_RULES.OFF: ParameterNumberCheck
Expand All @@ -155,7 +160,8 @@ public WSQueryEndpoint(
final KsqlSecurityExtension securityExtension,
final UserServiceContextFactory serviceContextFactory,
final DefaultServiceContextFactory defaultServiceContextFactory,
final ServerState serverState
final ServerState serverState,
final Supplier<SchemaRegistryClient> schemaRegistryClientFactory
) {
this.ksqlConfig = Objects.requireNonNull(ksqlConfig, "ksqlConfig");
this.mapper = Objects.requireNonNull(mapper, "mapper");
Expand All @@ -179,7 +185,9 @@ public WSQueryEndpoint(
this.defaultServiceContextFactory =
Objects.requireNonNull(defaultServiceContextFactory, "defaultServiceContextFactory");
this.serverState = Objects.requireNonNull(serverState, "serverState");
this.errorHandler = Objects.requireNonNull(errorHandler, "errorHandler");;
this.errorHandler = Objects.requireNonNull(errorHandler, "errorHandler");
this.schemaRegistryClientFactory =
Objects.requireNonNull(schemaRegistryClientFactory, "schemaRegistryClientFactory");
}

@SuppressWarnings("unused")
Expand Down Expand Up @@ -288,7 +296,8 @@ private KsqlSecurityContext createSecurityContext(final Principal principal) {
final ServiceContext serviceContext;

if (!securityExtension.getUserContextProvider().isPresent()) {
serviceContext = defaultServiceContextFactory.create(ksqlConfig, Optional.empty());
serviceContext = defaultServiceContextFactory.create(ksqlConfig, Optional.empty(),
schemaRegistryClientFactory);
} else {
// Creates a ServiceContext using the user's credentials, so the WS query topics are
// accessed with the user permission context (defaults to KSQL service context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@
package io.confluent.ksql.rest.server.services;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.schema.registry.KsqlSchemaRegistryClientFactory;
import io.confluent.ksql.services.DefaultConnectClient;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.services.ServiceContextFactory;
import io.confluent.ksql.util.KsqlConfig;
import java.util.Collections;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.kafka.streams.KafkaClientSupplier;
Expand All @@ -36,7 +34,8 @@ public interface DefaultServiceContextFactory {

ServiceContext create(
KsqlConfig config,
Optional<String> authHeader
Optional<String> authHeader,
Supplier<SchemaRegistryClient> srClientFactory
);
}

Expand All @@ -52,13 +51,14 @@ ServiceContext create(

public static ServiceContext create(
final KsqlConfig ksqlConfig,
final Optional<String> authHeader
final Optional<String> authHeader,
final Supplier<SchemaRegistryClient> schemaRegistryClientFactory
) {
return create(
ksqlConfig,
authHeader,
new DefaultKafkaClientSupplier(),
new KsqlSchemaRegistryClientFactory(ksqlConfig, Collections.emptyMap())::get
schemaRegistryClientFactory
);
}

Expand Down
Loading

0 comments on commit f991752

Please sign in to comment.