Skip to content

Commit

Permalink
all tests pass
Browse files Browse the repository at this point in the history
  • Loading branch information
vpapavas committed Feb 5, 2020
1 parent 6f33186 commit 626b7ef
Show file tree
Hide file tree
Showing 8 changed files with 38 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -477,12 +477,7 @@ protected void registerWebSocketEndpoints(final ServerContainer container) {
ErrorMessages.class
));

final ImmutableList.Builder<RoutingFilter> filterBuilder = ImmutableList.builder();
if (ksqlConfigNoPort.getBoolean(KsqlConfig.KSQL_QUERY_PULL_ENABLE_STALE_READS)) {
filterBuilder.add(new ActiveHostFilter());
}
filterBuilder.add(new LivenessFilter());
final RoutingFilters routingFilters = new RoutingFilters(filterBuilder.build());
final RoutingFilters routingFilters = initializeRoutingFilters(ksqlConfigNoPort);

container.addEndpoint(
ServerEndpointConfig.Builder
Expand Down Expand Up @@ -616,12 +611,7 @@ static KsqlRestApplication buildApplication(
initializeLagReportingAgent(restConfig, ksqlEngine, serviceContext);
final Optional<HeartbeatAgent> heartbeatAgent =
initializeHeartbeatAgent(restConfig, ksqlEngine, serviceContext, lagReportingAgent);
final ImmutableList.Builder<RoutingFilter> filterBuilder = ImmutableList.builder();
if (ksqlConfig.getBoolean(KsqlConfig.KSQL_QUERY_PULL_ENABLE_STALE_READS)) {
filterBuilder.add(new ActiveHostFilter());
}
filterBuilder.add(new LivenessFilter());
final RoutingFilters routingFilters = new RoutingFilters(filterBuilder.build());
final RoutingFilters routingFilters = initializeRoutingFilters(ksqlConfig);

final StreamedQueryResource streamedQueryResource = new StreamedQueryResource(
ksqlEngine,
Expand Down Expand Up @@ -749,6 +739,16 @@ private static Optional<LagReportingAgent> initializeLagReportingAgent(
return Optional.empty();
}

private static RoutingFilters initializeRoutingFilters(final KsqlConfig ksqlConfig) {
final ImmutableList.Builder<RoutingFilter> filterBuilder = ImmutableList.builder();
if (!ksqlConfig.getBoolean(KsqlConfig.KSQL_QUERY_PULL_ENABLE_STALE_READS)) {
filterBuilder.add(new ActiveHostFilter());
}
filterBuilder.add(new LivenessFilter());
final RoutingFilters routingFilters = new RoutingFilters(filterBuilder.build());
return routingFilters;
}

private void registerCommandTopic() {

final String commandTopic = commandStore.getCommandTopicName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,10 @@ public PullQueryExecutor(
this.routingFilters = Objects.requireNonNull(routingFilters, "routingFilters");
}

public void validate(
public static void validate(
final ConfiguredStatement<Query> statement,
final Map<String, ?> sessionProperties,
final KsqlExecutionContext executionContext,
final ServiceContext serviceContext
) {
throw new KsqlRestException(Errors.queryEndpoint(statement.getStatementText()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ private ClusterStatusResponse getResponse() {
entry -> new KsqlHostEntity(entry.getKey().host(), entry.getKey().port()) ,
entry -> new HostStatusEntity(entry.getValue().isHostAlive(),
entry.getValue().getLastStatusUpdateMs(),
getActiveStandbyInformation(entry.getKey()),
getHostStoreLags(entry.getKey()))
));

Expand All @@ -94,8 +95,9 @@ private ClusterStatusResponse getResponse() {

private HostStoreLags getHostStoreLags(final KsqlHost ksqlHost) {
return lagReportingAgent
.flatMap(agent -> agent.getLagPerHost(ksqlHost))
.flatMap(agent -> agent.getLagPerHost(ksqlHost))
.orElse(EMPTY_HOST_STORE_LAGS);
}

private Map<String, ActiveStandbyEntity> getActiveStandbyInformation(final KsqlHost ksqlHost) {
final List<PersistentQueryMetadata> currentQueries = engine.getPersistentQueries();
Expand All @@ -107,7 +109,7 @@ private Map<String, ActiveStandbyEntity> getActiveStandbyInformation(final KsqlH
final Map<String, ActiveStandbyEntity> perQueryMap = new HashMap<>();
for (PersistentQueryMetadata persistentMetadata: currentQueries) {
for (StreamsMetadata streamsMetadata : persistentMetadata.getAllMetadata()) {
if (streamsMetadata == null || !streamsMetadata.hostInfo().equals(asHostInfo(ksqlHost))) {
if (!streamsMetadata.hostInfo().equals(asHostInfo(ksqlHost))) {
continue;
}
if (streamsMetadata == StreamsMetadata.NOT_AVAILABLE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.confluent.ksql.parser.tree.ListTopics;
import io.confluent.ksql.parser.tree.ListTypes;
import io.confluent.ksql.parser.tree.PrintTopic;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.SetProperty;
import io.confluent.ksql.parser.tree.ShowColumns;
import io.confluent.ksql.parser.tree.Statement;
Expand All @@ -42,6 +43,7 @@
import io.confluent.ksql.rest.server.execution.ExplainExecutor;
import io.confluent.ksql.rest.server.execution.ListSourceExecutor;
import io.confluent.ksql.rest.server.execution.PropertyExecutor;
import io.confluent.ksql.rest.server.execution.PullQueryExecutor;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlException;
Expand All @@ -58,6 +60,7 @@
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public enum CustomValidators {
QUERY_ENDPOINT(Query.class, PullQueryExecutor::validate),
PRINT_TOPIC(PrintTopic.class, PrintTopicValidator::validate),

LIST_TOPICS(ListTopics.class, StatementValidator.NO_VALIDATION),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public void cleanUp() {
REST_APP_2.stop();
}

@Test(timeout = 60000)
@Test(timeout = 6000000)
public void shouldQueryActiveWhenActiveAliveQueryIssuedToStandby() {
// Given:
final String key = Iterables.get(USER_PROVIDER.data().keySet(), 0);
Expand All @@ -228,7 +228,7 @@ public void shouldQueryActiveWhenActiveAliveQueryIssuedToStandby() {
// Then:
assertThat(rows_0, hasSize(HEADER + 1));
assertThat(rows_0.get(1).getRow(), is(not(Optional.empty())));
assertThat(rows_0.get(1).getRow().get().getColumns(), is(ImmutableList.of(key, BASE_TIME, 1)));
assertThat(rows_0.get(1).getRow().get().values(), is(ImmutableList.of(key, BASE_TIME, 1)));
}


Expand Down Expand Up @@ -260,7 +260,7 @@ public void shouldQueryActiveWhenActiveAliveStandbyDeadQueryIssuedToRouter() {
// Then:
assertThat(rows_0, hasSize(HEADER + 1));
assertThat(rows_0.get(1).getRow(), is(not(Optional.empty())));
assertThat(rows_0.get(1).getRow().get().getColumns(), is(ImmutableList.of(key, BASE_TIME, 1)));
assertThat(rows_0.get(1).getRow().get().values(), is(ImmutableList.of(key, BASE_TIME, 1)));
}

@Test(timeout = 60000)
Expand Down Expand Up @@ -291,7 +291,7 @@ public void shouldQueryStandbyWhenActiveDeadStandbyAliveQueryIssuedToRouter() {
// Then:
assertThat(rows_0, hasSize(HEADER + 1));
assertThat(rows_0.get(1).getRow(), is(not(Optional.empty())));
assertThat(rows_0.get(1).getRow().get().getColumns(), is(ImmutableList.of(key, BASE_TIME, 1)));
assertThat(rows_0.get(1).getRow().get().values(), is(ImmutableList.of(key, BASE_TIME, 1)));
}

private static List<StreamedRow> makePullQueryRequest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.confluent.ksql.rest.server.RoutingFilters;
import io.confluent.ksql.rest.server.TemporaryEngine;
import io.confluent.ksql.rest.server.resources.KsqlRestException;
import io.confluent.ksql.rest.server.validation.CustomValidators;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
Expand Down Expand Up @@ -106,7 +107,12 @@ public void shouldRedirectQueriesToQueryEndPoint() {
"SELECT * FROM test_table;"))));

// When:
pullQueryExecutor.validate(query, ImmutableMap.of(), engine.getServiceContext());
CustomValidators.QUERY_ENDPOINT.validate(
query,
ImmutableMap.of(),
engine.getEngine(),
engine.getServiceContext()
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@
@JsonIgnoreProperties(ignoreUnknown = true)
public class HostStatusEntity {

private boolean hostAlive;
private long lastStatusUpdateMs;
private ImmutableMap<String, ActiveStandbyEntity> activeStandbyPerQuery;
private HostStoreLags hostStoreLags;
private final boolean hostAlive;
private final long lastStatusUpdateMs;
private final ImmutableMap<String, ActiveStandbyEntity> activeStandbyPerQuery;
private final HostStoreLags hostStoreLags;

@JsonCreator
public HostStatusEntity(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
@JsonIgnoreProperties(ignoreUnknown = true)
public class TopicPartitionEntity {

private String topic;
private int partition;
private final String topic;
private final int partition;

@JsonCreator
public TopicPartitionEntity(
Expand Down

0 comments on commit 626b7ef

Please sign in to comment.