Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: support CASE statements returning NULL #5703

Merged
merged 2 commits into from
Jun 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.confluent.ksql.util.KsqlException;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;

public final class SearchedCaseFunction {
Expand All @@ -32,11 +33,17 @@ public static <T> T searchedCaseFunction(
if (whenClauses.isEmpty()) {
throw new KsqlException("When clause cannot be empty.");
}
return whenClauses.stream()

final Optional<Optional<T>> found = whenClauses.stream()
.filter(clause -> clause.operand.get())
.map(clause -> clause.result.get())
.findFirst()
.orElseGet(defaultValue);
.map(clause -> Optional.ofNullable(clause.result.get()))
.findFirst();

if (found.isPresent()) {
return found.get().orElse(null);
}

return defaultValue.get();
}

public static <T> LazyWhenClause<T> whenClause(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@

package io.confluent.ksql.execution.codegen.helpers;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -200,5 +202,23 @@ public void shouldReturnDefaultIfNoMatch() {
// Then:
assertThat(result, equalTo(10));
}


@Test
public void shouldHandleNullReturnValues() {
big-andy-coates marked this conversation as resolved.
Show resolved Hide resolved
// Given:
final List<SearchedCaseFunction.LazyWhenClause<Integer>> lazyWhenClauses = ImmutableList.of(
SearchedCaseFunction.whenClause(() -> true, () -> null)
);

// When:
final Integer result = SearchedCaseFunction.searchedCaseFunction(
lazyWhenClauses,
() -> {
throw new AssertionError("Should not be called");
}
);

// Then:
assertThat(result, is(nullValue()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
{
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM RUSSELL_3K_TRADES (TICKERID STRING KEY, QUANTITY INTEGER, PRICE INTEGER, BUY BOOLEAN) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='json');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "RUSSELL_3K_TRADES",
"schema" : "`TICKERID` STRING KEY, `QUANTITY` INTEGER, `PRICE` INTEGER, `BUY` BOOLEAN",
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
}
}
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE TABLE RUSSELL_3K_HOURLY AS SELECT\n RUSSELL_3K_TRADES.TICKERID TICKERID,\n COUNT(*) TRADECOUNT,\n SUM(RUSSELL_3K_TRADES.QUANTITY) TRADEVOLUMN,\n MIN((CASE WHEN RUSSELL_3K_TRADES.BUY THEN RUSSELL_3K_TRADES.PRICE ELSE null END)) MINBUYPRICE,\n MAX((CASE WHEN RUSSELL_3K_TRADES.BUY THEN RUSSELL_3K_TRADES.PRICE ELSE null END)) MAXBUYPRICE,\n MIN((CASE WHEN RUSSELL_3K_TRADES.BUY THEN null ELSE RUSSELL_3K_TRADES.PRICE END)) MINSELLPRICE,\n MAX((CASE WHEN RUSSELL_3K_TRADES.BUY THEN null ELSE RUSSELL_3K_TRADES.PRICE END)) MAXSELLPRICE\nFROM RUSSELL_3K_TRADES RUSSELL_3K_TRADES\nWINDOW TUMBLING ( SIZE 1 HOURS ) \nGROUP BY RUSSELL_3K_TRADES.TICKERID\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createTableV1",
"sourceName" : "RUSSELL_3K_HOURLY",
"schema" : "`TICKERID` STRING KEY, `TRADECOUNT` BIGINT, `TRADEVOLUMN` INTEGER, `MINBUYPRICE` INTEGER, `MAXBUYPRICE` INTEGER, `MINSELLPRICE` INTEGER, `MAXSELLPRICE` INTEGER",
"topicName" : "RUSSELL_3K_HOURLY",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"windowInfo" : {
"type" : "TUMBLING",
"size" : 3600.000000000
}
},
"queryPlan" : {
"sources" : [ "RUSSELL_3K_TRADES" ],
"sink" : "RUSSELL_3K_HOURLY",
"physicalPlan" : {
"@type" : "tableSinkV1",
"properties" : {
"queryContext" : "RUSSELL_3K_HOURLY"
},
"source" : {
"@type" : "tableSelectV1",
"properties" : {
"queryContext" : "Aggregate/Project"
},
"source" : {
"@type" : "streamWindowedAggregateV1",
"properties" : {
"queryContext" : "Aggregate/Aggregate"
},
"source" : {
"@type" : "streamGroupByKeyV1",
"properties" : {
"queryContext" : "Aggregate/GroupBy"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Aggregate/Prepare"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"sourceSchema" : "`TICKERID` STRING KEY, `QUANTITY` INTEGER, `PRICE` INTEGER, `BUY` BOOLEAN"
},
"keyColumnNames" : [ "TICKERID" ],
"selectExpressions" : [ "TICKERID AS TICKERID", "ROWTIME AS ROWTIME", "QUANTITY AS QUANTITY", "BUY AS BUY", "PRICE AS PRICE", "(CASE WHEN BUY THEN PRICE ELSE null END) AS KSQL_INTERNAL_COL_5", "(CASE WHEN BUY THEN null ELSE PRICE END) AS KSQL_INTERNAL_COL_6" ]
},
"internalFormats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
}
},
"internalFormats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"nonAggregateColumns" : [ "TICKERID", "ROWTIME", "QUANTITY", "BUY", "PRICE" ],
"aggregationFunctions" : [ "COUNT(ROWTIME)", "SUM(QUANTITY)", "MIN(KSQL_INTERNAL_COL_5)", "MAX(KSQL_INTERNAL_COL_5)", "MIN(KSQL_INTERNAL_COL_6)", "MAX(KSQL_INTERNAL_COL_6)" ],
"windowExpression" : " TUMBLING ( SIZE 1 HOURS ) "
},
"keyColumnNames" : [ "TICKERID" ],
"selectExpressions" : [ "KSQL_AGG_VARIABLE_0 AS TRADECOUNT", "KSQL_AGG_VARIABLE_1 AS TRADEVOLUMN", "KSQL_AGG_VARIABLE_2 AS MINBUYPRICE", "KSQL_AGG_VARIABLE_3 AS MAXBUYPRICE", "KSQL_AGG_VARIABLE_4 AS MINSELLPRICE", "KSQL_AGG_VARIABLE_5 AS MAXSELLPRICE" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"topicName" : "RUSSELL_3K_HOURLY"
},
"queryId" : "CTAS_RUSSELL_3K_HOURLY_0"
}
} ],
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"ksql.transient.prefix" : "transient_",
"ksql.persistence.wrap.single.values" : "true",
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.schema.registry.url" : "",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.query.pull.max.qps" : "2147483647",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.metric.reporters" : "",
"ksql.query.pull.metrics.enabled" : "false",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.metrics.extension" : null,
"ksql.streams.topology.optimization" : "all",
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.streams.num.stream.threads" : "4",
"ksql.timestamp.throw.on.invalid" : "false",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.metrics.tags.custom" : "",
"ksql.pull.queries.enable" : "true",
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.udf.collect.metrics" : "false",
"ksql.persistent.prefix" : "query_",
"ksql.query.persistent.active.limit" : "2147483647",
"ksql.error.classifier.regex" : ""
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
{
"version" : "6.1.0",
"timestamp" : 1593191100221,
"path" : "query-validation-tests/case-expression.json",
"schemas" : {
"CTAS_RUSSELL_3K_HOURLY_0.KsqlTopic.Source" : "STRUCT<QUANTITY INT, PRICE INT, BUY BOOLEAN> NOT NULL",
"CTAS_RUSSELL_3K_HOURLY_0.Aggregate.GroupBy" : "STRUCT<TICKERID VARCHAR, ROWTIME BIGINT, QUANTITY INT, BUY BOOLEAN, PRICE INT, KSQL_INTERNAL_COL_5 INT, KSQL_INTERNAL_COL_6 INT> NOT NULL",
"CTAS_RUSSELL_3K_HOURLY_0.Aggregate.Aggregate.Materialize" : "STRUCT<TICKERID VARCHAR, ROWTIME BIGINT, QUANTITY INT, BUY BOOLEAN, PRICE INT, KSQL_AGG_VARIABLE_0 BIGINT, KSQL_AGG_VARIABLE_1 INT, KSQL_AGG_VARIABLE_2 INT, KSQL_AGG_VARIABLE_3 INT, KSQL_AGG_VARIABLE_4 INT, KSQL_AGG_VARIABLE_5 INT> NOT NULL",
"CTAS_RUSSELL_3K_HOURLY_0.RUSSELL_3K_HOURLY" : "STRUCT<TRADECOUNT BIGINT, TRADEVOLUMN INT, MINBUYPRICE INT, MAXBUYPRICE INT, MINSELLPRICE INT, MAXSELLPRICE INT> NOT NULL"
},
"testCase" : {
"name" : "as param to UDAF",
"inputs" : [ {
"topic" : "test_topic",
"key" : "AEIS",
"value" : {
"userId" : "bob67",
"quantity" : 76,
"price" : 120125102,
"buy" : true
}
}, {
"topic" : "test_topic",
"key" : "AEIS",
"value" : {
"userId" : "bob67",
"quantity" : 10,
"price" : 100125102,
"buy" : false
}
} ],
"outputs" : [ {
"topic" : "RUSSELL_3K_HOURLY",
"key" : "AEIS",
"value" : {
"TRADECOUNT" : 1,
"TRADEVOLUMN" : 76,
"MINBUYPRICE" : 120125102,
"MAXBUYPRICE" : 120125102,
"MINSELLPRICE" : null,
"MAXSELLPRICE" : null
},
"window" : {
"start" : 0,
"end" : 3600000,
"type" : "TIME"
}
}, {
"topic" : "RUSSELL_3K_HOURLY",
"key" : "AEIS",
"value" : {
"TRADECOUNT" : 2,
"TRADEVOLUMN" : 86,
"MINBUYPRICE" : 120125102,
"MAXBUYPRICE" : 120125102,
"MINSELLPRICE" : 100125102,
"MAXSELLPRICE" : 100125102
},
"window" : {
"start" : 0,
"end" : 3600000,
"type" : "TIME"
}
} ],
"topics" : [ {
"name" : "RUSSELL_3K_HOURLY",
"replicas" : 1,
"numPartitions" : 4
}, {
"name" : "test_topic",
"replicas" : 1,
"numPartitions" : 4
} ],
"statements" : [ "CREATE STREAM russell_3k_trades (tickerId VARCHAR KEY, quantity INT, price INT, buy BOOLEAN) WITH (kafka_topic='test_topic',value_format='json');", "CREATE TABLE russell_3k_hourly AS SELECT tickerId, count(*) as tradeCount, sum(quantity) as tradeVolumn, min(CASE WHEN buy THEN price ELSE null END) as minBuyPrice, max(CASE WHEN buy THEN price ELSE null END) as maxBuyPrice, min(CASE WHEN buy THEN null ELSE price END) as minSellPrice, max(CASE WHEN buy THEN null ELSE price END) as maxSellPrice FROM russell_3k_trades WINDOW TUMBLING (SIZE 1 HOUR) GROUP BY tickerId;" ],
"post" : {
"topics" : {
"topics" : [ {
"name" : "RUSSELL_3K_HOURLY",
"keyFormat" : {
"formatInfo" : {
"format" : "KAFKA"
},
"windowInfo" : {
"type" : "TUMBLING",
"size" : 3600.000000000
}
},
"valueFormat" : {
"format" : "JSON"
},
"partitions" : 4
}, {
"name" : "_confluent-ksql-some.ksql.service.idquery_CTAS_RUSSELL_3K_HOURLY_0-Aggregate-Aggregate-Materialize-changelog",
"keyFormat" : {
"formatInfo" : {
"format" : "KAFKA"
},
"windowInfo" : {
"type" : "TUMBLING",
"size" : 3600.000000000
}
},
"valueFormat" : {
"format" : "JSON"
}
}, {
"name" : "test_topic",
"keyFormat" : {
"formatInfo" : {
"format" : "KAFKA"
}
},
"valueFormat" : {
"format" : "JSON"
},
"partitions" : 4
} ]
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic])
--> KSTREAM-TRANSFORMVALUES-0000000001
Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: [])
--> Aggregate-Prepare
<-- KSTREAM-SOURCE-0000000000
Processor: Aggregate-Prepare (stores: [])
--> KSTREAM-AGGREGATE-0000000003
<-- KSTREAM-TRANSFORMVALUES-0000000001
Processor: KSTREAM-AGGREGATE-0000000003 (stores: [Aggregate-Aggregate-Materialize])
--> Aggregate-Aggregate-ToOutputSchema
<-- Aggregate-Prepare
Processor: Aggregate-Aggregate-ToOutputSchema (stores: [])
--> Aggregate-Aggregate-WindowSelect
<-- KSTREAM-AGGREGATE-0000000003
Processor: Aggregate-Aggregate-WindowSelect (stores: [])
--> Aggregate-Project
<-- Aggregate-Aggregate-ToOutputSchema
Processor: Aggregate-Project (stores: [])
--> KTABLE-TOSTREAM-0000000007
<-- Aggregate-Aggregate-WindowSelect
Processor: KTABLE-TOSTREAM-0000000007 (stores: [])
--> KSTREAM-SINK-0000000008
<-- Aggregate-Project
Sink: KSTREAM-SINK-0000000008 (topic: RUSSELL_3K_HOURLY)
<-- KTABLE-TOSTREAM-0000000007

Loading