fix: long-running queries shouldn't block the main event loop #7420

Merged · 4 commits · Apr 23, 2021

Changes from 2 commits
@@ -21,10 +21,13 @@
import static io.confluent.ksql.util.KsqlConfig.KSQL_DEFAULT_KEY_FORMAT_CONFIG;
import static io.confluent.ksql.util.KsqlConfig.KSQL_STREAMS_PREFIX;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasProperty;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
@@ -69,8 +72,11 @@
import io.confluent.ksql.integration.IntegrationTestHarness;
import io.confluent.ksql.integration.Retry;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.rest.client.RestResponse;
import io.confluent.ksql.rest.client.StreamPublisher;
import io.confluent.ksql.rest.entity.ConnectorList;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.rest.integration.RestIntegrationTestUtil;
import io.confluent.ksql.rest.server.ConnectExecutable;
import io.confluent.ksql.rest.server.TestKsqlRestApp;
@@ -203,10 +209,18 @@ public class ClientIntegrationTest {

private static final IntegrationTestHarness TEST_HARNESS = IntegrationTestHarness.build();

// these properties are set together to allow us to verify that we can handle push queries
// in the worker pool without blocking the event loop.
private static final int EVENT_LOOP_POOL_SIZE = 1;
private static final int NUM_CONCURRENT_REQUESTS_TO_TEST = 5;
private static final int WORKER_POOL_SIZE = 10;

private static final TestKsqlRestApp REST_APP = TestKsqlRestApp
.builder(TEST_HARNESS::kafkaBootstrapServers)
.withProperty(KSQL_STREAMS_PREFIX + StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1)
.withProperty(KSQL_DEFAULT_KEY_FORMAT_CONFIG, "JSON")
.withProperty("ksql.verticle.instances", EVENT_LOOP_POOL_SIZE)
.withProperty("ksql.worker.pool.size", WORKER_POOL_SIZE)
.build();

@ClassRule
@@ -296,6 +310,30 @@ public void tearDown() {
REST_APP.getServiceContext().close();
}

@Test
public void shouldStreamMultiplePushQueries() throws Exception {
Member Author: This works with and without my patch, but it doesn't hurt to have a test for it.

// When
final StreamedQueryResult[] streamedQueryResults = new StreamedQueryResult[NUM_CONCURRENT_REQUESTS_TO_TEST];
for (int i = 0; i < streamedQueryResults.length; i++) {
streamedQueryResults[i] = client.streamQuery(PUSH_QUERY).get();
}

// Then
for (final StreamedQueryResult streamedQueryResult : streamedQueryResults) {
assertThat(streamedQueryResult.columnNames(), is(TEST_COLUMN_NAMES));
assertThat(streamedQueryResult.columnTypes(), is(TEST_COLUMN_TYPES));
assertThat(streamedQueryResult.queryID(), is(notNullValue()));
}

for (final StreamedQueryResult streamedQueryResult : streamedQueryResults) {
shouldReceiveStreamRows(streamedQueryResult, false);
}

for (final StreamedQueryResult streamedQueryResult : streamedQueryResults) {
assertThat(streamedQueryResult.isComplete(), is(false));
}
}
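
The configuration this test exercises is defined near the top of the file: ksql.verticle.instances pins the server to a single event loop (EVENT_LOOP_POOL_SIZE = 1) while ksql.worker.pool.size provides ten worker threads (WORKER_POOL_SIZE = 10), so the five concurrent push queries above can only stream at the same time if the long-running work is handed off to the worker pool rather than run on the event loop. A rough standalone sketch of that shape in plain Vert.x follows; the class name, port, and pool name are illustrative and not ksqlDB's actual server wiring.

import io.vertx.core.AbstractVerticle;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Vertx;
import io.vertx.core.WorkerExecutor;

// Illustrative sketch: one request-handling verticle (one event loop) backed by a
// ten-thread worker pool, mirroring EVENT_LOOP_POOL_SIZE = 1 and WORKER_POOL_SIZE = 10.
public class SingleLoopManyWorkers extends AbstractVerticle {

  @Override
  public void start() {
    final WorkerExecutor workers = vertx.createSharedWorkerExecutor("request-workers", 10);
    vertx.createHttpServer()
        .requestHandler(req -> workers.<String>executeBlocking(
            promise -> promise.complete(slowQuery()),  // long-running work off the event loop
            false,                                     // don't serialize tasks from this context
            res -> req.response().end(res.result())))
        .listen(8088);
  }

  private static String slowQuery() {
    try {
      Thread.sleep(5_000);                             // stand-in for a long-running query
    } catch (final InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return "rows...";
  }

  public static void main(final String[] args) {
    Vertx.vertx().deployVerticle(
        SingleLoopManyWorkers::new, new DeploymentOptions().setInstances(1));
  }
}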

@Test
public void shouldStreamPushQueryAsync() throws Exception {
// When
@@ -958,29 +996,24 @@ public void shouldListTopics() throws Exception {
}

@Test
public void shouldListQueries() {
public void shouldListQueries() throws ExecutionException, InterruptedException {
Member Author: This test was flake-prone, since it assumes that it could serialize itself after other tests (i.e., wait until all other tests' queries have stopped) and that other tests wouldn't start new queries during its execution.

There doesn't seem to be a reason for this, so I changed the test just to verify the thing it wanted to verify, regardless of what other queries are running.

// When
// Try multiple times to allow time for queries started by the other tests to finish terminating
final List<QueryInfo> queries = assertThatEventually(() -> {
try {
return client.listQueries().get();
} catch (Exception e) {
return Collections.emptyList();
}
}, hasSize(1));
final List<QueryInfo> queries = client.listQueries().get();

// Then
assertThat(queries.get(0).getQueryType(), is(QueryType.PERSISTENT));
assertThat(queries.get(0).getId(), is("CTAS_" + AGG_TABLE + "_5"));
assertThat(queries.get(0).getSql(), is(
"CREATE TABLE " + AGG_TABLE + " WITH (KAFKA_TOPIC='" + AGG_TABLE + "', PARTITIONS=1, REPLICAS=1) AS SELECT\n"
+ " " + TEST_STREAM + ".K K,\n"
+ " LATEST_BY_OFFSET(" + TEST_STREAM + ".LONG) LONG\n"
+ "FROM " + TEST_STREAM + " " + TEST_STREAM + "\n"
+ "GROUP BY " + TEST_STREAM + ".K\n"
+ "EMIT CHANGES;"));
assertThat(queries.get(0).getSink(), is(Optional.of(AGG_TABLE)));
assertThat(queries.get(0).getSinkTopic(), is(Optional.of(AGG_TABLE)));
assertThat(queries, hasItem(allOf(
hasProperty("queryType", is(QueryType.PERSISTENT)),
hasProperty("id", is("CTAS_" + AGG_TABLE + "_5")),
hasProperty("sql", is(
"CREATE TABLE " + AGG_TABLE + " WITH (KAFKA_TOPIC='" + AGG_TABLE + "', PARTITIONS=1, REPLICAS=1) AS SELECT\n"
+ " " + TEST_STREAM + ".K K,\n"
+ " LATEST_BY_OFFSET(" + TEST_STREAM + ".LONG) LONG\n"
+ "FROM " + TEST_STREAM + " " + TEST_STREAM + "\n"
+ "GROUP BY " + TEST_STREAM + ".K\n"
+ "EMIT CHANGES;")),
hasProperty("sink", is(Optional.of(AGG_TABLE))),
hasProperty("sinkTopic", is(Optional.of(AGG_TABLE)))
)));
}
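
The rewritten assertion relies on hasItem(allOf(hasProperty(...), ...)): it passes as long as at least one element of the returned list matches every nested property matcher, so queries started or left running by other tests no longer affect the outcome. Below is a minimal self-contained illustration of the pattern; QueryStub is a hypothetical bean standing in for QueryInfo, and hasProperty reflects on JavaBean getters.

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasProperty;
import static org.hamcrest.Matchers.is;

import java.util.Arrays;
import java.util.List;
import org.junit.Test;

public class HasItemMatcherSketch {

  // Hypothetical bean standing in for QueryInfo.
  public static final class QueryStub {
    private final String id;
    private final String sink;

    QueryStub(final String id, final String sink) {
      this.id = id;
      this.sink = sink;
    }

    public String getId() {
      return id;
    }

    public String getSink() {
      return sink;
    }
  }

  @Test
  public void shouldMatchRegardlessOfOtherEntries() {
    final List<QueryStub> queries = Arrays.asList(
        new QueryStub("TRANSIENT_42", "NONE"),          // extra entries from other tests
        new QueryStub("CTAS_AGG_TABLE_5", "AGG_TABLE")
    );

    // Passes as long as *some* element has both properties, so ordering and
    // additional running queries cannot make the assertion flaky.
    assertThat(queries, hasItem(allOf(
        hasProperty("id", is("CTAS_AGG_TABLE_5")),
        hasProperty("sink", is("AGG_TABLE"))
    )));
  }
}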

@Test
@@ -0,0 +1,219 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package io.confluent.ksql.api.client.integration;

import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ImmutableMap;
import io.confluent.common.utils.IntegrationTest;
import io.confluent.ksql.integration.IntegrationTestHarness;
import io.confluent.ksql.integration.Retry;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.rest.client.RestResponse;
import io.confluent.ksql.rest.client.StreamPublisher;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.rest.integration.RestIntegrationTestUtil;
import io.confluent.ksql.rest.server.TestKsqlRestApp;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.serde.FormatFactory;
import io.confluent.ksql.serde.SerdeFeature;
import io.confluent.ksql.serde.SerdeFeatures;
import io.confluent.ksql.util.StructuredTypesDataProvider;
import io.confluent.ksql.util.TestDataProvider;
import io.vertx.core.Vertx;
import kafka.zookeeper.ZooKeeperClientException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.RuleChain;

import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static io.confluent.ksql.util.KsqlConfig.KSQL_DEFAULT_KEY_FORMAT_CONFIG;
import static io.confluent.ksql.util.KsqlConfig.KSQL_STREAMS_PREFIX;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;

/**
* This integration test is displaced from the rest-client package
* to make use of utilities that are not available there.
*/
Member Author (on lines +70 to +73): This is pretty obnoxious.

The integration test needs to start a REST server, but in the rest-client test module, we don't have a dependency on the rest-server test module. And we can't add one because rest-server test module already depends on rest-client main module.

I can think of two solutions that are potentially better than putting it here: create a new "integration-test" module that depends on all the other modules, or relocate this integration test to the rest-server test module.

@Category(IntegrationTest.class)
public class RestClientIntegrationTest {

private static final StructuredTypesDataProvider TEST_DATA_PROVIDER = new StructuredTypesDataProvider();
private static final String TEST_TOPIC = TEST_DATA_PROVIDER.topicName();
private static final String TEST_STREAM = TEST_DATA_PROVIDER.sourceName();

private static final Format KEY_FORMAT = FormatFactory.JSON;
private static final Format VALUE_FORMAT = FormatFactory.JSON;

private static final String AGG_TABLE = "AGG_TABLE";
private static final PhysicalSchema AGG_SCHEMA = PhysicalSchema.from(
LogicalSchema.builder()
.keyColumn(ColumnName.of("K"), SqlTypes.struct()
.field("F1", SqlTypes.array(SqlTypes.STRING))
.build())
.valueColumn(ColumnName.of("LONG"), SqlTypes.BIGINT)
.build(),
SerdeFeatures.of(SerdeFeature.UNWRAP_SINGLES),
SerdeFeatures.of()
);

private static final TestDataProvider EMPTY_TEST_DATA_PROVIDER = new TestDataProvider(
"EMPTY_STRUCTURED_TYPES", TEST_DATA_PROVIDER.schema(), ImmutableListMultimap.of());
private static final String EMPTY_TEST_TOPIC = EMPTY_TEST_DATA_PROVIDER.topicName();

private static final TestDataProvider EMPTY_TEST_DATA_PROVIDER_2 = new TestDataProvider(
"EMPTY_STRUCTURED_TYPES_2", TEST_DATA_PROVIDER.schema(), ImmutableListMultimap.of());
private static final String EMPTY_TEST_TOPIC_2 = EMPTY_TEST_DATA_PROVIDER_2.topicName();

private static final String PUSH_QUERY = "SELECT * FROM " + TEST_STREAM + " EMIT CHANGES;";

private static final IntegrationTestHarness TEST_HARNESS = IntegrationTestHarness.build();

// these properties are set together to allow us to verify that we can handle push queries
// in the worker pool without blocking the event loop.
private static final int EVENT_LOOP_POOL_SIZE = 1;
private static final int NUM_CONCURRENT_REQUESTS_TO_TEST = 5;
private static final int WORKER_POOL_SIZE = 10;

private static final TestKsqlRestApp REST_APP = TestKsqlRestApp
.builder(TEST_HARNESS::kafkaBootstrapServers)
.withProperty(KSQL_STREAMS_PREFIX + StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1)
.withProperty(KSQL_DEFAULT_KEY_FORMAT_CONFIG, "JSON")
.withProperty("ksql.verticle.instances", EVENT_LOOP_POOL_SIZE)
.withProperty("ksql.worker.pool.size", WORKER_POOL_SIZE)
.build();

@ClassRule
public static final RuleChain CHAIN = RuleChain
.outerRule(Retry.of(3, ZooKeeperClientException.class, 3, TimeUnit.SECONDS))
.around(TEST_HARNESS)
.around(REST_APP);

@BeforeClass
public static void setUpClass() throws Exception {
TEST_HARNESS.ensureTopics(TEST_TOPIC, EMPTY_TEST_TOPIC, EMPTY_TEST_TOPIC_2);
TEST_HARNESS.produceRows(TEST_TOPIC, TEST_DATA_PROVIDER, KEY_FORMAT, VALUE_FORMAT);
RestIntegrationTestUtil.createStream(REST_APP, TEST_DATA_PROVIDER);
RestIntegrationTestUtil.createStream(REST_APP, EMPTY_TEST_DATA_PROVIDER);
RestIntegrationTestUtil.createStream(REST_APP, EMPTY_TEST_DATA_PROVIDER_2);

makeKsqlRequest("CREATE TABLE " + AGG_TABLE + " AS "
+ "SELECT K, LATEST_BY_OFFSET(LONG) AS LONG FROM " + TEST_STREAM + " GROUP BY K;"
);

TEST_HARNESS.verifyAvailableUniqueRows(
AGG_TABLE,
4, // Only unique keys are counted
KEY_FORMAT,
VALUE_FORMAT,
AGG_SCHEMA
);

final String testDir = Paths.get(TestUtils.tempDirectory().getAbsolutePath(), "client_integ_test").toString();
final String connectFilePath = Paths.get(testDir, "connect.properties").toString();
Files.createDirectories(Paths.get(testDir));

writeConnectConfigs(connectFilePath, ImmutableMap.<String, String>builder()
.put("bootstrap.servers", TEST_HARNESS.kafkaBootstrapServers())
.put("group.id", UUID.randomUUID().toString())
.put("key.converter", StringConverter.class.getName())
.put("value.converter", JsonConverter.class.getName())
.put("offset.storage.topic", "connect-offsets")
.put("status.storage.topic", "connect-status")
.put("config.storage.topic", "connect-config")
.put("offset.storage.replication.factor", "1")
.put("status.storage.replication.factor", "1")
.put("config.storage.replication.factor", "1")
.put("value.converter.schemas.enable", "false")
.build()
);

}

private static void writeConnectConfigs(final String path, final Map<String, String> configs) throws Exception {
try (PrintWriter out = new PrintWriter(new OutputStreamWriter(
new FileOutputStream(path, true), StandardCharsets.UTF_8))) {
for (Map.Entry<String, String> entry : configs.entrySet()) {
out.println(entry.getKey() + "=" + entry.getValue());
}
}
}

@AfterClass
public static void classTearDown() {
REST_APP.getPersistentQueries().forEach(str -> makeKsqlRequest("TERMINATE " + str + ";"));
}

private Vertx vertx;

@Before
public void setUp() {
vertx = Vertx.vertx();
}

@After
public void tearDown() {
if (vertx != null) {
vertx.close();
}
REST_APP.getServiceContext().close();
}

@Test(timeout = 120000L)
public void shouldStreamMultiplePushQueriesRest() {
final List<RestResponse<StreamPublisher<StreamedRow>>> responses = new ArrayList<>(NUM_CONCURRENT_REQUESTS_TO_TEST);

// We should be able to serve multiple push queries at once, even
// though we only have one event-loop thread, because we have enough
// workers in the worker pool.
for (long i = 0; i < NUM_CONCURRENT_REQUESTS_TO_TEST; i++) {
responses.add(REST_APP.buildKsqlClient().makeQueryRequestStreamed(PUSH_QUERY, i));
}

assertThat(responses, everyItem(hasProperty("successful", is(true))));

for (final RestResponse<StreamPublisher<StreamedRow>> response : responses) {
response.getResponse().close();
}
}

private static List<KsqlEntity> makeKsqlRequest(final String sql) {
return RestIntegrationTestUtil.makeKsqlRequest(REST_APP, sql);
}
}
@@ -132,30 +132,34 @@ private static void streamEndpointResponse(final Server server,
final long startTimeNanos) {
final WorkerExecutor workerExecutor = server.getWorkerExecutor();
final VertxCompletableFuture<Void> vcf = new VertxCompletableFuture<>();
-    workerExecutor.executeBlocking(promise -> {
-      final OutputStream ros = new ResponseOutputStream(routingContext.response(),
-          streamingOutput.getWriteTimeoutMs());
-      routingContext.request().connection().closeHandler(v -> {
-        // Close the OutputStream on close of the HTTP connection
-        try {
-          ros.close();
-        } catch (IOException e) {
-          promise.fail(e);
-        }
-      });
-      try {
-        streamingOutput.write(new BufferedOutputStream(ros));
-        promise.complete();
-      } catch (Exception e) {
-        promise.fail(e);
-      } finally {
-        try {
-          ros.close();
-        } catch (IOException ignore) {
-          // Ignore - it might already be closed
-        }
-      }
-    }, vcf);
+    workerExecutor.executeBlocking(
+        promise -> {
+          final OutputStream ros = new ResponseOutputStream(routingContext.response(),
+              streamingOutput.getWriteTimeoutMs());
+          routingContext.request().connection().closeHandler(v -> {
+            // Close the OutputStream on close of the HTTP connection
+            try {
+              ros.close();
+            } catch (IOException e) {
+              promise.fail(e);
+            }
+          });
+          try {
+            streamingOutput.write(new BufferedOutputStream(ros));
+            promise.complete();
+          } catch (Exception e) {
+            promise.fail(e);
+          } finally {
+            try {
+              ros.close();
+            } catch (IOException ignore) {
+              // Ignore - it might already be closed
+            }
+          }
+        },
+        false /*if this is true, worker execution blocks the main event loop*/,
+        vcf
+    );

Member Author (commenting on the added false argument): A lot of lines changed because I fixed the indentation, but this is what actually changed here.

vcf.handle((v, throwable) -> {
reportMetrics(routingContext, metricsCallbackHolder, startTimeNanos);
return null;
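
The substantive change in the hunk above is the added middle argument. Vert.x's WorkerExecutor.executeBlocking(blockingCodeHandler, ordered, resultHandler) serializes blocking tasks submitted from the same context when ordered is true, so a single long-running streamed query would hold up every later blocking task from that event loop; passing false lets the tasks run in parallel across the worker pool. A minimal standalone sketch of the difference (not ksqlDB code; the pool name and timings are arbitrary):

import io.vertx.core.Vertx;
import io.vertx.core.WorkerExecutor;

public final class OrderedFlagSketch {

  public static void main(final String[] args) {
    final Vertx vertx = Vertx.vertx();
    final WorkerExecutor workers = vertx.createSharedWorkerExecutor("sketch-pool", 10);

    // Submit from a single event-loop context, as an HTTP request handler would.
    vertx.runOnContext(v -> {
      for (int i = 0; i < 5; i++) {
        final int task = i;
        workers.<String>executeBlocking(
            promise -> {
              System.out.println("task " + task + " running on "
                  + Thread.currentThread().getName());
              sleep(2_000);            // stand-in for streaming a long-running query
              promise.complete("task " + task);
            },
            false,                     // with true, tasks from this context run back to back
            res -> System.out.println(res.result() + " completed"));
      }
      // With ordered = false the five tasks overlap and finish after roughly 2 seconds;
      // with ordered = true they are serialized and take roughly 10 seconds in total.
    });
  }

  private static void sleep(final long millis) {
    try {
      Thread.sleep(millis);
    } catch (final InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}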