Skip to content

Commit

Permalink
fix: logicalSchema toString() to include key fields (MINOR) (#3123)
Browse files Browse the repository at this point in the history
  • Loading branch information
big-andy-coates authored Jul 24, 2019
1 parent af0e5a6 commit 0984529
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -319,16 +319,12 @@ public String toString() {
return toString(FormatOptions.none());
}

@SuppressWarnings("ConstantConditions")
public String toString(final FormatOptions formatOptions) {
// Meta fields deliberately excluded.

// Key fields excluded for now:
// final String keys = keyFields.stream()
// .map(f -> f.toString(formatOptions) + " " + KEY_KEYWORD)
// .collect(Collectors.joining(", "));

final String keys = "";
final String keys = keyFields.stream()
.map(f -> f.toString(formatOptions) + " " + KEY_KEYWORD)
.collect(Collectors.joining(", "));

final String values = valueFields.stream()
.map(f -> f.toString(formatOptions))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -537,40 +537,29 @@ public void shouldExposeAliasedAllFields() {
@Test
public void shouldConvertSchemaToString() {
// Given:
final LogicalSchema schema = LogicalSchema.of(
SchemaBuilder.struct()
.field("f0", SchemaBuilder.OPTIONAL_BOOLEAN_SCHEMA)
.field("f1", SchemaBuilder.OPTIONAL_INT32_SCHEMA)
.field("f2", SchemaBuilder.OPTIONAL_INT64_SCHEMA)
.field("f4", SchemaBuilder.OPTIONAL_FLOAT64_SCHEMA)
.field("f5", SchemaBuilder.OPTIONAL_STRING_SCHEMA)
.field("f6", SchemaBuilder
.struct()
.field("a", Schema.OPTIONAL_INT64_SCHEMA)
.optional()
.build())
.field("f7", SchemaBuilder
.array(
SchemaBuilder.OPTIONAL_STRING_SCHEMA
)
.optional()
.build())
.field("f8", SchemaBuilder
.map(
SchemaBuilder.OPTIONAL_STRING_SCHEMA,
SchemaBuilder.OPTIONAL_STRING_SCHEMA
)
.optional()
.build())
.build()
);
final LogicalSchema schema = LogicalSchema.builder()
.valueField("f0", SqlTypes.BOOLEAN)
.valueField("f1", SqlTypes.INTEGER)
.valueField("f2", SqlTypes.BIGINT)
.valueField("f4", SqlTypes.DOUBLE)
.valueField("f5", SqlTypes.STRING)
.valueField("f6", SqlTypes.struct()
.field("a", SqlTypes.BIGINT)
.build())
.valueField("f7", SqlTypes.array(SqlTypes.STRING))
.valueField("f8", SqlTypes.map(SqlTypes.STRING))
.keyField("k0", SqlTypes.BIGINT)
.keyField("k1", SqlTypes.DOUBLE)
.build();

// When:
final String s = schema.toString();

// Then:
assertThat(s, is(
"["
+ "`k0` BIGINT KEY, "
+ "`k1` DOUBLE KEY, "
+ "`f0` BOOLEAN, "
+ "`f1` INTEGER, "
+ "`f2` BIGINT, "
Expand Down Expand Up @@ -606,6 +595,7 @@ public void shouldConvertSchemaToStringWithReservedWords() {
// Then:
assertThat(s, is(
"["
+ "ROWKEY STRING KEY, "
+ "`f0` BOOLEAN, "
+ "f1 STRUCT<`f0` BIGINT, f1 BIGINT>"
+ "]"));
Expand All @@ -626,6 +616,7 @@ public void shouldConvertAliasedSchemaToString() {
// Then:
assertThat(s, is(
"["
+ "`t.ROWKEY` STRING KEY, "
+ "`t.f0` BOOLEAN"
+ "]"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,6 @@ public class InsertValuesExecutorTest {
@Mock
private Serializer<String> keySerializer;
@Mock
private Deserializer<String> keyDeserializer;
@Mock
private Serde<Object> valueSerDe;
@Mock
private Serializer<Object> valueSerializer;
Expand All @@ -134,7 +132,6 @@ public void setup() {
when(valueSerde.createSerde(any(), any(), any())).thenReturn(valueSerDe);

when(keySerDe.serializer()).thenReturn(keySerializer);
when(keySerDe.deserializer()).thenReturn(keyDeserializer);
when(valueSerDe.serializer()).thenReturn(valueSerializer);
when(valueSerDe.deserializer()).thenReturn(valueDeserializer);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,21 +325,21 @@ public void shouldCreateExecutionPlan() {
final String planText = metadata.getExecutionPlan();
final String[] lines = planText.split("\n");
assertThat(lines[0], startsWith(
" > [ SINK ] | Schema: [COL0 BIGINT, KSQL_COL_1 DOUBLE, KSQL_COL_2 BIGINT] |"));
" > [ SINK ] | Schema: [ROWKEY STRING KEY, COL0 BIGINT, KSQL_COL_1 DOUBLE, KSQL_COL_2 BIGINT] |"));
assertThat(lines[1], startsWith(
"\t\t > [ AGGREGATE ] | Schema: [KSQL_INTERNAL_COL_0 BIGINT, "
"\t\t > [ AGGREGATE ] | Schema: [ROWKEY STRING KEY, KSQL_INTERNAL_COL_0 BIGINT, "
+ "KSQL_INTERNAL_COL_1 DOUBLE, KSQL_AGG_VARIABLE_0 DOUBLE, "
+ "KSQL_AGG_VARIABLE_1 BIGINT] |"));
assertThat(lines[2], startsWith(
"\t\t\t\t > [ PROJECT ] | Schema: [KSQL_INTERNAL_COL_0 BIGINT, "
"\t\t\t\t > [ PROJECT ] | Schema: [ROWKEY STRING KEY, KSQL_INTERNAL_COL_0 BIGINT, "
+ "KSQL_INTERNAL_COL_1 DOUBLE] |"));
assertThat(lines[3], startsWith(
"\t\t\t\t\t\t > [ FILTER ] | Schema: [TEST1.ROWTIME BIGINT, TEST1.ROWKEY STRING, "
"\t\t\t\t\t\t > [ FILTER ] | Schema: [TEST1.ROWKEY STRING KEY, TEST1.ROWTIME BIGINT, TEST1.ROWKEY STRING, "
+ "TEST1.COL0 BIGINT, TEST1.COL1 STRING, TEST1.COL2 STRING, "
+ "TEST1.COL3 DOUBLE, TEST1.COL4 ARRAY<DOUBLE>, "
+ "TEST1.COL5 MAP<STRING, DOUBLE>] |"));
assertThat(lines[4], startsWith(
"\t\t\t\t\t\t\t\t > [ SOURCE ] | Schema: [TEST1.ROWTIME BIGINT, TEST1.ROWKEY STRING, "
"\t\t\t\t\t\t\t\t > [ SOURCE ] | Schema: [TEST1.ROWKEY STRING KEY, TEST1.ROWTIME BIGINT, TEST1.ROWKEY STRING, "
+ "TEST1.COL0 BIGINT, TEST1.COL1 STRING, TEST1.COL2 STRING, "
+ "TEST1.COL3 DOUBLE, TEST1.COL4 ARRAY<DOUBLE>, "
+ "TEST1.COL5 MAP<STRING, DOUBLE>] |"));
Expand All @@ -360,11 +360,11 @@ public void shouldCreateExecutionPlanForInsert() {
final String[] lines = planText.split("\n");
Assert.assertTrue(lines.length == 3);
Assert.assertEquals(lines[0],
" > [ SINK ] | Schema: [COL0 BIGINT, COL1 STRING, COL2 DOUBLE] | Logger: InsertQuery_1.S1");
" > [ SINK ] | Schema: [ROWKEY STRING KEY, COL0 BIGINT, COL1 STRING, COL2 DOUBLE] | Logger: InsertQuery_1.S1");
Assert.assertEquals(lines[1],
"\t\t > [ PROJECT ] | Schema: [COL0 BIGINT, COL1 STRING, COL2 DOUBLE] | Logger: InsertQuery_1.Project");
"\t\t > [ PROJECT ] | Schema: [ROWKEY STRING KEY, COL0 BIGINT, COL1 STRING, COL2 DOUBLE] | Logger: InsertQuery_1.Project");
Assert.assertEquals(lines[2],
"\t\t\t\t > [ SOURCE ] | Schema: [TEST1.ROWTIME BIGINT, TEST1.ROWKEY STRING, TEST1.COL0 BIGINT, TEST1.COL1 STRING, TEST1.COL2 DOUBLE] | Logger: InsertQuery_1.KsqlTopic");
"\t\t\t\t > [ SOURCE ] | Schema: [TEST1.ROWKEY STRING KEY, TEST1.ROWTIME BIGINT, TEST1.ROWKEY STRING, TEST1.COL0 BIGINT, TEST1.COL1 STRING, TEST1.COL2 DOUBLE] | Logger: InsertQuery_1.KsqlTopic");
assertThat(queryMetadataList.get(1), instanceOf(PersistentQueryMetadata.class));
final PersistentQueryMetadata persistentQuery = (PersistentQueryMetadata)
queryMetadataList.get(1);
Expand Down Expand Up @@ -398,9 +398,9 @@ public void shouldFailInsertIfTheResultSchemaDoesNotMatch() {
expectedException.expect(KsqlStatementException.class);
expectedException.expect(rawMessage(is(
"Incompatible schema between results and sink. Result schema is "
+ "[`COL0` BIGINT, `COL1` STRING, `COL2` DOUBLE], "
+ "[`ROWKEY` STRING KEY, `COL0` BIGINT, `COL1` STRING, `COL2` DOUBLE], "
+ "but the sink schema is "
+ "[`COL0` BIGINT, `COL1` STRING].")));
+ "[`ROWKEY` STRING KEY, `COL0` BIGINT, `COL1` STRING].")));

// When:
execute(CREATE_STREAM_TEST1 + csasQuery + insertIntoQuery);
Expand Down Expand Up @@ -444,13 +444,13 @@ public void shouldCreatePlanForInsertIntoStreamFromStream() {
final String[] lines = planText.split("\n");
assertThat(lines.length, equalTo(3));
assertThat(lines[0], containsString(
"> [ SINK ] | Schema: [ROWTIME BIGINT, ROWKEY STRING, COL0 INTEGER]"));
"> [ SINK ] | Schema: [ROWKEY STRING KEY, ROWTIME BIGINT, ROWKEY STRING, COL0 INTEGER]"));

assertThat(lines[1], containsString(
"> [ PROJECT ] | Schema: [ROWTIME BIGINT, ROWKEY STRING, COL0 INTEGER]"));
"> [ PROJECT ] | Schema: [ROWKEY STRING KEY, ROWTIME BIGINT, ROWKEY STRING, COL0 INTEGER]"));

assertThat(lines[2], containsString(
"> [ SOURCE ] | Schema: [TEST1.ROWTIME BIGINT, TEST1.ROWKEY STRING, TEST1.COL0 INTEGER]"));
"> [ SOURCE ] | Schema: [TEST1.ROWKEY STRING KEY, TEST1.ROWTIME BIGINT, TEST1.ROWKEY STRING, TEST1.COL0 INTEGER]"));
}

@Test
Expand Down Expand Up @@ -489,11 +489,11 @@ public void shouldCheckSinkAndResultKeysDoNotMatch() {
final String[] lines = planText.split("\n");
assertThat(lines.length, equalTo(4));
assertThat(lines[0],
equalTo(" > [ REKEY ] | Schema: [COL0 BIGINT, COL1 STRING, COL2 DOUBLE] "
equalTo(" > [ REKEY ] | Schema: [ROWKEY STRING KEY, COL0 BIGINT, COL1 STRING, COL2 DOUBLE] "
+ "| Logger: InsertQuery_1.S1"));
assertThat(lines[1], equalTo("\t\t > [ SINK ] | Schema: [COL0 BIGINT, COL1 STRING, COL2 "
assertThat(lines[1], equalTo("\t\t > [ SINK ] | Schema: [ROWKEY STRING KEY, COL0 BIGINT, COL1 STRING, COL2 "
+ "DOUBLE] | Logger: InsertQuery_1.S1"));
assertThat(lines[2], equalTo("\t\t\t\t > [ PROJECT ] | Schema: [COL0 BIGINT, COL1 STRING"
assertThat(lines[2], equalTo("\t\t\t\t > [ PROJECT ] | Schema: [ROWKEY STRING KEY, COL0 BIGINT, COL1 STRING"
+ ", COL2 DOUBLE] | Logger: InsertQuery_1.Project"));
}

Expand Down Expand Up @@ -760,8 +760,7 @@ public void shouldRepartitionLeftStreamIfNotCorrectKey() {
.get(0);

// Then:
assertThat(result.getExecutionPlan(),
containsString("[ REKEY ] | Schema: [TEST2.ROWTIME BIGINT"));
assertThat(result.getExecutionPlan(), containsString("[ REKEY ] | Schema: [TEST2."));
}

@Test
Expand All @@ -777,8 +776,7 @@ public void shouldRepartitionRightStreamIfNotCorrectKey() {
.get(0);

// Then:
assertThat(result.getExecutionPlan(),
containsString("[ REKEY ] | Schema: [TEST3.ROWTIME BIGINT"));
assertThat(result.getExecutionPlan(), containsString("[ REKEY ] | Schema: [TEST3."));
}

@Test
Expand Down Expand Up @@ -908,7 +906,7 @@ public void shouldRepartitionLeftStreamIfNotCorrectKey_Legacy() {

// Then:
assertThat(result.getExecutionPlan(),
containsString("[ REKEY ] | Schema: [TEST2.ROWTIME BIGINT"));
containsString("[ REKEY ] | Schema: [TEST2."));
}

@Test
Expand All @@ -926,7 +924,7 @@ public void shouldRepartitionRightStreamIfNotCorrectKey_Legacy() {

// Then:
assertThat(result.getExecutionPlan(),
containsString("[ REKEY ] | Schema: [TEST3.ROWTIME BIGINT"));
containsString("[ REKEY ] | Schema: [TEST3."));
}

@Test
Expand Down Expand Up @@ -982,9 +980,9 @@ public void shouldRepartitionBothStreamsIfJoiningOnRowKey_Legacy() {

// Then:
assertThat(result.getExecutionPlan(),
containsString("[ REKEY ] | Schema: [TEST2.ROWTIME BIGINT"));
containsString("[ REKEY ] | Schema: [TEST2."));
assertThat(result.getExecutionPlan(),
containsString("[ REKEY ] | Schema: [TEST3.ROWTIME BIGINT"));
containsString("[ REKEY ] | Schema: [TEST3."));
}

@Test
Expand All @@ -1002,9 +1000,9 @@ public void shouldRepartitionBothStreamsIfJoiningOnRowKeyWhenStreamsHaveNoKeyFie

// Then:
assertThat(result.getExecutionPlan(),
containsString("[ REKEY ] | Schema: [TEST7.ROWTIME BIGINT"));
containsString("[ REKEY ] | Schema: [TEST7."));
assertThat(result.getExecutionPlan(),
containsString("[ REKEY ] | Schema: [TEST7.ROWTIME BIGINT"));
containsString("[ REKEY ] | Schema: [TEST7."));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -766,10 +766,10 @@ public void shouldSummarizeExecutionPlanCorrectly() {
queryContext.push("source").getQueryContext());

// When/Then:
final String expected =
" > [ SOURCE ] | Schema: [key STRING, val BIGINT] | Logger: query.node.source\n\t"
+ "parent plan";
assertThat(schemaKtream.getExecutionPlan(""), equalTo(expected));
assertThat(schemaKtream.getExecutionPlan(""), equalTo(
" > [ SOURCE ] | Schema: [ROWKEY STRING KEY, key STRING, val BIGINT] | "
+ "Logger: query.node.source\n"
+ "\tparent plan"));
}

@Test
Expand All @@ -787,9 +787,9 @@ public void shouldSummarizeExecutionPlanCorrectlyForRoot() {
queryContext.push("source").getQueryContext());

// When/Then:
final String expected =
" > [ SOURCE ] | Schema: [key STRING, val BIGINT] | Logger: query.node.source\n";
assertThat(schemaKtream.getExecutionPlan(""), equalTo(expected));
assertThat(schemaKtream.getExecutionPlan(""), equalTo(
" > [ SOURCE ] | Schema: [ROWKEY STRING KEY, key STRING, val BIGINT] | "
+ "Logger: query.node.source\n"));
}

@Test
Expand All @@ -813,11 +813,11 @@ public void shouldSummarizeExecutionPlanCorrectlyWhenMultipleParents() {
queryContext.push("source").getQueryContext());

// When/Then:
final String expected =
" > [ SOURCE ] | Schema: [key STRING, val BIGINT] | Logger: query.node.source\n"
assertThat(schemaKtream.getExecutionPlan(""), equalTo(
" > [ SOURCE ] | Schema: [ROWKEY STRING KEY, key STRING, val BIGINT] | "
+ "Logger: query.node.source\n"
+ "\tparent 1 plan"
+ "\tparent 2 plan";
assertThat(schemaKtream.getExecutionPlan(""), equalTo(expected));
+ "\tparent 2 plan"));
}

private void whenCreateJoined() {
Expand Down

0 comments on commit 0984529

Please sign in to comment.