Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add support for IN clause to pull queries #6409

Merged
merged 18 commits into from
Oct 22, 2020
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,12 @@ public class KsqlConfig extends AbstractConfig {
public static final String KSQL_QUERY_PULL_MAX_QPS_DOC = "The maximum qps allowed for pull "
+ "queries. Once the limit is hit, queries will fail immediately";

public static final String KSQL_QUERY_PULL_THREAD_POOL_SIZE_CONFIG
= "ksql.query.pull.thread.pool.size";
public static final Integer KSQL_QUERY_PULL_THREAD_POOL_SIZE_DEFAULT = 100;
public static final String KSQL_QUERY_PULL_THREAD_POOL_SIZE_DOC =
"Size of thread pool used for sending/executing pull queries";

public static final String KSQL_STRING_CASE_CONFIG_TOGGLE = "ksql.cast.strings.preserve.nulls";
public static final String KSQL_STRING_CASE_CONFIG_TOGGLE_DOC =
"When casting a SQLType to string, if false, use String.valueof(), else if true use"
Expand Down Expand Up @@ -746,6 +752,13 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
Importance.LOW,
KSQL_QUERY_PULL_MAX_QPS_DOC
)
.define(
KSQL_QUERY_PULL_THREAD_POOL_SIZE_CONFIG,
Type.INT,
KSQL_QUERY_PULL_THREAD_POOL_SIZE_DEFAULT,
Importance.LOW,
KSQL_QUERY_PULL_THREAD_POOL_SIZE_DOC
)
.define(
KSQL_ERROR_CLASSIFIER_REGEX_PREFIX,
Type.STRING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ public class KsqlRequestConfig extends AbstractConfig {
private static final String KSQL_DEBUG_REQUEST_DOC =
"Indicates whether a KsqlRequest should contain debugging information.";

public static final String KSQL_REQUEST_QUERY_PULL_PARTITIONS =
"request.ksql.query.pull.partition";
public static final String KSQL_REQUEST_QUERY_PULL_PARTITIONS_DEFAULT = "";
private static final String KSQL_REQUEST_QUERY_PULL_PARTITIONS_DOC =
"Indicates which partitions to limit pull queries to.";

private static ConfigDef buildConfigDef() {
final ConfigDef configDef = new ConfigDef()
.define(
Expand All @@ -64,6 +70,12 @@ private static ConfigDef buildConfigDef() {
KSQL_DEBUG_REQUEST_DEFAULT,
ConfigDef.Importance.LOW,
KSQL_DEBUG_REQUEST_DOC
).define(
KSQL_REQUEST_QUERY_PULL_PARTITIONS,
Type.LIST,
KSQL_REQUEST_QUERY_PULL_PARTITIONS_DEFAULT,
ConfigDef.Importance.LOW,
KSQL_REQUEST_QUERY_PULL_PARTITIONS_DOC
);
return configDef;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public class KsMaterializationFunctionalTest {
private static final Format VALUE_FORMAT = JSON;
private static final UserDataProvider USER_DATA_PROVIDER = new UserDataProvider();
private static final PageViewDataProvider PAGE_VIEW_DATA_PROVIDER = new PageViewDataProvider();
private static final int PARTITION = 0;

private static final Duration WINDOW_SIZE = Duration.ofSeconds(5);
private static final Duration WINDOW_SEGMENT_DURATION = Duration.ofSeconds(60);
Expand Down Expand Up @@ -253,14 +254,14 @@ public void shouldQueryMaterializedTableForAggregatedTable() {
rows.forEach((rowKey, value) -> {
final Struct key = asKeyStruct(rowKey, query.getPhysicalSchema());

final Optional<Row> row = withRetry(() -> table.get(key));
final Optional<Row> row = withRetry(() -> table.get(key, PARTITION));
assertThat(row.map(Row::schema), is(Optional.of(schema)));
assertThat(row.map(Row::key), is(Optional.of(key)));
assertThat(row.map(Row::value), is(Optional.of(value)));
});

final Struct key = asKeyStruct("Won't find me", query.getPhysicalSchema());
assertThat("unknown key", withRetry(() -> table.get(key)), is(Optional.empty()));
assertThat("unknown key", withRetry(() -> table.get(key, PARTITION)), is(Optional.empty()));
}

@Test
Expand All @@ -287,14 +288,14 @@ public void shouldQueryMaterializedTableForAggregatedStream() {
rows.forEach((rowKey, value) -> {
final Struct key = asKeyStruct(rowKey, query.getPhysicalSchema());

final Optional<Row> row = withRetry(() -> table.get(key));
final Optional<Row> row = withRetry(() -> table.get(key, PARTITION));
assertThat(row.map(Row::schema), is(Optional.of(schema)));
assertThat(row.map(Row::key), is(Optional.of(key)));
assertThat(row.map(Row::value), is(Optional.of(value)));
});

final Struct key = asKeyStruct("Won't find me", query.getPhysicalSchema());
assertThat("unknown key", withRetry(() -> table.get(key)), is(Optional.empty()));
assertThat("unknown key", withRetry(() -> table.get(key, PARTITION)), is(Optional.empty()));
}

@Test
Expand Down Expand Up @@ -325,7 +326,7 @@ public void shouldQueryMaterializedTableForTumblingWindowed() {
final Struct key = asKeyStruct(k.key(), query.getPhysicalSchema());

final List<WindowedRow> resultAtWindowStart =
withRetry(() -> table.get(key, Range.singleton(w.start()), Range.all()));
withRetry(() -> table.get(key, PARTITION, Range.singleton(w.start()), Range.all()));

assertThat("at exact window start", resultAtWindowStart, hasSize(1));
assertThat(resultAtWindowStart.get(0).schema(), is(schema));
Expand All @@ -334,16 +335,18 @@ public void shouldQueryMaterializedTableForTumblingWindowed() {
assertThat(resultAtWindowStart.get(0).value(), is(v));

final List<WindowedRow> resultAtWindowEnd =
withRetry(() -> table.get(key, Range.all(), Range.singleton(w.end())));
withRetry(() -> table.get(key, PARTITION, Range.all(), Range.singleton(w.end())));
assertThat("at exact window end", resultAtWindowEnd, hasSize(1));

final List<WindowedRow> resultFromRange = withRetry(() -> withRetry(() -> table
.get(key, Range.closed(w.start().minusMillis(1), w.start().plusMillis(1)), Range.all())));
.get(key, PARTITION, Range.closed(w.start().minusMillis(1), w.start().plusMillis(1)),
Range.all())));

assertThat("range including window start", resultFromRange, is(resultAtWindowStart));

final List<WindowedRow> resultPast = withRetry(() -> table
.get(key, Range.closed(w.start().plusMillis(1), w.start().plusMillis(1)), Range.all()));
.get(key, PARTITION, Range.closed(w.start().plusMillis(1), w.start().plusMillis(1)),
Range.all()));
assertThat("past start", resultPast, is(empty())
);
});
Expand Down Expand Up @@ -378,7 +381,7 @@ public void shouldQueryMaterializedTableForHoppingWindowed() {
final Struct key = asKeyStruct(k.key(), query.getPhysicalSchema());

final List<WindowedRow> resultAtWindowStart =
withRetry(() -> table.get(key, Range.singleton(w.start()), Range.all()));
withRetry(() -> table.get(key, PARTITION, Range.singleton(w.start()), Range.all()));

assertThat("at exact window start", resultAtWindowStart, hasSize(1));
assertThat(resultAtWindowStart.get(0).schema(), is(schema));
Expand All @@ -387,16 +390,18 @@ public void shouldQueryMaterializedTableForHoppingWindowed() {
assertThat(resultAtWindowStart.get(0).value(), is(v));

final List<WindowedRow> resultAtWindowEnd =
withRetry(() -> table.get(key, Range.all(), Range.singleton(w.end())));
withRetry(() -> table.get(key, PARTITION, Range.all(), Range.singleton(w.end())));
assertThat("at exact window end", resultAtWindowEnd, hasSize(1));

final List<WindowedRow> resultFromRange = withRetry(() -> table
.get(key, Range.closed(w.start().minusMillis(1), w.start().plusMillis(1)), Range.all()));
.get(key, PARTITION, Range.closed(w.start().minusMillis(1), w.start().plusMillis(1)),
Range.all()));

assertThat("range including window start", resultFromRange, is(resultAtWindowStart));

final List<WindowedRow> resultPast = withRetry(() -> table
.get(key, Range.closed(w.start().plusMillis(1), w.start().plusMillis(1)), Range.all()));
.get(key, PARTITION, Range.closed(w.start().plusMillis(1), w.start().plusMillis(1)),
Range.all()));

assertThat("past start", resultPast, is(empty()));
});
Expand Down Expand Up @@ -430,7 +435,7 @@ public void shouldQueryMaterializedTableForSessionWindowed() {
final Struct key = asKeyStruct(k.key(), query.getPhysicalSchema());

final List<WindowedRow> resultAtWindowStart =
withRetry(() -> table.get(key, Range.singleton(w.start()), Range.all()));
withRetry(() -> table.get(key, PARTITION, Range.singleton(w.start()), Range.all()));

assertThat("at exact window start", resultAtWindowStart, hasSize(1));
assertThat(resultAtWindowStart.get(0).schema(), is(schema));
Expand All @@ -439,15 +444,17 @@ public void shouldQueryMaterializedTableForSessionWindowed() {
assertThat(resultAtWindowStart.get(0).value(), is(v));

final List<WindowedRow> resultAtWindowEnd =
withRetry(() -> table.get(key, Range.all(), Range.singleton(w.end())));
withRetry(() -> table.get(key, PARTITION, Range.all(), Range.singleton(w.end())));
assertThat("at exact window end", resultAtWindowEnd, hasSize(1));

final List<WindowedRow> resultFromRange = withRetry(() -> table
.get(key, Range.closed(w.start().minusMillis(1), w.start().plusMillis(1)), Range.all()));
.get(key, PARTITION, Range.closed(w.start().minusMillis(1), w.start().plusMillis(1)),
Range.all()));
assertThat("range including window start", resultFromRange, is(resultAtWindowStart));

final List<WindowedRow> resultPast = withRetry(() -> table
.get(key, Range.closed(w.start().plusMillis(1), w.start().plusMillis(1)), Range.all()));
.get(key, PARTITION, Range.closed(w.start().plusMillis(1), w.start().plusMillis(1)),
Range.all()));
assertThat("past start", resultPast, is(empty()));
});
}
Expand Down Expand Up @@ -577,7 +584,7 @@ public void shouldQueryMaterializedTableWithKeyFieldsInProjection() {
rows.forEach((rowKey, value) -> {
final Struct key = asKeyStruct(rowKey, query.getPhysicalSchema());

final Optional<Row> row = withRetry(() -> table.get(key));
final Optional<Row> row = withRetry(() -> table.get(key, PARTITION));
assertThat(row.map(Row::schema), is(Optional.of(schema)));
assertThat(row.map(Row::key), is(Optional.of(key)));
assertThat(row.map(Row::value), is(Optional.of(value)));
Expand Down Expand Up @@ -611,7 +618,7 @@ public void shouldQueryMaterializedTableWithMultipleAggregationColumns() {
rows.forEach((rowKey, value) -> {
final Struct key = asKeyStruct(rowKey, query.getPhysicalSchema());

final Optional<Row> row = withRetry(() -> table.get(key));
final Optional<Row> row = withRetry(() -> table.get(key, PARTITION));
assertThat(row.map(Row::schema), is(Optional.of(schema)));
assertThat(row.map(Row::key), is(Optional.of(key)));
assertThat(row.map(Row::value), is(Optional.of(value)));
Expand Down Expand Up @@ -649,7 +656,7 @@ public void shouldHandleHavingClause() {
// Rows passing the HAVING clause:
final Struct key = asKeyStruct(rowKey, query.getPhysicalSchema());

final Optional<Row> row = withRetry(() -> table.get(key));
final Optional<Row> row = withRetry(() -> table.get(key, PARTITION));
assertThat(row.map(Row::schema), is(Optional.of(schema)));
assertThat(row.map(Row::key), is(Optional.of(key)));
assertThat(row.map(Row::value), is(Optional.of(value)));
Expand All @@ -659,7 +666,7 @@ public void shouldHandleHavingClause() {
.filter(e -> !rows.containsKey(e.getKey().getString("USERID")))
.forEach(e -> {
// Rows filtered by the HAVING clause:
final Optional<Row> row = withRetry(() -> table.get(e.getKey()));
final Optional<Row> row = withRetry(() -> table.get(e.getKey(), PARTITION));
assertThat(row, is(Optional.empty()));
});
}
Expand All @@ -673,7 +680,7 @@ private static void verifyRetainedWindows(
rows.forEach(record -> {
final Struct key = asKeyStruct(record.key().key(), query.getPhysicalSchema());
final List<WindowedRow> resultAtWindowStart =
withRetry(() -> table.get(key, Range.all(), Range.all()));
withRetry(() -> table.get(key, PARTITION, Range.all(), Range.all()));

assertThat("Should have fewer windows retained",
resultAtWindowStart,
Expand Down
Loading