From 626b7ef2dcd973245b9a329aa37543085c248e8f Mon Sep 17 00:00:00 2001 From: Vicky Papavasileiou Date: Tue, 4 Feb 2020 20:41:42 -0800 Subject: [PATCH] all tests pass --- .../ksql/rest/server/KsqlRestApplication.java | 24 +++++++++---------- .../server/execution/PullQueryExecutor.java | 3 ++- .../resources/ClusterStatusResource.java | 6 +++-- .../server/validation/CustomValidators.java | 3 +++ .../PullQueryRoutingFunctionalTest.java | 8 +++---- .../execution/PullQueryExecutorTest.java | 8 ++++++- .../ksql/rest/entity/HostStatusEntity.java | 8 +++---- .../rest/entity/TopicPartitionEntity.java | 4 ++-- 8 files changed, 38 insertions(+), 26 deletions(-) diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index fcef3a6ef705..b7e7a755215b 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -477,12 +477,7 @@ protected void registerWebSocketEndpoints(final ServerContainer container) { ErrorMessages.class )); - final ImmutableList.Builder filterBuilder = ImmutableList.builder(); - if (ksqlConfigNoPort.getBoolean(KsqlConfig.KSQL_QUERY_PULL_ENABLE_STALE_READS)) { - filterBuilder.add(new ActiveHostFilter()); - } - filterBuilder.add(new LivenessFilter()); - final RoutingFilters routingFilters = new RoutingFilters(filterBuilder.build()); + final RoutingFilters routingFilters = initializeRoutingFilters(ksqlConfigNoPort); container.addEndpoint( ServerEndpointConfig.Builder @@ -616,12 +611,7 @@ static KsqlRestApplication buildApplication( initializeLagReportingAgent(restConfig, ksqlEngine, serviceContext); final Optional heartbeatAgent = initializeHeartbeatAgent(restConfig, ksqlEngine, serviceContext, lagReportingAgent); - final ImmutableList.Builder filterBuilder = ImmutableList.builder(); - if (ksqlConfig.getBoolean(KsqlConfig.KSQL_QUERY_PULL_ENABLE_STALE_READS)) { - filterBuilder.add(new ActiveHostFilter()); - } - filterBuilder.add(new LivenessFilter()); - final RoutingFilters routingFilters = new RoutingFilters(filterBuilder.build()); + final RoutingFilters routingFilters = initializeRoutingFilters(ksqlConfig); final StreamedQueryResource streamedQueryResource = new StreamedQueryResource( ksqlEngine, @@ -749,6 +739,16 @@ private static Optional initializeLagReportingAgent( return Optional.empty(); } + private static RoutingFilters initializeRoutingFilters(final KsqlConfig ksqlConfig) { + final ImmutableList.Builder filterBuilder = ImmutableList.builder(); + if (!ksqlConfig.getBoolean(KsqlConfig.KSQL_QUERY_PULL_ENABLE_STALE_READS)) { + filterBuilder.add(new ActiveHostFilter()); + } + filterBuilder.add(new LivenessFilter()); + final RoutingFilters routingFilters = new RoutingFilters(filterBuilder.build()); + return routingFilters; + } + private void registerCommandTopic() { final String commandTopic = commandStore.getCommandTopicName(); diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java index e0cfa7875c05..43eb95eb67ec 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java @@ -137,9 +137,10 @@ public PullQueryExecutor( this.routingFilters = Objects.requireNonNull(routingFilters, "routingFilters"); } - public void validate( + public static void validate( final ConfiguredStatement statement, final Map sessionProperties, + final KsqlExecutionContext executionContext, final ServiceContext serviceContext ) { throw new KsqlRestException(Errors.queryEndpoint(statement.getStatementText())); diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/ClusterStatusResource.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/ClusterStatusResource.java index 946747806950..b794a068e25a 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/ClusterStatusResource.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/ClusterStatusResource.java @@ -85,6 +85,7 @@ private ClusterStatusResponse getResponse() { entry -> new KsqlHostEntity(entry.getKey().host(), entry.getKey().port()) , entry -> new HostStatusEntity(entry.getValue().isHostAlive(), entry.getValue().getLastStatusUpdateMs(), + getActiveStandbyInformation(entry.getKey()), getHostStoreLags(entry.getKey())) )); @@ -94,8 +95,9 @@ private ClusterStatusResponse getResponse() { private HostStoreLags getHostStoreLags(final KsqlHost ksqlHost) { return lagReportingAgent - .flatMap(agent -> agent.getLagPerHost(ksqlHost)) + .flatMap(agent -> agent.getLagPerHost(ksqlHost)) .orElse(EMPTY_HOST_STORE_LAGS); + } private Map getActiveStandbyInformation(final KsqlHost ksqlHost) { final List currentQueries = engine.getPersistentQueries(); @@ -107,7 +109,7 @@ private Map getActiveStandbyInformation(final KsqlH final Map perQueryMap = new HashMap<>(); for (PersistentQueryMetadata persistentMetadata: currentQueries) { for (StreamsMetadata streamsMetadata : persistentMetadata.getAllMetadata()) { - if (streamsMetadata == null || !streamsMetadata.hostInfo().equals(asHostInfo(ksqlHost))) { + if (!streamsMetadata.hostInfo().equals(asHostInfo(ksqlHost))) { continue; } if (streamsMetadata == StreamsMetadata.NOT_AVAILABLE) { diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/CustomValidators.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/CustomValidators.java index d71fb8724f6a..a3ec2a9d9263 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/CustomValidators.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/CustomValidators.java @@ -33,6 +33,7 @@ import io.confluent.ksql.parser.tree.ListTopics; import io.confluent.ksql.parser.tree.ListTypes; import io.confluent.ksql.parser.tree.PrintTopic; +import io.confluent.ksql.parser.tree.Query; import io.confluent.ksql.parser.tree.SetProperty; import io.confluent.ksql.parser.tree.ShowColumns; import io.confluent.ksql.parser.tree.Statement; @@ -42,6 +43,7 @@ import io.confluent.ksql.rest.server.execution.ExplainExecutor; import io.confluent.ksql.rest.server.execution.ListSourceExecutor; import io.confluent.ksql.rest.server.execution.PropertyExecutor; +import io.confluent.ksql.rest.server.execution.PullQueryExecutor; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.KsqlException; @@ -58,6 +60,7 @@ */ @SuppressWarnings({"unchecked", "rawtypes"}) public enum CustomValidators { + QUERY_ENDPOINT(Query.class, PullQueryExecutor::validate), PRINT_TOPIC(PrintTopic.class, PrintTopicValidator::validate), LIST_TOPICS(ListTopics.class, StatementValidator.NO_VALIDATION), diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/PullQueryRoutingFunctionalTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/PullQueryRoutingFunctionalTest.java index 3e404ef57efe..aad15938f565 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/PullQueryRoutingFunctionalTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/PullQueryRoutingFunctionalTest.java @@ -202,7 +202,7 @@ public void cleanUp() { REST_APP_2.stop(); } - @Test(timeout = 60000) + @Test(timeout = 6000000) public void shouldQueryActiveWhenActiveAliveQueryIssuedToStandby() { // Given: final String key = Iterables.get(USER_PROVIDER.data().keySet(), 0); @@ -228,7 +228,7 @@ public void shouldQueryActiveWhenActiveAliveQueryIssuedToStandby() { // Then: assertThat(rows_0, hasSize(HEADER + 1)); assertThat(rows_0.get(1).getRow(), is(not(Optional.empty()))); - assertThat(rows_0.get(1).getRow().get().getColumns(), is(ImmutableList.of(key, BASE_TIME, 1))); + assertThat(rows_0.get(1).getRow().get().values(), is(ImmutableList.of(key, BASE_TIME, 1))); } @@ -260,7 +260,7 @@ public void shouldQueryActiveWhenActiveAliveStandbyDeadQueryIssuedToRouter() { // Then: assertThat(rows_0, hasSize(HEADER + 1)); assertThat(rows_0.get(1).getRow(), is(not(Optional.empty()))); - assertThat(rows_0.get(1).getRow().get().getColumns(), is(ImmutableList.of(key, BASE_TIME, 1))); + assertThat(rows_0.get(1).getRow().get().values(), is(ImmutableList.of(key, BASE_TIME, 1))); } @Test(timeout = 60000) @@ -291,7 +291,7 @@ public void shouldQueryStandbyWhenActiveDeadStandbyAliveQueryIssuedToRouter() { // Then: assertThat(rows_0, hasSize(HEADER + 1)); assertThat(rows_0.get(1).getRow(), is(not(Optional.empty()))); - assertThat(rows_0.get(1).getRow().get().getColumns(), is(ImmutableList.of(key, BASE_TIME, 1))); + assertThat(rows_0.get(1).getRow().get().values(), is(ImmutableList.of(key, BASE_TIME, 1))); } private static List makePullQueryRequest( diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorTest.java index 71765c17caba..bf878e327f49 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/PullQueryExecutorTest.java @@ -31,6 +31,7 @@ import io.confluent.ksql.rest.server.RoutingFilters; import io.confluent.ksql.rest.server.TemporaryEngine; import io.confluent.ksql.rest.server.resources.KsqlRestException; +import io.confluent.ksql.rest.server.validation.CustomValidators; import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; @@ -106,7 +107,12 @@ public void shouldRedirectQueriesToQueryEndPoint() { "SELECT * FROM test_table;")))); // When: - pullQueryExecutor.validate(query, ImmutableMap.of(), engine.getServiceContext()); + CustomValidators.QUERY_ENDPOINT.validate( + query, + ImmutableMap.of(), + engine.getEngine(), + engine.getServiceContext() + ); } } } diff --git a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/HostStatusEntity.java b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/HostStatusEntity.java index 488ddacc0d9b..65d7d7078940 100644 --- a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/HostStatusEntity.java +++ b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/HostStatusEntity.java @@ -27,10 +27,10 @@ @JsonIgnoreProperties(ignoreUnknown = true) public class HostStatusEntity { - private boolean hostAlive; - private long lastStatusUpdateMs; - private ImmutableMap activeStandbyPerQuery; - private HostStoreLags hostStoreLags; + private final boolean hostAlive; + private final long lastStatusUpdateMs; + private final ImmutableMap activeStandbyPerQuery; + private final HostStoreLags hostStoreLags; @JsonCreator public HostStatusEntity( diff --git a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/TopicPartitionEntity.java b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/TopicPartitionEntity.java index 08bcdfed0fed..ef628ee48168 100644 --- a/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/TopicPartitionEntity.java +++ b/ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/TopicPartitionEntity.java @@ -25,8 +25,8 @@ @JsonIgnoreProperties(ignoreUnknown = true) public class TopicPartitionEntity { - private String topic; - private int partition; + private final String topic; + private final int partition; @JsonCreator public TopicPartitionEntity(