From c360c9c389bff435c5727b2a1f0373f112cb0f68 Mon Sep 17 00:00:00 2001 From: Alan Sheinberg <57688982+AlanConfluent@users.noreply.github.com> Date: Tue, 26 Jan 2021 17:05:26 -0800 Subject: [PATCH] fix: Fixes auth for forwarded requests for pull queries (#6895) --- .../confluent/ksql/engine/EngineExecutor.java | 1 + .../ksql/physical/pull/HARouting.java | 7 +- .../ksql/physical/pull/HARoutingTest.java | 15 +- .../ksql/rest/server/KsqlRestApplication.java | 2 +- .../integration/HighAvailabilityTestUtil.java | 33 ++++- .../PullQueryRoutingFunctionalTest.java | 132 +++++++++++++----- .../ksql/rest/server/TestKsqlRestApp.java | 15 +- 7 files changed, 154 insertions(+), 51 deletions(-) diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java index 057ce9c022ca..65383a423f75 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java @@ -169,6 +169,7 @@ PullQueryResult executePullQuery( analysis ); return routing.handlePullQuery( + serviceContext, physicalPlan, statement, routingOptions, physicalPlan.getOutputSchema(), physicalPlan.getQueryId()); } catch (final Exception e) { diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/HARouting.java b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/HARouting.java index 8448c84c8009..22cb6303a946 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/HARouting.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/physical/pull/HARouting.java @@ -60,17 +60,15 @@ public final class HARouting implements AutoCloseable { private final ExecutorService executorService; private final RoutingFilterFactory routingFilterFactory; - private final ServiceContext serviceContext; private final Optional pullQueryMetrics; private final RouteQuery routeQuery; public HARouting( final RoutingFilterFactory routingFilterFactory, - final ServiceContext serviceContext, final Optional pullQueryMetrics, final KsqlConfig ksqlConfig ) { - this(routingFilterFactory, serviceContext, pullQueryMetrics, ksqlConfig, + this(routingFilterFactory, pullQueryMetrics, ksqlConfig, HARouting::executeOrRouteQuery); } @@ -78,14 +76,12 @@ public HARouting( @VisibleForTesting HARouting( final RoutingFilterFactory routingFilterFactory, - final ServiceContext serviceContext, final Optional pullQueryMetrics, final KsqlConfig ksqlConfig, final RouteQuery routeQuery ) { this.routingFilterFactory = Objects.requireNonNull(routingFilterFactory, "routingFilterFactory"); - this.serviceContext = Objects.requireNonNull(serviceContext, "serviceContext"); this.executorService = Executors.newFixedThreadPool( ksqlConfig.getInt(KsqlConfig.KSQL_QUERY_PULL_THREAD_POOL_SIZE_CONFIG), new ThreadFactoryBuilder().setNameFormat("pull-query-executor-%d").build()); @@ -99,6 +95,7 @@ public void close() { } public PullQueryResult handlePullQuery( + final ServiceContext serviceContext, final PullPhysicalPlan pullPhysicalPlan, final ConfiguredStatement statement, final RoutingOptions routingOptions, diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/physical/pull/HARoutingTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/physical/pull/HARoutingTest.java index 6c592220325a..2bd061037fc3 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/physical/pull/HARoutingTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/physical/pull/HARoutingTest.java @@ -103,8 +103,7 @@ public void setUp() { when(location4.getNodes()).thenReturn(ImmutableList.of(node2, node1)); when(ksqlConfig.getInt(KsqlConfig.KSQL_QUERY_PULL_THREAD_POOL_SIZE_CONFIG)).thenReturn(1); haRouting = new HARouting( - routingFilterFactory, serviceContext, Optional.empty(), ksqlConfig, routeQuery); - + routingFilterFactory, Optional.empty(), ksqlConfig, routeQuery); } @After @@ -142,7 +141,8 @@ public void shouldCallRouteQuery_success() throws InterruptedException { }); // When: - PullQueryResult result = haRouting.handlePullQuery(pullPhysicalPlan, statement, routingOptions, logicalSchema, queryId); + PullQueryResult result = haRouting.handlePullQuery(serviceContext, pullPhysicalPlan, statement, + routingOptions, logicalSchema, queryId); // Then: verify(routeQuery).routeQuery(eq(node1), any(), any(), any(), any(), any(), any(), any(), any()); @@ -192,7 +192,8 @@ public Object answer(InvocationOnMock invocation) { }); // When: - PullQueryResult result = haRouting.handlePullQuery(pullPhysicalPlan, statement, routingOptions, logicalSchema, queryId); + PullQueryResult result = haRouting.handlePullQuery(serviceContext, pullPhysicalPlan, statement, + routingOptions, logicalSchema, queryId); // Then: verify(routeQuery).routeQuery(eq(node1), any(), any(), any(), any(), any(), any(), any(), any()); @@ -244,7 +245,8 @@ public Object answer(InvocationOnMock invocation) { // When: final Exception e = assertThrows( MaterializationException.class, - () -> haRouting.handlePullQuery(pullPhysicalPlan, statement, routingOptions, logicalSchema, queryId) + () -> haRouting.handlePullQuery(serviceContext, pullPhysicalPlan, statement, routingOptions, + logicalSchema, queryId) ); // Then: @@ -277,7 +279,8 @@ public void shouldCallRouteQuery_allFiltered() { // When: final Exception e = assertThrows( MaterializationException.class, - () -> haRouting.handlePullQuery(pullPhysicalPlan, statement, routingOptions, logicalSchema, queryId) + () -> haRouting.handlePullQuery(serviceContext, pullPhysicalPlan, statement, routingOptions, + logicalSchema, queryId) ); // Then: diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index 5cce27b4cbe8..e4c939e7e4d5 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -753,7 +753,7 @@ static KsqlRestApplication buildApplication( final HARouting pullQueryRouting = new HARouting( - routingFilterFactory, serviceContext, pullQueryMetrics, ksqlConfig); + routingFilterFactory, pullQueryMetrics, ksqlConfig); final Optional localCommands = createLocalCommands(restConfig, ksqlEngine); diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/HighAvailabilityTestUtil.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/HighAvailabilityTestUtil.java index e2bb1f58b5ef..c9f011d284b4 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/HighAvailabilityTestUtil.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/HighAvailabilityTestUtil.java @@ -139,9 +139,17 @@ static void waitForClusterToBeDiscovered( static void waitForStreamsMetadataToInitialize( final TestKsqlRestApp restApp, List hosts, String queryId + ) { + waitForStreamsMetadataToInitialize(restApp, hosts, queryId, Optional.empty()); + } + + static void waitForStreamsMetadataToInitialize( + final TestKsqlRestApp restApp, List hosts, String queryId, + final Optional credentials ) { while (true) { - ClusterStatusResponse clusterStatusResponse = HighAvailabilityTestUtil.sendClusterStatusRequest(restApp); + ClusterStatusResponse clusterStatusResponse + = HighAvailabilityTestUtil.sendClusterStatusRequest(restApp, credentials); List initialized = hosts.stream() .filter(hostInfo -> Optional.ofNullable( clusterStatusResponse @@ -392,11 +400,24 @@ public static void makeAdminRequest(TestKsqlRestApp restApp, final String sql) { RestIntegrationTestUtil.makeKsqlRequest(restApp, sql, Optional.empty()); } + public static void makeAdminRequest( + final TestKsqlRestApp restApp, + final String sql, + final Optional userCreds + ) { + RestIntegrationTestUtil.makeKsqlRequest(restApp, sql, userCreds); + } + public static List makeAdminRequestWithResponse( TestKsqlRestApp restApp, final String sql) { return RestIntegrationTestUtil.makeKsqlRequest(restApp, sql, Optional.empty()); } + public static List makeAdminRequestWithResponse( + TestKsqlRestApp restApp, final String sql, final Optional userCreds) { + return RestIntegrationTestUtil.makeKsqlRequest(restApp, sql, userCreds); + } + public static List makePullQueryRequest( final TestKsqlRestApp target, final String sql @@ -412,5 +433,15 @@ public static List makePullQueryRequest( return RestIntegrationTestUtil.makeQueryRequest(target, sql, Optional.empty(), properties, ImmutableMap.of(KsqlRequestConfig.KSQL_DEBUG_REQUEST, true)); } + + public static List makePullQueryRequest( + final TestKsqlRestApp target, + final String sql, + final Map properties, + final Optional userCreds + ) { + return RestIntegrationTestUtil.makeQueryRequest(target, sql, userCreds, + properties, ImmutableMap.of(KsqlRequestConfig.KSQL_DEBUG_REQUEST, true)); + } } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/PullQueryRoutingFunctionalTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/PullQueryRoutingFunctionalTest.java index dc21ea825b10..ed920002ab91 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/PullQueryRoutingFunctionalTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/integration/PullQueryRoutingFunctionalTest.java @@ -33,9 +33,11 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.confluent.common.utils.IntegrationTest; +import io.confluent.ksql.api.auth.AuthenticationPlugin; import io.confluent.ksql.integration.IntegrationTestHarness; import io.confluent.ksql.integration.Retry; import io.confluent.ksql.name.ColumnName; +import io.confluent.ksql.rest.client.BasicCredentials; import io.confluent.ksql.rest.entity.ActiveStandbyEntity; import io.confluent.ksql.rest.entity.ClusterStatusResponse; import io.confluent.ksql.rest.entity.KsqlEntity; @@ -55,12 +57,17 @@ import io.confluent.ksql.serde.FormatFactory; import io.confluent.ksql.serde.SerdeFeatures; import io.confluent.ksql.test.util.KsqlIdentifierTestUtil; +import io.confluent.ksql.test.util.TestBasicJaasConfig; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.UserDataProvider; +import io.vertx.core.WorkerExecutor; +import io.vertx.ext.web.RoutingContext; import java.io.IOException; +import java.security.Principal; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @@ -122,6 +129,19 @@ public class PullQueryRoutingFunctionalTest { SerdeFeatures.of() ); + private static final String PROPS_JAAS_REALM = "KsqlServer-Props"; + private static final String KSQL_RESOURCE = "ksql-user"; + private static final String USER_WITH_ACCESS = "harry"; + private static final String USER_WITH_ACCESS_PWD = "changeme"; + private static final Optional USER_CREDS + = Optional.of(BasicCredentials.of(USER_WITH_ACCESS, USER_WITH_ACCESS_PWD)); + + @ClassRule + public static final TestBasicJaasConfig JAAS_CONFIG = TestBasicJaasConfig + .builder(PROPS_JAAS_REALM) + .addUser(USER_WITH_ACCESS, USER_WITH_ACCESS_PWD, KSQL_RESOURCE) + .build(); + private static final Map COMMON_CONFIG = ImmutableMap.builder() .put(KSQL_STREAMS_PREFIX + StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1) .put(KsqlRestConfig.KSQL_HEARTBEAT_ENABLE_CONFIG, true) @@ -133,6 +153,14 @@ public class PullQueryRoutingFunctionalTest { .put(KsqlConfig.KSQL_QUERY_PULL_ENABLE_STANDBY_READS, true) .put(KsqlConfig.KSQL_STREAMS_PREFIX + "num.standby.replicas", 1) .put(KsqlConfig.KSQL_SHUTDOWN_TIMEOUT_MS_CONFIG, 1000) + .put(KsqlRestConfig.AUTHENTICATION_METHOD_CONFIG, KsqlRestConfig.AUTHENTICATION_METHOD_BASIC) + .put(KsqlRestConfig.AUTHENTICATION_REALM_CONFIG, PROPS_JAAS_REALM) + .put(KsqlRestConfig.AUTHENTICATION_ROLES_CONFIG, KSQL_RESOURCE) + .put(KsqlRestConfig.AUTHENTICATION_SKIP_PATHS_CONFIG, "/heartbeat,/lag") + // In order to whitelist the above paths for auth, we need to install a noop authentication + // plugin. In practice, these are internal paths so we're not interested in testing auth + // for them in these tests. + .put(KsqlRestConfig.KSQL_AUTHENTICATION_PLUGIN_CLASS, NoAuthPlugin.class) .build(); private static final Shutoffs APP_SHUTOFFS_0 = new Shutoffs(); @@ -192,7 +220,9 @@ public class PullQueryRoutingFunctionalTest { @ClassRule public static final RuleChain CHAIN = RuleChain .outerRule(Retry.of(3, ZooKeeperClientException.class, 3, TimeUnit.SECONDS)) - .around(TEST_HARNESS).around(TMP); + .around(TEST_HARNESS) + .around(JAAS_CONFIG) + .around(TMP); @Rule public final Timeout timeout = Timeout.builder() @@ -228,7 +258,8 @@ public void setUp() { + " (" + USER_PROVIDER.ksqlSchemaString(false) + ")" + " WITH (" + " kafka_topic='" + topic + "', " - + " value_format='JSON');" + + " value_format='JSON');", + USER_CREDS ); //Create table output = KsqlIdentifierTestUtil.uniqueIdentifierName(); @@ -239,20 +270,21 @@ public void setUp() { REST_APP_0, "CREATE TABLE " + output + " AS" + " SELECT " + USER_PROVIDER.key() + ", COUNT(1) AS COUNT FROM " + USERS_STREAM - + " GROUP BY " + USER_PROVIDER.key() + ";" + + " GROUP BY " + USER_PROVIDER.key() + ";", + USER_CREDS ); queryId = extractQueryId(res.get(0).toString()); queryId = queryId.substring(0, queryId.length() - 1); waitForTableRows(); waitForStreamsMetadataToInitialize( - REST_APP_0, ImmutableList.of(HOST0, HOST1, HOST2), queryId); + REST_APP_0, ImmutableList.of(HOST0, HOST1, HOST2), queryId, USER_CREDS); } @After public void cleanUp() { - REST_APP_0.closePersistentQueries(); - REST_APP_0.dropSourcesExcept(); + REST_APP_0.closePersistentQueries(USER_CREDS); + REST_APP_0.dropSourcesExcept(USER_CREDS); APP_SHUTOFFS_0.reset(); APP_SHUTOFFS_1.reset(); APP_SHUTOFFS_2.reset(); @@ -262,18 +294,19 @@ public void cleanUp() { public void shouldQueryActiveWhenActiveAliveQueryIssuedToStandby() throws Exception { // Given: ClusterFormation clusterFormation = findClusterFormation(TEST_APP_0, TEST_APP_1, TEST_APP_2); - waitForClusterToBeDiscovered(clusterFormation.standBy.getApp(), 3); + waitForClusterToBeDiscovered(clusterFormation.standBy.getApp(), 3, USER_CREDS); waitForRemoteServerToChangeStatus(clusterFormation.router.getApp(), - clusterFormation.router.getHost(), HighAvailabilityTestUtil.lagsReported(3)); + clusterFormation.router.getHost(), HighAvailabilityTestUtil.lagsReported(3), USER_CREDS); waitForRemoteServerToChangeStatus( clusterFormation.standBy.getApp(), clusterFormation.active.getHost(), - HighAvailabilityTestUtil::remoteServerIsUp); + HighAvailabilityTestUtil::remoteServerIsUp, + USER_CREDS); // When: List rows_0 = - makePullQueryRequest(clusterFormation.standBy.getApp(), sql); + makePullQueryRequest(clusterFormation.standBy.getApp(), sql, null, USER_CREDS); // Then: assertThat(rows_0, hasSize(HEADER + 1)); @@ -289,9 +322,9 @@ public void shouldQueryActiveWhenActiveAliveQueryIssuedToStandby() throws Except public void shouldQueryActiveWhenActiveAliveStandbyDeadQueryIssuedToRouter() { // Given: ClusterFormation clusterFormation = findClusterFormation(TEST_APP_0, TEST_APP_1, TEST_APP_2); - waitForClusterToBeDiscovered(clusterFormation.router.getApp(), 3); + waitForClusterToBeDiscovered(clusterFormation.router.getApp(), 3, USER_CREDS); waitForRemoteServerToChangeStatus(clusterFormation.router.getApp(), - clusterFormation.router.getHost(), HighAvailabilityTestUtil.lagsReported(3)); + clusterFormation.router.getHost(), HighAvailabilityTestUtil.lagsReported(3), USER_CREDS); // Partition off the standby clusterFormation.standBy.getShutoffs().shutOffAll(); @@ -299,14 +332,17 @@ public void shouldQueryActiveWhenActiveAliveStandbyDeadQueryIssuedToRouter() { waitForRemoteServerToChangeStatus( clusterFormation.router.getApp(), clusterFormation.active.getHost(), - HighAvailabilityTestUtil::remoteServerIsUp); + HighAvailabilityTestUtil::remoteServerIsUp, + USER_CREDS); waitForRemoteServerToChangeStatus( clusterFormation.router.getApp(), clusterFormation.standBy.getHost(), - HighAvailabilityTestUtil::remoteServerIsDown); + HighAvailabilityTestUtil::remoteServerIsDown, + USER_CREDS); // When: - final List rows_0 = makePullQueryRequest(clusterFormation.router.getApp(), sql); + final List rows_0 = makePullQueryRequest(clusterFormation.router.getApp(), sql, + null, USER_CREDS); // Then: assertThat(rows_0, hasSize(HEADER + 1)); @@ -321,9 +357,9 @@ public void shouldQueryActiveWhenActiveAliveStandbyDeadQueryIssuedToRouter() { public void shouldQueryStandbyWhenActiveDeadStandbyAliveQueryIssuedToRouter() throws Exception { // Given: ClusterFormation clusterFormation = findClusterFormation(TEST_APP_0, TEST_APP_1, TEST_APP_2); - waitForClusterToBeDiscovered(clusterFormation.router.getApp(), 3); + waitForClusterToBeDiscovered(clusterFormation.router.getApp(), 3, USER_CREDS); waitForRemoteServerToChangeStatus(clusterFormation.router.getApp(), - clusterFormation.router.getHost(), HighAvailabilityTestUtil.lagsReported(3)); + clusterFormation.router.getHost(), HighAvailabilityTestUtil.lagsReported(3), USER_CREDS); // Partition off the active clusterFormation.active.getShutoffs().shutOffAll(); @@ -331,14 +367,17 @@ public void shouldQueryStandbyWhenActiveDeadStandbyAliveQueryIssuedToRouter() th waitForRemoteServerToChangeStatus( clusterFormation.router.getApp(), clusterFormation.standBy.getHost(), - HighAvailabilityTestUtil::remoteServerIsUp); + HighAvailabilityTestUtil::remoteServerIsUp, + USER_CREDS); waitForRemoteServerToChangeStatus( clusterFormation.router.getApp(), clusterFormation.active.getHost(), - HighAvailabilityTestUtil::remoteServerIsDown); + HighAvailabilityTestUtil::remoteServerIsDown, + USER_CREDS); // When: - final List rows_0 = makePullQueryRequest(clusterFormation.router.getApp(), sql); + final List rows_0 = makePullQueryRequest(clusterFormation.router.getApp(), sql, + null, USER_CREDS); // Then: assertThat(rows_0, hasSize(HEADER + 1)); @@ -355,9 +394,9 @@ public void shouldQueryStandbyWhenActiveDeadStandbyAliveQueryIssuedToRouter_mult throws Exception { // Given: ClusterFormation clusterFormation = findClusterFormation(TEST_APP_0, TEST_APP_1, TEST_APP_2); - waitForClusterToBeDiscovered(clusterFormation.router.getApp(), 3); + waitForClusterToBeDiscovered(clusterFormation.router.getApp(), 3, USER_CREDS); waitForRemoteServerToChangeStatus(clusterFormation.router.getApp(), - clusterFormation.router.getHost(), HighAvailabilityTestUtil.lagsReported(3)); + clusterFormation.router.getHost(), HighAvailabilityTestUtil.lagsReported(3), USER_CREDS); // Partition off the active clusterFormation.active.getShutoffs().shutOffAll(); @@ -365,15 +404,17 @@ public void shouldQueryStandbyWhenActiveDeadStandbyAliveQueryIssuedToRouter_mult waitForRemoteServerToChangeStatus( clusterFormation.router.getApp(), clusterFormation.standBy.getHost(), - HighAvailabilityTestUtil::remoteServerIsUp); + HighAvailabilityTestUtil::remoteServerIsUp, + USER_CREDS); waitForRemoteServerToChangeStatus( clusterFormation.router.getApp(), clusterFormation.active.getHost(), - HighAvailabilityTestUtil::remoteServerIsDown); + HighAvailabilityTestUtil::remoteServerIsDown, + USER_CREDS); // When: final List rows_0 = makePullQueryRequest(clusterFormation.router.getApp(), - sqlMultipleKeys); + sqlMultipleKeys, null, USER_CREDS); // Then: assertThat(rows_0, hasSize(HEADER + 2)); @@ -398,22 +439,23 @@ public void shouldQueryStandbyWhenActiveDeadStandbyAliveQueryIssuedToRouter_mult public void shouldFilterLaggyServers() throws Exception { // Given: ClusterFormation clusterFormation = findClusterFormation(TEST_APP_0, TEST_APP_1, TEST_APP_2); - waitForClusterToBeDiscovered(clusterFormation.router.getApp(), 3); + waitForClusterToBeDiscovered(clusterFormation.router.getApp(), 3, USER_CREDS); waitForRemoteServerToChangeStatus(clusterFormation.router.getApp(), - clusterFormation.router.getHost(), HighAvailabilityTestUtil.lagsReported(3)); + clusterFormation.router.getHost(), HighAvailabilityTestUtil.lagsReported(3), USER_CREDS); waitForRemoteServerToChangeStatus( clusterFormation.router.getApp(), clusterFormation.active.getHost(), - HighAvailabilityTestUtil::remoteServerIsUp); + HighAvailabilityTestUtil::remoteServerIsUp, USER_CREDS); waitForRemoteServerToChangeStatus( clusterFormation.router.getApp(), clusterFormation.standBy.getHost(), - HighAvailabilityTestUtil::remoteServerIsUp); + HighAvailabilityTestUtil::remoteServerIsUp, USER_CREDS); waitForRemoteServerToChangeStatus(clusterFormation.router.getApp(), clusterFormation.router.getHost(), HighAvailabilityTestUtil.lagsReported(clusterFormation.standBy.getHost(), - Optional.empty(), 5)); + Optional.empty(), 5), + USER_CREDS); // Cut off standby from Kafka to simulate lag clusterFormation.standBy.getShutoffs().setKafkaPauseOffset(0); @@ -432,7 +474,8 @@ public void shouldFilterLaggyServers() throws Exception { waitForRemoteServerToChangeStatus(clusterFormation.router.getApp(), clusterFormation.router.getHost(), HighAvailabilityTestUtil.lagsReported(clusterFormation.active.getHost(), Optional.empty(), - 10)); + 10), + USER_CREDS); // Partition active off clusterFormation.active.getShutoffs().shutOffAll(); @@ -440,15 +483,17 @@ public void shouldFilterLaggyServers() throws Exception { waitForRemoteServerToChangeStatus( clusterFormation.router.getApp(), clusterFormation.standBy.getHost(), - HighAvailabilityTestUtil::remoteServerIsUp); + HighAvailabilityTestUtil::remoteServerIsUp, + USER_CREDS); waitForRemoteServerToChangeStatus( clusterFormation.router.getApp(), clusterFormation.active.getHost(), - HighAvailabilityTestUtil::remoteServerIsDown); + HighAvailabilityTestUtil::remoteServerIsDown, + USER_CREDS); // When: final List rows_0 = makePullQueryRequest( - clusterFormation.router.getApp(), sql, LAG_FILTER_6); + clusterFormation.router.getApp(), sql, LAG_FILTER_6, USER_CREDS); // Then: assertThat(rows_0, hasSize(HEADER + 1)); @@ -471,14 +516,15 @@ private static KsqlErrorMessage makePullQueryRequestWithError( final String sql, final Map properties ) { - return RestIntegrationTestUtil.makeQueryRequestWithError(target, sql, Optional.empty(), + return RestIntegrationTestUtil.makeQueryRequestWithError(target, sql, USER_CREDS, properties); } private ClusterFormation findClusterFormation( TestApp testApp0, TestApp testApp1, TestApp testApp2) { ClusterFormation clusterFormation = new ClusterFormation(); - ClusterStatusResponse clusterStatusResponse = HighAvailabilityTestUtil.sendClusterStatusRequest(testApp0.getApp()); + ClusterStatusResponse clusterStatusResponse + = HighAvailabilityTestUtil.sendClusterStatusRequest(testApp0.getApp(), USER_CREDS); ActiveStandbyEntity entity0 = clusterStatusResponse.getClusterStatus().get(testApp0.getHost()) .getActiveStandbyPerQuery().get(queryId); ActiveStandbyEntity entity1 = clusterStatusResponse.getClusterStatus().get(testApp1.getHost()) @@ -585,5 +631,19 @@ public Shutoffs getShutoffs() { return shutoffs; } } + + // AuthenticationPlugin which never returns a Principal + public static class NoAuthPlugin implements AuthenticationPlugin { + + @Override + public void configure(Map map) { + } + + @Override + public CompletableFuture handleAuth(RoutingContext routingContext, + WorkerExecutor workerExecutor) { + return CompletableFuture.completedFuture(null); + } + } } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/TestKsqlRestApp.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/TestKsqlRestApp.java index 2cb14cb64a0b..06f0daadb2f6 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/TestKsqlRestApp.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/TestKsqlRestApp.java @@ -227,13 +227,24 @@ public Set getTransientQueries() { } public void closePersistentQueries() { - try (final KsqlRestClient client = buildKsqlClient()) { + closePersistentQueries(Optional.empty()); + } + + public void closePersistentQueries(final Optional credentials) { + try (final KsqlRestClient client = buildKsqlClient(credentials)) { terminateQueries(getPersistentQueries(client), client); } } public void dropSourcesExcept(final String... exceptSources) { - try (final KsqlRestClient client = buildKsqlClient()) { + dropSourcesExcept(Optional.empty(), exceptSources); + } + + public void dropSourcesExcept( + final Optional credential, + final String... exceptSources + ) { + try (final KsqlRestClient client = buildKsqlClient(credential)) { final Set except = Arrays.stream(exceptSources) .map(String::toUpperCase)