Skip to content

Commit

Permalink
Switch Serde classes to act on STRUCT not GenericRow. (#2793)
Browse files Browse the repository at this point in the history
* Switch Serde classes to act on STRUCT not GenericRow.
Also, add support for top-level primitives, maps and arrays. Fixes #1351
  • Loading branch information
big-andy-coates authored May 8, 2019
1 parent ecf7165 commit 82c0ed2
Show file tree
Hide file tree
Showing 72 changed files with 2,528 additions and 751 deletions.
7 changes: 7 additions & 0 deletions docs/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ KSQL 5.3.0 includes new features, including:
* ``CREATE STREAM`` and ``CREATE TABLE`` will now allow you to create the topic if it is missing.
To do this, specify the ``PARTITIONS`` and optionally ``REPLICAS`` in the ``WITH`` clause.

* KSQL now supports deserializing records where the value is:

#. A primitive, e.g. a ``STRING``, ``INT`` or ``DOUBLE``, in the Avro, JSON and Delimited formats.
#. An array, in both the Avro and JSON formats.
#. A map, in the Avro format.


KSQL 5.3.0 includes bug fixes, including:

* The ``ROWTIME`` of the row generated when a ``JOIN`` encounters late data was previously the ``ROWTIME`` of the late event,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.confluent.ksql.datagen.RowGenerator;
import io.confluent.ksql.datagen.SessionManager;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.serde.GenericRowSerDe;
import io.confluent.ksql.serde.avro.KsqlAvroTopicSerDe;
import io.confluent.ksql.serde.json.KsqlJsonTopicSerDe;
import io.confluent.ksql.util.KsqlConfig;
Expand Down Expand Up @@ -159,8 +160,10 @@ private static org.apache.kafka.connect.data.Schema convertFieldNamesToUppercase
}

private static Serde<GenericRow> getJsonSerdeHelper(
final org.apache.kafka.connect.data.Schema schema) {
return new KsqlJsonTopicSerDe().getGenericRowSerde(
final org.apache.kafka.connect.data.Schema schema
) {
return GenericRowSerDe.from(
new KsqlJsonTopicSerDe(),
schema,
new KsqlConfig(Collections.emptyMap()),
() -> null,
Expand All @@ -169,9 +172,11 @@ private static Serde<GenericRow> getJsonSerdeHelper(
}

private static Serde<GenericRow> getAvroSerde(
final org.apache.kafka.connect.data.Schema schema) {
final org.apache.kafka.connect.data.Schema schema
) {
final SchemaRegistryClient schemaRegistryClient = new MockSchemaRegistryClient();
return new KsqlAvroTopicSerDe("benchmarkSchema").getGenericRowSerde(
return GenericRowSerDe.from(
new KsqlAvroTopicSerDe("benchmarkSchema"),
schema,
new KsqlConfig(Collections.emptyMap()),
() -> schemaRegistryClient,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ private void setIntoTopicFormat(final Sink sink) {
format = Format.of(StringUtil.cleanQuotes(serdeProperty.toString()));
} else {
final DataSource<?> leftSource = analysis.getFromDataSource(0).left;
format = leftSource.getKsqlTopic().getKsqlTopicSerDe().getSerDe();
format = leftSource.getKsqlTopic().getKsqlTopicSerDe().getFormat();
}

analysis.setIntoFormat(format);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.planner.plan.PlanNodeId;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.serde.GenericRowSerDe;
import io.confluent.ksql.serde.KsqlTopicSerDe;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.structured.QueryContext;
Expand Down Expand Up @@ -125,7 +126,8 @@ public Serde<GenericRow> buildGenericRowSerde(

track(loggerNamePrefix, schema);

return topicSerDe.getGenericRowSerde(
return GenericRowSerDe.from(
topicSerDe,
schema,
ksqlConfig,
serviceContext.getSchemaRegistryClientFactory(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public <T extends Statement> ConfiguredStatement<T> inject(
+ source.getKafkaTopicName(), e);
}

if (source.getKsqlTopicSerde().getSerDe() == Format.AVRO) {
if (source.getKsqlTopicSerde().getFormat() == Format.AVRO) {
try {
SchemaRegistryUtil.deleteSubjectWithRetries(
schemaRegistryClient,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public Set<String> getSinkNames() {
}

public Format getResultTopicSerde() {
return resultTopic.getKsqlTopicSerDe().getSerDe();
return resultTopic.getKsqlTopicSerDe().getFormat();
}

public String getSchemasDescription() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ public void shouldHandleValueFormat() {
final Analysis analysis = queryAnalyzer.analyze("sqlExpression", query, sink);

// Then:
assertThat(analysis.getInto().get().getKsqlTopic().getKsqlTopicSerDe().getSerDe(),
assertThat(analysis.getInto().get().getKsqlTopic().getKsqlTopicSerDe().getFormat(),
is(Format.DELIMITED));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
Expand Down Expand Up @@ -81,8 +79,6 @@
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.SchemaBuilder;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.junit.After;
Expand Down Expand Up @@ -533,7 +529,7 @@ public void shouldUseSerdeSupplierToBuildQueries() {
"create table bar as select * from test2;", KSQL_CONFIG, Collections.emptyMap());

// Then:
verify(jsonKsqlSerde, atLeastOnce()).getGenericRowSerde(
verify(jsonKsqlSerde, atLeastOnce()).getStructSerde(
any(), any(), eq(schemaRegistryClientFactory), any(), any()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.schema.ksql.KsqlSchema;
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.serde.GenericRowSerDe;
import io.confluent.ksql.serde.KsqlTopicSerDe;
import io.confluent.ksql.serde.avro.KsqlAvroTopicSerDe;
import io.confluent.ksql.serde.delimited.KsqlDelimitedTopicSerDe;
Expand Down Expand Up @@ -570,10 +571,11 @@ private static KsqlTopicSerDe getSerde(
}
}

private Serializer getSerializer(
private Serializer<GenericRow> getSerializer(
final Format dataSourceSerDe,
final Schema schema) {
return getSerde(dataSourceSerDe).getGenericRowSerde(
return GenericRowSerDe.from(
getSerde(dataSourceSerDe),
schema,
new KsqlConfig(Collections.emptyMap()),
serviceContext.get().getSchemaRegistryClientFactory(),
Expand All @@ -585,7 +587,8 @@ private Serializer getSerializer(
private Deserializer<GenericRow> getDeserializer(
final Format dataSourceSerDe,
final Schema schema) {
return getSerde(dataSourceSerDe).getGenericRowSerde(
return GenericRowSerDe.from(
getSerde(dataSourceSerDe),
schema,
new KsqlConfig(Collections.emptyMap()),
serviceContext.get().getSchemaRegistryClientFactory(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.planner.plan.PlanNodeId;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.serde.GenericRowSerDe;
import io.confluent.ksql.serde.KsqlTopicSerDe;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.structured.QueryContext;
Expand All @@ -38,6 +39,7 @@
import java.util.function.Supplier;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.streams.StreamsBuilder;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -65,15 +67,15 @@ public class KsqlQueryBuilderTest {
@Mock
private KsqlTopicSerDe topicSerDe;
@Mock
private Serde<GenericRow> rowSerde;
private Serde<Struct> rowSerde;
@Mock
private Supplier<SchemaRegistryClient> srClientFactory;
private QueryContext queryContext;
private KsqlQueryBuilder ksqlQueryBuilder;

@Before
public void setUp() {
when(topicSerDe.getGenericRowSerde(any(), any(), any(), any(), any())).thenReturn(rowSerde);
when(topicSerDe.getStructSerde(any(), any(), any(), any(), any())).thenReturn(rowSerde);
when(serviceContext.getSchemaRegistryClientFactory()).thenReturn(srClientFactory);

queryContext = new QueryContext.Stacker(QUERY_ID).push("context").getQueryContext();
Expand Down Expand Up @@ -135,13 +137,21 @@ public void shouldBuildGenericRowSerde() {
);

// Then:
assertThat(result, is(rowSerde));
verify(topicSerDe).getGenericRowSerde(
verify(topicSerDe).getStructSerde(
SOME_SCHEMA,
ksqlConfig,
srClientFactory,
QueryLoggerUtil.queryLoggerName(queryContext),
processingLogContext);

assertThat(result, is(GenericRowSerDe.from(
topicSerDe,
SOME_SCHEMA,
ksqlConfig,
srClientFactory,
QueryLoggerUtil.queryLoggerName(queryContext),
processingLogContext
)));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ public void shouldCreateExecutionPlanForInsert() {
assertThat(queryMetadataList.get(1), instanceOf(PersistentQueryMetadata.class));
final PersistentQueryMetadata persistentQuery = (PersistentQueryMetadata)
queryMetadataList.get(1);
assertThat(persistentQuery.getResultTopic().getKsqlTopicSerDe().getSerDe(),
assertThat(persistentQuery.getResultTopic().getKsqlTopicSerDe().getFormat(),
equalTo(Format.DELIMITED));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import io.confluent.ksql.planner.plan.PlanNode;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.schema.ksql.KsqlSchema;
import io.confluent.ksql.serde.GenericRowSerDe;
import io.confluent.ksql.serde.KsqlTopicSerDe;
import io.confluent.ksql.serde.json.KsqlJsonTopicSerDe;
import io.confluent.ksql.streams.MaterializedFactory;
Expand Down Expand Up @@ -97,7 +98,7 @@ public void init() {
final StreamsBuilder builder = new StreamsBuilder();
kTable = builder
.table(ksqlTable.getKsqlTopic().getKafkaTopicName(), Consumed.with(Serdes.String()
, ksqlTable.getKsqlTopic().getKsqlTopicSerDe().getGenericRowSerde(
, ksqlTable.getKsqlTopic().getKsqlTopicSerDe().getStructSerde(
ksqlTable.getSchema().getSchema(),
new KsqlConfig(Collections.emptyMap()),
MockSchemaRegistryClient::new,
Expand Down Expand Up @@ -126,7 +127,8 @@ private SchemaKGroupedTable buildSchemaKGroupedTableFromQuery(
.map(c -> new DereferenceExpression(new QualifiedNameReference(QualifiedName.of("TEST1")), c))
.collect(Collectors.toList());
final KsqlTopicSerDe ksqlTopicSerDe = new KsqlJsonTopicSerDe();
final Serde<GenericRow> rowSerde = ksqlTopicSerDe.getGenericRowSerde(
final Serde<GenericRow> rowSerde = GenericRowSerDe.from(
ksqlTopicSerDe,
SchemaTestUtil.getSchemaWithNoAlias(initialSchemaKTable.getSchema().getSchema()),
null,
() -> null,
Expand All @@ -153,7 +155,8 @@ public void shouldFailWindowedTableAggregation() {
functionRegistry.getAggregate("SUM", Schema.OPTIONAL_INT64_SCHEMA)),
Collections.singletonMap(0, 0),
windowExpression,
new KsqlJsonTopicSerDe().getGenericRowSerde(
GenericRowSerDe.from(
new KsqlJsonTopicSerDe(),
ksqlTable.getSchema().getSchema(),
ksqlConfig,
() -> null,
Expand Down Expand Up @@ -183,7 +186,8 @@ public void shouldFailUnsupportedAggregateFunction() {
aggValToFunctionMap,
Collections.singletonMap(0, 0),
null,
new KsqlJsonTopicSerDe().getGenericRowSerde(
GenericRowSerDe.from(
new KsqlJsonTopicSerDe(),
ksqlTable.getSchema().getSchema(),
ksqlConfig,
() -> null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import io.confluent.ksql.planner.plan.ProjectNode;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.schema.ksql.KsqlSchema;
import io.confluent.ksql.serde.GenericRowSerDe;
import io.confluent.ksql.serde.json.KsqlJsonTopicSerDe;
import io.confluent.ksql.streams.GroupedFactory;
import io.confluent.ksql.streams.JoinedFactory;
Expand Down Expand Up @@ -200,7 +201,8 @@ public void init() {
}

private static Serde<GenericRow> getRowSerde(final KsqlTopic topic, final Schema schema) {
return topic.getKsqlTopicSerDe().getGenericRowSerde(
return GenericRowSerDe.from(
topic.getKsqlTopicSerDe(),
schema,
new KsqlConfig(Collections.emptyMap()),
MockSchemaRegistryClient::new,
Expand Down Expand Up @@ -933,7 +935,8 @@ private PlanNode givenInitialKStreamOf(final String selectQuery) {
functionRegistry,
queryContext.push("source").getQueryContext());

rowSerde = new KsqlJsonTopicSerDe().getGenericRowSerde(
rowSerde = GenericRowSerDe.from(
new KsqlJsonTopicSerDe(),
SchemaTestUtil.getSchemaWithNoAlias(initialSchemaKStream.getSchema().getSchema()),
null,
() -> null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import io.confluent.ksql.planner.plan.ProjectNode;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.schema.ksql.KsqlSchema;
import io.confluent.ksql.serde.GenericRowSerDe;
import io.confluent.ksql.serde.KsqlTopicSerDe;
import io.confluent.ksql.serde.json.KsqlJsonTopicSerDe;
import io.confluent.ksql.streams.GroupedFactory;
Expand Down Expand Up @@ -192,7 +193,8 @@ private SchemaKTable buildSchemaKTableForJoin(final KsqlTable ksqlTable, final K
}

private Serde<GenericRow> getRowSerde(final KsqlTopic topic, final Schema schema) {
return topic.getKsqlTopicSerDe().getGenericRowSerde(
return GenericRowSerDe.from(
topic.getKsqlTopicSerDe(),
schema,
new KsqlConfig(Collections.emptyMap()),
MockSchemaRegistryClient::new,
Expand Down Expand Up @@ -325,7 +327,8 @@ public void testGroupBy() {
parentContext);

final KsqlTopicSerDe ksqlTopicSerDe = new KsqlJsonTopicSerDe();
final Serde<GenericRow> rowSerde = ksqlTopicSerDe.getGenericRowSerde(
final Serde<GenericRow> rowSerde = GenericRowSerDe.from(
ksqlTopicSerDe,
SchemaTestUtil.getSchemaWithNoAlias(initialSchemaKTable.getSchema().getSchema()),
null,
() -> null,
Expand Down Expand Up @@ -400,7 +403,8 @@ public void shouldGroupKeysCorrectly() {
parentContext);

final List<Expression> groupByExpressions = Arrays.asList(TEST_2_COL_2, TEST_2_COL_1);
final Serde<GenericRow> rowSerde = new KsqlJsonTopicSerDe().getGenericRowSerde(
final Serde<GenericRow> rowSerde = GenericRowSerDe.from(
new KsqlJsonTopicSerDe(),
SchemaTestUtil.getSchemaWithNoAlias(initialSchemaKTable.getSchema().getSchema()),
null,
() -> null,
Expand Down Expand Up @@ -642,7 +646,8 @@ private List<SelectExpression> givenInitialKTableOf(final String selectQuery) {
functionRegistry,
parentContext);

rowSerde = new KsqlJsonTopicSerDe().getGenericRowSerde(
rowSerde = GenericRowSerDe.from(
new KsqlJsonTopicSerDe(),
SchemaTestUtil.getSchemaWithNoAlias(initialSchemaKTable.getSchema().getSchema()),
null,
() -> null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void setUp() {
when(persistentQuery.getResultSchema()).thenReturn(RESULT_SCHEMA);
when(persistentQuery.getResultTopic()).thenReturn(RESULT_TOPIC);
when(persistentQuery.getResultTopicSerde())
.thenReturn(RESULT_TOPIC.getKsqlTopicSerDe().getSerDe());
.thenReturn(RESULT_TOPIC.getKsqlTopicSerDe().getFormat());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.schema.ksql.KsqlSchema;
import io.confluent.ksql.serde.GenericRowSerDe.GenericRowDeserializer;
import io.confluent.ksql.serde.json.KsqlJsonDeserializer;
import io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster;
import java.time.Duration;
Expand Down Expand Up @@ -92,12 +93,15 @@ public <K> Map<K, GenericRow> readResults(
final int expectedNumMessages,
final Deserializer<K> keyDeserializer
) {
final Deserializer<GenericRow> deserializer = new GenericRowDeserializer(
new KsqlJsonDeserializer(
schema.getSchema(),
processingLogContext.getLoggerFactory().getLogger("consumer")));

return readResults(
topic,
greaterThanOrEqualTo(expectedNumMessages),
new KsqlJsonDeserializer(
schema.getSchema(),
processingLogContext.getLoggerFactory().getLogger("consumer")),
deserializer,
keyDeserializer
);
}
Expand Down
Loading

0 comments on commit 82c0ed2

Please sign in to comment.