Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(static): support ROWKEY in the projection of static queries #3439

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ public Void visitColumnReference(
final ColumnReferenceExp node,
final ExpressionTypeContext expressionTypeContext
) {
final Column schemaColumn = schema.findValueColumn(node.getReference().toString())
final Column schemaColumn = schema.findColumn(node.getReference().name())
.orElseThrow(() ->
new KsqlException(String.format("Invalid Expression %s.", node.toString())));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package io.confluent.ksql.execution.util;

import static io.confluent.ksql.execution.testutil.TestExpressions.ADDRESS;
import static io.confluent.ksql.execution.testutil.TestExpressions.COL0;
import static io.confluent.ksql.execution.testutil.TestExpressions.COL1;
import static io.confluent.ksql.execution.testutil.TestExpressions.COL2;
import static io.confluent.ksql.execution.testutil.TestExpressions.COL3;
Expand Down Expand Up @@ -54,6 +53,7 @@
import io.confluent.ksql.execution.expression.tree.TimestampLiteral;
import io.confluent.ksql.execution.expression.tree.WhenClause;
import io.confluent.ksql.execution.function.udf.structfieldextractor.FetchFieldFromStruct;
import io.confluent.ksql.execution.testutil.TestExpressions;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.function.KsqlFunction;
import io.confluent.ksql.function.UdfFactory;
Expand All @@ -79,6 +79,10 @@

@SuppressWarnings("OptionalGetWithoutIsPresent")
public class ExpressionTypeManagerTest {

private static final SourceName TEST1 = SourceName.of("TEST1");
private static final ColumnName COL0 = ColumnName.of("COL0");

@Mock
private FunctionRegistry functionRegistry;
@Mock
Expand Down Expand Up @@ -116,7 +120,8 @@ private void givenUdfWithNameAndReturnType(

@Test
public void shouldResolveTypeForAddBigIntDouble() {
final Expression expression = new ArithmeticBinaryExpression(Operator.ADD, COL0, COL3);
final Expression expression = new ArithmeticBinaryExpression(Operator.ADD, TestExpressions.COL0,
COL3);

final SqlType type = expressionTypeManager.getExpressionSqlType(expression);

Expand All @@ -134,7 +139,8 @@ public void shouldResolveTypeForAddDoubleIntegerLiteral() {

@Test
public void shouldResolveTypeForAddBigintIntegerLiteral() {
final Expression expression = new ArithmeticBinaryExpression(Operator.ADD, COL0, literal(10));
final Expression expression = new ArithmeticBinaryExpression(Operator.ADD, TestExpressions.COL0,
literal(10));

final SqlType type = expressionTypeManager.getExpressionSqlType(expression);

Expand All @@ -144,7 +150,7 @@ public void shouldResolveTypeForAddBigintIntegerLiteral() {
@Test
public void shouldResolveTypeForMultiplyBigintIntegerLiteral() {
final Expression expression =
new ArithmeticBinaryExpression(Operator.MULTIPLY, COL0, literal(10));
new ArithmeticBinaryExpression(Operator.MULTIPLY, TestExpressions.COL0, literal(10));

final SqlType type = expressionTypeManager.getExpressionSqlType(expression);

Expand All @@ -153,7 +159,8 @@ public void shouldResolveTypeForMultiplyBigintIntegerLiteral() {

@Test
public void testComparisonExpr() {
final Expression expression = new ComparisonExpression(Type.GREATER_THAN, COL0, COL3);
final Expression expression = new ComparisonExpression(Type.GREATER_THAN, TestExpressions.COL0,
COL3);

final SqlType exprType = expressionTypeManager.getExpressionSqlType(expression);

Expand All @@ -163,7 +170,8 @@ public void testComparisonExpr() {
@Test
public void shouldFailIfComparisonOperandsAreIncompatible() {
// Given:
final ComparisonExpression expr = new ComparisonExpression(Type.GREATER_THAN, COL0, COL1);
final ComparisonExpression expr = new ComparisonExpression(Type.GREATER_THAN,
TestExpressions.COL0, COL1);
expectedException.expect(KsqlException.class);
expectedException.expectMessage("Operator GREATER_THAN cannot be used to compare BIGINT and STRING");

Expand Down Expand Up @@ -283,7 +291,7 @@ public void shouldThrowOnStructFieldDereference() {
// Given:
final Expression expression = new DereferenceExpression(
Optional.empty(),
new ColumnReferenceExp(ColumnRef.of(SourceName.of("TEST1"), ColumnName.of("COL6"))),
new ColumnReferenceExp(ColumnRef.of(TEST1, ColumnName.of("COL6"))),
"STREET"
);

Expand Down Expand Up @@ -322,10 +330,11 @@ public void shouldEvaluateTypeForStructDereferenceInArray() {
// Given:
final SqlStruct inner = SqlTypes.struct().field("IN0", SqlTypes.INTEGER).build();
final LogicalSchema schema = LogicalSchema.builder()
.valueColumn(ColumnName.of("TEST1.COL0"), SqlTypes.array(inner))
.valueColumn(TEST1, COL0, SqlTypes.array(inner))
.build();
expressionTypeManager = new ExpressionTypeManager(schema, functionRegistry);
final Expression arrayRef = new SubscriptExpression(COL0, new IntegerLiteral(1));
final Expression arrayRef = new SubscriptExpression(TestExpressions.COL0,
new IntegerLiteral(1));
final Expression expression = new FunctionCall(
FunctionName.of(FetchFieldFromStruct.FUNCTION_NAME),
ImmutableList.of(arrayRef, new StringLiteral("IN0"))
Expand All @@ -343,12 +352,12 @@ public void shouldEvaluateTypeForArrayReferenceInStruct() {
.field("IN0", SqlTypes.array(SqlTypes.INTEGER))
.build();
final LogicalSchema schema = LogicalSchema.builder()
.valueColumn(ColumnName.of("TEST1.COL0"), inner)
.valueColumn(TEST1, COL0, inner)
.build();
expressionTypeManager = new ExpressionTypeManager(schema, functionRegistry);
final Expression structRef = new FunctionCall(
FunctionName.of(FetchFieldFromStruct.FUNCTION_NAME),
ImmutableList.of(COL0, new StringLiteral("IN0"))
ImmutableList.of(TestExpressions.COL0, new StringLiteral("IN0"))
);
final Expression expression = new SubscriptExpression(structRef, new IntegerLiteral(1));

Expand Down Expand Up @@ -387,7 +396,7 @@ public void shouldGetCorrectSchemaForSearchedCaseWhenStruct() {
final Expression expression = new SearchedCaseExpression(
ImmutableList.of(
new WhenClause(
new ComparisonExpression(Type.EQUAL, COL0, new IntegerLiteral(10)),
new ComparisonExpression(Type.EQUAL, TestExpressions.COL0, new IntegerLiteral(10)),
ADDRESS)
),
Optional.empty()
Expand All @@ -407,7 +416,8 @@ public void shouldFailIfWhenIsNotBoolean() {
final Expression expression = new SearchedCaseExpression(
ImmutableList.of(
new WhenClause(
new ArithmeticBinaryExpression(Operator.ADD, COL0, new IntegerLiteral(10)),
new ArithmeticBinaryExpression(Operator.ADD, TestExpressions.COL0,
new IntegerLiteral(10)),
new StringLiteral("foo"))
),
Optional.empty()
Expand All @@ -425,10 +435,10 @@ public void shouldFailOnInconsistentWhenResultType() {
final Expression expression = new SearchedCaseExpression(
ImmutableList.of(
new WhenClause(
new ComparisonExpression(Type.EQUAL, COL0, new IntegerLiteral(100)),
new ComparisonExpression(Type.EQUAL, TestExpressions.COL0, new IntegerLiteral(100)),
new StringLiteral("one-hundred")),
new WhenClause(
new ComparisonExpression(Type.EQUAL, COL0, new IntegerLiteral(10)),
new ComparisonExpression(Type.EQUAL, TestExpressions.COL0, new IntegerLiteral(10)),
new IntegerLiteral(10))
),
Optional.empty()
Expand All @@ -447,7 +457,7 @@ public void shouldFailIfDefaultHasDifferentTypeToWhen() {
final Expression expression = new SearchedCaseExpression(
ImmutableList.of(
new WhenClause(
new ComparisonExpression(Type.EQUAL, COL0, new IntegerLiteral(10)),
new ComparisonExpression(Type.EQUAL, TestExpressions.COL0, new IntegerLiteral(10)),
new StringLiteral("good"))
),
Optional.of(new BooleanLiteral("true"))
Expand Down Expand Up @@ -481,7 +491,7 @@ public void shouldThrowOnTimestampLiteral() {
public void shouldThrowOnIn() {
// Given:
final Expression expression = new InPredicate(
COL0,
TestExpressions.COL0,
new InListExpression(ImmutableList.of(new IntegerLiteral(1), new IntegerLiteral(2)))
);

Expand All @@ -495,7 +505,7 @@ public void shouldThrowOnIn() {
@Test
public void shouldThrowOnSimpleCase() {
final Expression expression = new SimpleCaseExpression(
COL0,
TestExpressions.COL0,
ImmutableList.of(new WhenClause(new IntegerLiteral(10), new StringLiteral("ten"))),
Optional.empty()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,45 +26,6 @@
{"@type": "rows", "rows": []}
]
},
{
"name": "non-windowed with projection",
"statements": [
"CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE TABLE AGGREGATE AS SELECT ROWKEY AS ID, COUNT(1) AS COUNT FROM INPUT GROUP BY ROWKEY;",
"SELECT COUNT, CONCAT(ID, 'x') AS ID, COUNT * 2 FROM AGGREGATE WHERE ROWKEY='10';"
],
"inputs": [
{"topic": "test_topic", "key": "11", "value": {}},
{"topic": "test_topic", "key": "10", "value": {}}
],
"responses": [
{"@type": "currentStatus"},
{"@type": "currentStatus"},
{"@type": "rows", "rows": [
["10", 1, "10x", 2]
]}
]
},
{
"name": "windowed with projection",
"statements": [
"CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE TABLE AGGREGATE AS SELECT ROWKEY AS ID, COUNT(1) AS COUNT FROM INPUT WINDOW TUMBLING(SIZE 1 SECOND) GROUP BY ROWKEY;",
"SELECT COUNT, CONCAT(ID, 'x') AS ID, COUNT * 2 FROM AGGREGATE WHERE ROWKEY='10' AND WindowStart=12000;"
],
"inputs": [
{"topic": "test_topic", "timestamp": 12345, "key": "11", "value": {}},
{"topic": "test_topic", "timestamp": 11345, "key": "10", "value": {}},
{"topic": "test_topic", "timestamp": 12345, "key": "10", "value": {}}
],
"responses": [
{"@type": "currentStatus"},
{"@type": "currentStatus"},
{"@type": "rows", "rows": [
["10", 12000, 1, "10x", 2]
]}
]
},
{
"name": "tumbling windowed single key lookup with exact window start",
"statements": [
Expand Down Expand Up @@ -235,6 +196,92 @@
}
]
},
{
"name": "non-windowed with projection",
"statements": [
"CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE TABLE AGGREGATE AS SELECT ROWKEY AS ID, COUNT(1) AS COUNT FROM INPUT GROUP BY ROWKEY;",
"SELECT COUNT, CONCAT(ID, 'x') AS ID, COUNT * 2 FROM AGGREGATE WHERE ROWKEY='10';"
],
"inputs": [
{"topic": "test_topic", "key": "11", "value": {}},
{"topic": "test_topic", "key": "10", "value": {}}
],
"responses": [
{"@type": "currentStatus"},
{"@type": "currentStatus"},
{
"@type": "rows",
"schema": "`COUNT` BIGINT, `ID` STRING, `KSQL_COL_2` BIGINT",
"rows": [[1, "10x", 2]]
}
]
},
{
"name": "windowed with projection",
"statements": [
"CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE TABLE AGGREGATE AS SELECT ROWKEY AS ID, COUNT(1) AS COUNT FROM INPUT WINDOW TUMBLING(SIZE 1 SECOND) GROUP BY ROWKEY;",
"SELECT COUNT, CONCAT(ID, 'x') AS ID, COUNT * 2 FROM AGGREGATE WHERE ROWKEY='10' AND WindowStart=12000;"
],
"inputs": [
{"topic": "test_topic", "timestamp": 12345, "key": "11", "value": {}},
{"topic": "test_topic", "timestamp": 11345, "key": "10", "value": {}},
{"topic": "test_topic", "timestamp": 12345, "key": "10", "value": {}}
],
"responses": [
{"@type": "currentStatus"},
{"@type": "currentStatus"},
{
"@type": "rows",
"schema": "`COUNT` BIGINT, `ID` STRING, `KSQL_COL_2` BIGINT",
"rows": [[1, "10x", 2]]
}
]
},
{
"name": "non-windowed projection WITH ROWKEY",
big-andy-coates marked this conversation as resolved.
Show resolved Hide resolved
"statements": [
"CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE TABLE AGGREGATE AS SELECT COUNT(1) AS COUNT FROM INPUT GROUP BY ROWKEY;",
"SELECT ROWKEY, COUNT FROM AGGREGATE WHERE ROWKEY='10';"
],
"inputs": [
{"topic": "test_topic", "key": "11", "value": {}},
{"topic": "test_topic", "key": "10", "value": {}}
],
"responses": [
{"@type": "currentStatus"},
{"@type": "currentStatus"},
{
"@type": "rows",
"schema": "`ROWKEY` STRING KEY, `COUNT` BIGINT",
"rows": [["10", 1]]
}
]
},
{
"name": "windowed with projection with ROWKEY",
"statements": [
"CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE TABLE AGGREGATE AS SELECT COUNT(1) AS COUNT FROM INPUT WINDOW TUMBLING(SIZE 1 SECOND) GROUP BY ROWKEY;",
"SELECT COUNT, ROWKEY FROM AGGREGATE WHERE ROWKEY='10' AND WindowStart=12000;"
],
"inputs": [
{"topic": "test_topic", "timestamp": 12345, "key": "11", "value": {}},
{"topic": "test_topic", "timestamp": 11345, "key": "10", "value": {}},
{"topic": "test_topic", "timestamp": 12345, "key": "10", "value": {}}
],
"responses": [
{"@type": "currentStatus"},
{"@type": "currentStatus"},
{
"@type": "rows",
"schema": "`COUNT` BIGINT, `ROWKEY` STRING KEY",
"rows": [[1, "10"]]
}
]
},
{
"name": "non-windowed projection WITH ROWTIME",
"statements": [
Expand All @@ -261,6 +308,49 @@
"status": 400
}
},
{
"name": "non-windowed projection with ROWMEY and more columns in aggregate",
"statements": [
"CREATE STREAM INPUT (VAL INT) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE TABLE AGGREGATE AS SELECT COUNT(1) AS COUNT, SUM(VAL) AS SUM, MIN(VAL) AS MIN FROM INPUT GROUP BY ROWKEY;",
"SELECT ROWKEY, COUNT FROM AGGREGATE WHERE ROWKEY='10';"
],
"inputs": [
{"topic": "test_topic", "key": "11", "value": {"val": 1}},
{"topic": "test_topic", "key": "10", "value": {"val": 2}}
],
"responses": [
{"@type": "currentStatus"},
{"@type": "currentStatus"},
{
"@type": "rows",
"schema": "`ROWKEY` STRING KEY, `COUNT` BIGINT",
"rows": [["10", 1]]
}
]
},
{
"name": "non-windowed projection with ROWMEY and more columns in lookup",
"statements": [
"CREATE STREAM INPUT (IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE TABLE AGGREGATE AS SELECT COUNT(1) AS COUNT FROM INPUT GROUP BY ROWKEY;",
"SELECT COUNT, ROWKEY, COUNT AS COUNT2 FROM AGGREGATE WHERE ROWKEY='10';"
],
"inputs": [
{"topic": "test_topic", "timestamp": 12345, "key": "11", "value": {}},
{"topic": "test_topic", "timestamp": 11345, "key": "10", "value": {}},
{"topic": "test_topic", "timestamp": 12345, "key": "10", "value": {}}
],
"responses": [
{"@type": "currentStatus"},
{"@type": "currentStatus"},
{
"@type": "rows",
"schema": "`COUNT` BIGINT, `ROWKEY` STRING KEY, `COUNT2` BIGINT",
"rows": [[2,"10",2]]
}
]
},
{
"name": "text datetime window bounds",
"statements": [
Expand Down
Loading