fix: long-running queries shouldn't block the main event loop #7420
Changes from 2 commits
@@ -21,10 +21,13 @@ | |
import static io.confluent.ksql.util.KsqlConfig.KSQL_DEFAULT_KEY_FORMAT_CONFIG; | ||
import static io.confluent.ksql.util.KsqlConfig.KSQL_STREAMS_PREFIX; | ||
import static org.hamcrest.MatcherAssert.assertThat; | ||
import static org.hamcrest.Matchers.allOf; | ||
import static org.hamcrest.Matchers.contains; | ||
import static org.hamcrest.Matchers.containsInAnyOrder; | ||
import static org.hamcrest.Matchers.containsString; | ||
import static org.hamcrest.Matchers.equalTo; | ||
import static org.hamcrest.Matchers.hasItem; | ||
import static org.hamcrest.Matchers.hasProperty; | ||
import static org.hamcrest.Matchers.hasSize; | ||
import static org.hamcrest.Matchers.instanceOf; | ||
import static org.hamcrest.Matchers.is; | ||
|
@@ -69,8 +72,11 @@ | |
import io.confluent.ksql.integration.IntegrationTestHarness; | ||
import io.confluent.ksql.integration.Retry; | ||
import io.confluent.ksql.name.ColumnName; | ||
import io.confluent.ksql.rest.client.RestResponse; | ||
import io.confluent.ksql.rest.client.StreamPublisher; | ||
import io.confluent.ksql.rest.entity.ConnectorList; | ||
import io.confluent.ksql.rest.entity.KsqlEntity; | ||
import io.confluent.ksql.rest.entity.StreamedRow; | ||
import io.confluent.ksql.rest.integration.RestIntegrationTestUtil; | ||
import io.confluent.ksql.rest.server.ConnectExecutable; | ||
import io.confluent.ksql.rest.server.TestKsqlRestApp; | ||
|
@@ -203,10 +209,18 @@ public class ClientIntegrationTest { | |
|
||
private static final IntegrationTestHarness TEST_HARNESS = IntegrationTestHarness.build(); | ||
|
||
// these properties are set together to allow us to verify that we can handle push queries | ||
// in the worker pool without blocking the event loop. | ||
private static final int EVENT_LOOP_POOL_SIZE = 1; | ||
private static final int NUM_CONCURRENT_REQUESTS_TO_TEST = 5; | ||
private static final int WORKER_POOL_SIZE = 10; | ||
|
||
private static final TestKsqlRestApp REST_APP = TestKsqlRestApp | ||
.builder(TEST_HARNESS::kafkaBootstrapServers) | ||
.withProperty(KSQL_STREAMS_PREFIX + StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1) | ||
.withProperty(KSQL_DEFAULT_KEY_FORMAT_CONFIG, "JSON") | ||
.withProperty("ksql.verticle.instances", EVENT_LOOP_POOL_SIZE) | ||
.withProperty("ksql.worker.pool.size", WORKER_POOL_SIZE) | ||
.build(); | ||
|
||
@ClassRule | ||
|
@@ -296,6 +310,30 @@ public void tearDown() { | |
REST_APP.getServiceContext().close(); | ||
} | ||
|
||
@Test | ||
public void shouldStreamMultiplePushQueries() throws Exception { | ||
// When | ||
final StreamedQueryResult[] streamedQueryResults = new StreamedQueryResult[NUM_CONCURRENT_REQUESTS_TO_TEST]; | ||
for (int i = 0; i < streamedQueryResults.length; i++) { | ||
streamedQueryResults[i] = client.streamQuery(PUSH_QUERY).get(); | ||
} | ||
|
||
// Then | ||
for (final StreamedQueryResult streamedQueryResult : streamedQueryResults) { | ||
assertThat(streamedQueryResult.columnNames(), is(TEST_COLUMN_NAMES)); | ||
assertThat(streamedQueryResult.columnTypes(), is(TEST_COLUMN_TYPES)); | ||
assertThat(streamedQueryResult.queryID(), is(notNullValue())); | ||
} | ||
|
||
for (final StreamedQueryResult streamedQueryResult : streamedQueryResults) { | ||
shouldReceiveStreamRows(streamedQueryResult, false); | ||
} | ||
|
||
for (final StreamedQueryResult streamedQueryResult : streamedQueryResults) { | ||
assertThat(streamedQueryResult.isComplete(), is(false)); | ||
} | ||
} | ||
|
||
@Test | ||
public void shouldStreamPushQueryAsync() throws Exception { | ||
// When | ||
|
@@ -958,29 +996,24 @@ public void shouldListTopics() throws Exception { | |
} | ||
|
||
@Test | ||
public void shouldListQueries() { | ||
public void shouldListQueries() throws ExecutionException, InterruptedException { | ||
This test was flake-prone, since it assumes that it could serialize itself after other tests (i.e., wait until all other tests' queries have stopped) and that other tests wouldn't start new queries during its execution. There doesn't seem to be a reason for this, so I changed the test just to verify the thing it wanted to verify, regardless of what other queries are running.
||
// When | ||
// Try multiple times to allow time for queries started by the other tests to finish terminating | ||
final List<QueryInfo> queries = assertThatEventually(() -> { | ||
try { | ||
return client.listQueries().get(); | ||
} catch (Exception e) { | ||
return Collections.emptyList(); | ||
} | ||
}, hasSize(1)); | ||
final List<QueryInfo> queries = client.listQueries().get(); | ||
|
||
// Then | ||
assertThat(queries.get(0).getQueryType(), is(QueryType.PERSISTENT)); | ||
assertThat(queries.get(0).getId(), is("CTAS_" + AGG_TABLE + "_5")); | ||
assertThat(queries.get(0).getSql(), is( | ||
"CREATE TABLE " + AGG_TABLE + " WITH (KAFKA_TOPIC='" + AGG_TABLE + "', PARTITIONS=1, REPLICAS=1) AS SELECT\n" | ||
+ " " + TEST_STREAM + ".K K,\n" | ||
+ " LATEST_BY_OFFSET(" + TEST_STREAM + ".LONG) LONG\n" | ||
+ "FROM " + TEST_STREAM + " " + TEST_STREAM + "\n" | ||
+ "GROUP BY " + TEST_STREAM + ".K\n" | ||
+ "EMIT CHANGES;")); | ||
assertThat(queries.get(0).getSink(), is(Optional.of(AGG_TABLE))); | ||
assertThat(queries.get(0).getSinkTopic(), is(Optional.of(AGG_TABLE))); | ||
assertThat(queries, hasItem(allOf( | ||
hasProperty("queryType", is(QueryType.PERSISTENT)), | ||
hasProperty("id", is("CTAS_" + AGG_TABLE + "_5")), | ||
hasProperty("sql", is( | ||
"CREATE TABLE " + AGG_TABLE + " WITH (KAFKA_TOPIC='" + AGG_TABLE + "', PARTITIONS=1, REPLICAS=1) AS SELECT\n" | ||
+ " " + TEST_STREAM + ".K K,\n" | ||
+ " LATEST_BY_OFFSET(" + TEST_STREAM + ".LONG) LONG\n" | ||
+ "FROM " + TEST_STREAM + " " + TEST_STREAM + "\n" | ||
+ "GROUP BY " + TEST_STREAM + ".K\n" | ||
+ "EMIT CHANGES;")), | ||
hasProperty("sink", is(Optional.of(AGG_TABLE))), | ||
hasProperty("sinkTopic", is(Optional.of(AGG_TABLE))) | ||
))); | ||
} | ||
|
||
@Test | ||
|
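The new test above runs against a server deliberately constrained to one event-loop thread ("ksql.verticle.instances" = 1) with a worker pool of 10 ("ksql.worker.pool.size"), so the five concurrent push queries only make progress if their blocking work is handed off to the worker pool. For readers who want to reproduce this outside the test harness, here is a minimal sketch (not code from this PR) using the ksqlDB Java client; the host, port, and stream name are assumptions for the example:

```java
import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;
import io.confluent.ksql.api.client.Row;
import io.confluent.ksql.api.client.StreamedQueryResult;

public class MultiplePushQueriesExample {

  public static void main(final String[] args) throws Exception {
    final ClientOptions options = ClientOptions.create()
        .setHost("localhost") // assumed server address
        .setPort(8088);       // assumed listener port
    final Client client = Client.create(options);

    // Open several push queries without waiting for any of them to complete.
    final StreamedQueryResult[] results = new StreamedQueryResult[5];
    for (int i = 0; i < results.length; i++) {
      results[i] = client.streamQuery("SELECT * FROM MY_STREAM EMIT CHANGES;").get();
    }

    // Every query should still deliver rows; poll one row from each.
    for (final StreamedQueryResult result : results) {
      final Row row = result.poll();
      System.out.println(result.queryID() + " -> " + row.values());
    }

    client.close();
  }
}
```

This mirrors what shouldStreamMultiplePushQueries asserts: with the blocking work dispatched to the worker pool, a single event-loop thread is enough to keep all of the streams progressing.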
@@ -0,0 +1,219 @@ | ||
/* | ||
* Copyright 2020 Confluent Inc. | ||
* | ||
* Licensed under the Confluent Community License (the "License"); you may not use | ||
* this file except in compliance with the License. You may obtain a copy of the | ||
* License at | ||
* | ||
* http://www.confluent.io/confluent-community-license | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OF ANY KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations under the License. | ||
*/ | ||
package io.confluent.ksql.api.client.integration; | ||
|
||
import com.google.common.collect.ImmutableListMultimap; | ||
import com.google.common.collect.ImmutableMap; | ||
import io.confluent.common.utils.IntegrationTest; | ||
import io.confluent.ksql.integration.IntegrationTestHarness; | ||
import io.confluent.ksql.integration.Retry; | ||
import io.confluent.ksql.name.ColumnName; | ||
import io.confluent.ksql.rest.client.RestResponse; | ||
import io.confluent.ksql.rest.client.StreamPublisher; | ||
import io.confluent.ksql.rest.entity.KsqlEntity; | ||
import io.confluent.ksql.rest.entity.StreamedRow; | ||
import io.confluent.ksql.rest.integration.RestIntegrationTestUtil; | ||
import io.confluent.ksql.rest.server.TestKsqlRestApp; | ||
import io.confluent.ksql.schema.ksql.LogicalSchema; | ||
import io.confluent.ksql.schema.ksql.PhysicalSchema; | ||
import io.confluent.ksql.schema.ksql.types.SqlTypes; | ||
import io.confluent.ksql.serde.Format; | ||
import io.confluent.ksql.serde.FormatFactory; | ||
import io.confluent.ksql.serde.SerdeFeature; | ||
import io.confluent.ksql.serde.SerdeFeatures; | ||
import io.confluent.ksql.util.StructuredTypesDataProvider; | ||
import io.confluent.ksql.util.TestDataProvider; | ||
import io.vertx.core.Vertx; | ||
import kafka.zookeeper.ZooKeeperClientException; | ||
import org.apache.kafka.connect.json.JsonConverter; | ||
import org.apache.kafka.connect.storage.StringConverter; | ||
import org.apache.kafka.streams.StreamsConfig; | ||
import org.apache.kafka.test.TestUtils; | ||
import org.junit.After; | ||
import org.junit.AfterClass; | ||
import org.junit.Before; | ||
import org.junit.BeforeClass; | ||
import org.junit.ClassRule; | ||
import org.junit.Test; | ||
import org.junit.experimental.categories.Category; | ||
import org.junit.rules.RuleChain; | ||
|
||
import java.io.FileOutputStream; | ||
import java.io.OutputStreamWriter; | ||
import java.io.PrintWriter; | ||
import java.nio.charset.StandardCharsets; | ||
import java.nio.file.Files; | ||
import java.nio.file.Paths; | ||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.UUID; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
import static io.confluent.ksql.util.KsqlConfig.KSQL_DEFAULT_KEY_FORMAT_CONFIG; | ||
import static io.confluent.ksql.util.KsqlConfig.KSQL_STREAMS_PREFIX; | ||
import static org.hamcrest.MatcherAssert.assertThat; | ||
import static org.hamcrest.Matchers.*; | ||
|
||
/** | ||
* This integration test is displaced from the rest-client package | ||
* to make use of utilities that are not available there. | ||
*/ | ||
Comment on lines +70 to +73: This is pretty obnoxious. The integration test needs to start a REST server, but in the rest-client test module we don't have a dependency on the rest-server test module, and we can't add one because the rest-server test module already depends on the rest-client main module. I can think of two solutions that are potentially better than putting it here: create a new "integration-test" module that depends on all the other modules, or relocate this integration test to the rest-server test module.
||
@Category(IntegrationTest.class) | ||
public class RestClientIntegrationTest { | ||
|
||
private static final StructuredTypesDataProvider TEST_DATA_PROVIDER = new StructuredTypesDataProvider(); | ||
private static final String TEST_TOPIC = TEST_DATA_PROVIDER.topicName(); | ||
private static final String TEST_STREAM = TEST_DATA_PROVIDER.sourceName(); | ||
|
||
private static final Format KEY_FORMAT = FormatFactory.JSON; | ||
private static final Format VALUE_FORMAT = FormatFactory.JSON; | ||
|
||
private static final String AGG_TABLE = "AGG_TABLE"; | ||
private static final PhysicalSchema AGG_SCHEMA = PhysicalSchema.from( | ||
LogicalSchema.builder() | ||
.keyColumn(ColumnName.of("K"), SqlTypes.struct() | ||
.field("F1", SqlTypes.array(SqlTypes.STRING)) | ||
.build()) | ||
.valueColumn(ColumnName.of("LONG"), SqlTypes.BIGINT) | ||
.build(), | ||
SerdeFeatures.of(SerdeFeature.UNWRAP_SINGLES), | ||
SerdeFeatures.of() | ||
); | ||
|
||
private static final TestDataProvider EMPTY_TEST_DATA_PROVIDER = new TestDataProvider( | ||
"EMPTY_STRUCTURED_TYPES", TEST_DATA_PROVIDER.schema(), ImmutableListMultimap.of()); | ||
private static final String EMPTY_TEST_TOPIC = EMPTY_TEST_DATA_PROVIDER.topicName(); | ||
|
||
private static final TestDataProvider EMPTY_TEST_DATA_PROVIDER_2 = new TestDataProvider( | ||
"EMPTY_STRUCTURED_TYPES_2", TEST_DATA_PROVIDER.schema(), ImmutableListMultimap.of()); | ||
private static final String EMPTY_TEST_TOPIC_2 = EMPTY_TEST_DATA_PROVIDER_2.topicName(); | ||
|
||
private static final String PUSH_QUERY = "SELECT * FROM " + TEST_STREAM + " EMIT CHANGES;"; | ||
|
||
private static final IntegrationTestHarness TEST_HARNESS = IntegrationTestHarness.build(); | ||
|
||
// these properties are set together to allow us to verify that we can handle push queries | ||
// in the worker pool without blocking the event loop. | ||
private static final int EVENT_LOOP_POOL_SIZE = 1; | ||
private static final int NUM_CONCURRENT_REQUESTS_TO_TEST = 5; | ||
private static final int WORKER_POOL_SIZE = 10; | ||
|
||
private static final TestKsqlRestApp REST_APP = TestKsqlRestApp | ||
.builder(TEST_HARNESS::kafkaBootstrapServers) | ||
.withProperty(KSQL_STREAMS_PREFIX + StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1) | ||
.withProperty(KSQL_DEFAULT_KEY_FORMAT_CONFIG, "JSON") | ||
.withProperty("ksql.verticle.instances", EVENT_LOOP_POOL_SIZE) | ||
.withProperty("ksql.worker.pool.size", WORKER_POOL_SIZE) | ||
.build(); | ||
|
||
@ClassRule | ||
public static final RuleChain CHAIN = RuleChain | ||
.outerRule(Retry.of(3, ZooKeeperClientException.class, 3, TimeUnit.SECONDS)) | ||
.around(TEST_HARNESS) | ||
.around(REST_APP); | ||
|
||
@BeforeClass | ||
public static void setUpClass() throws Exception { | ||
TEST_HARNESS.ensureTopics(TEST_TOPIC, EMPTY_TEST_TOPIC, EMPTY_TEST_TOPIC_2); | ||
TEST_HARNESS.produceRows(TEST_TOPIC, TEST_DATA_PROVIDER, KEY_FORMAT, VALUE_FORMAT); | ||
RestIntegrationTestUtil.createStream(REST_APP, TEST_DATA_PROVIDER); | ||
RestIntegrationTestUtil.createStream(REST_APP, EMPTY_TEST_DATA_PROVIDER); | ||
RestIntegrationTestUtil.createStream(REST_APP, EMPTY_TEST_DATA_PROVIDER_2); | ||
|
||
makeKsqlRequest("CREATE TABLE " + AGG_TABLE + " AS " | ||
+ "SELECT K, LATEST_BY_OFFSET(LONG) AS LONG FROM " + TEST_STREAM + " GROUP BY K;" | ||
); | ||
|
||
TEST_HARNESS.verifyAvailableUniqueRows( | ||
AGG_TABLE, | ||
4, // Only unique keys are counted | ||
KEY_FORMAT, | ||
VALUE_FORMAT, | ||
AGG_SCHEMA | ||
); | ||
|
||
final String testDir = Paths.get(TestUtils.tempDirectory().getAbsolutePath(), "client_integ_test").toString(); | ||
final String connectFilePath = Paths.get(testDir, "connect.properties").toString(); | ||
Files.createDirectories(Paths.get(testDir)); | ||
|
||
writeConnectConfigs(connectFilePath, ImmutableMap.<String, String>builder() | ||
.put("bootstrap.servers", TEST_HARNESS.kafkaBootstrapServers()) | ||
.put("group.id", UUID.randomUUID().toString()) | ||
.put("key.converter", StringConverter.class.getName()) | ||
.put("value.converter", JsonConverter.class.getName()) | ||
.put("offset.storage.topic", "connect-offsets") | ||
.put("status.storage.topic", "connect-status") | ||
.put("config.storage.topic", "connect-config") | ||
.put("offset.storage.replication.factor", "1") | ||
.put("status.storage.replication.factor", "1") | ||
.put("config.storage.replication.factor", "1") | ||
.put("value.converter.schemas.enable", "false") | ||
.build() | ||
); | ||
|
||
} | ||
|
||
private static void writeConnectConfigs(final String path, final Map<String, String> configs) throws Exception { | ||
try (PrintWriter out = new PrintWriter(new OutputStreamWriter( | ||
new FileOutputStream(path, true), StandardCharsets.UTF_8))) { | ||
for (Map.Entry<String, String> entry : configs.entrySet()) { | ||
out.println(entry.getKey() + "=" + entry.getValue()); | ||
} | ||
} | ||
} | ||
|
||
@AfterClass | ||
public static void classTearDown() { | ||
REST_APP.getPersistentQueries().forEach(str -> makeKsqlRequest("TERMINATE " + str + ";")); | ||
} | ||
|
||
private Vertx vertx; | ||
|
||
@Before | ||
public void setUp() { | ||
vertx = Vertx.vertx(); | ||
} | ||
|
||
@After | ||
public void tearDown() { | ||
if (vertx != null) { | ||
vertx.close(); | ||
} | ||
REST_APP.getServiceContext().close(); | ||
} | ||
|
||
@Test(timeout = 120000L) | ||
public void shouldStreamMultiplePushQueriesRest() { | ||
final List<RestResponse<StreamPublisher<StreamedRow>>> responses = new ArrayList<>(NUM_CONCURRENT_REQUESTS_TO_TEST); | ||
|
||
// We should be able to serve multiple push queries at once, even | ||
// though we only have one event-loop thread, because we have enough | ||
// workers in the worker pool. | ||
for(long i = 0; i < NUM_CONCURRENT_REQUESTS_TO_TEST; i++) { | ||
responses.add(REST_APP.buildKsqlClient().makeQueryRequestStreamed(PUSH_QUERY,i)); | ||
} | ||
|
||
assertThat(responses, everyItem(hasProperty("successful", is(true)))); | ||
|
||
for (final RestResponse<StreamPublisher<StreamedRow>> response : responses) { | ||
response.getResponse().close(); | ||
} | ||
} | ||
|
||
private static List<KsqlEntity> makeKsqlRequest(final String sql) { | ||
return RestIntegrationTestUtil.makeKsqlRequest(REST_APP, sql); | ||
} | ||
} |
|
@@ -132,30 +132,34 @@ private static void streamEndpointResponse(final Server server, | |
final long startTimeNanos) { | ||
final WorkerExecutor workerExecutor = server.getWorkerExecutor(); | ||
final VertxCompletableFuture<Void> vcf = new VertxCompletableFuture<>(); | ||
workerExecutor.executeBlocking(promise -> { | ||
final OutputStream ros = new ResponseOutputStream(routingContext.response(), | ||
streamingOutput.getWriteTimeoutMs()); | ||
routingContext.request().connection().closeHandler(v -> { | ||
// Close the OutputStream on close of the HTTP connection | ||
workerExecutor.executeBlocking( | ||
promise -> { | ||
final OutputStream ros = new ResponseOutputStream(routingContext.response(), | ||
streamingOutput.getWriteTimeoutMs()); | ||
routingContext.request().connection().closeHandler(v -> { | ||
// Close the OutputStream on close of the HTTP connection | ||
try { | ||
ros.close(); | ||
} catch (IOException e) { | ||
promise.fail(e); | ||
} | ||
}); | ||
try { | ||
ros.close(); | ||
} catch (IOException e) { | ||
streamingOutput.write(new BufferedOutputStream(ros)); | ||
promise.complete(); | ||
} catch (Exception e) { | ||
promise.fail(e); | ||
} finally { | ||
try { | ||
ros.close(); | ||
} catch (IOException ignore) { | ||
// Ignore - it might already be closed | ||
} | ||
} | ||
}); | ||
try { | ||
streamingOutput.write(new BufferedOutputStream(ros)); | ||
promise.complete(); | ||
} catch (Exception e) { | ||
promise.fail(e); | ||
} finally { | ||
try { | ||
ros.close(); | ||
} catch (IOException ignore) { | ||
// Ignore - it might already be closed | ||
} | ||
} | ||
}, vcf); | ||
}, | ||
false /*if this is true, worker execution blocks the main event loop*/, | ||
A lot of lines changed because I fixed the indentation, but this is what actually changed here.
||
vcf | ||
); | ||
vcf.handle((v, throwable) -> { | ||
reportMetrics(routingContext, metricsCallbackHolder, startTimeNanos); | ||
return null; | ||
|
This works with and without my patch, but it doesn't hurt to have a test for it.
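For context on the flag this diff adds: Vert.x's WorkerExecutor.executeBlocking has an "ordered" parameter, and the two-argument overload used before this change defaults it to true, which serializes blocking tasks submitted from the same context. The sketch below is not ksqlDB code, just an illustration of the Vert.x behavior as I understand it, under the assumption that all calls come from one context (as they do when a single verticle handles the requests): with ordered=true the five simulated long-running responses run one after another, while with ordered=false they run in parallel up to the worker pool size.

```java
import io.vertx.core.Vertx;
import io.vertx.core.WorkerExecutor;

public class OrderedFlagDemo {

  public static void main(final String[] args) throws Exception {
    final Vertx vertx = Vertx.vertx();
    // Small shared pool, analogous in spirit to ksql.worker.pool.size.
    final WorkerExecutor workers = vertx.createSharedWorkerExecutor("demo-pool", 10);
    final boolean ordered = false; // flip to true to watch the tasks serialize

    // Submit everything from a single context, like one verticle serving requests.
    vertx.runOnContext(v -> {
      for (int i = 0; i < 5; i++) {
        final int task = i;
        workers.<Void>executeBlocking(
            promise -> {
              System.out.println("task " + task + " started on " + Thread.currentThread().getName());
              try {
                Thread.sleep(2000); // stand-in for a long-running streamed query
              } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
              }
              promise.complete();
            },
            ordered,
            result -> System.out.println("task " + task + " done")
        );
      }
    });

    Thread.sleep(12_000); // crude wait so the demo finishes before shutdown
    vertx.close();
  }
}
```

With ordered=false the whole batch completes in roughly the time of one task; with ordered=true it takes about five times as long, which matches the symptom the new integration tests guard against.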