fix: enable schema inference for timestamp/time/date (#7737)
* fix: enable schema inference for timestamp/time/date

* generate plans, address review comments

* update test
Zara Lim authored Jun 30, 2021
1 parent 95a3b21 commit 35b1cad
Showing 16 changed files with 1,025 additions and 11 deletions.
@@ -15,6 +15,9 @@
 
 package io.confluent.ksql.schema.ksql.inference;
 
+import static io.confluent.ksql.serde.connect.ConnectSchemaUtil.OPTIONAL_DATE_SCHEMA;
+import static io.confluent.ksql.serde.connect.ConnectSchemaUtil.OPTIONAL_TIMESTAMP_SCHEMA;
+import static io.confluent.ksql.serde.connect.ConnectSchemaUtil.OPTIONAL_TIME_SCHEMA;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.mockito.ArgumentMatchers.any;
@@ -200,22 +203,22 @@ public void shouldInferEnumAsString() {
   }
 
   @Test
-  public void shouldInferDateAsIntegeer() {
+  public void shouldInferDateAsDate() {
     shouldInferType(
         org.apache.avro.LogicalTypes.date().addToSchema(
             org.apache.avro.SchemaBuilder.builder().intType()
         ),
-        Schema.OPTIONAL_INT32_SCHEMA
+        OPTIONAL_DATE_SCHEMA
     );
   }
 
   @Test
-  public void shouldInferTimeMillisAsInteger() {
+  public void shouldInferTimeMillisAsTime() {
     shouldInferType(
         org.apache.avro.LogicalTypes.timeMillis().addToSchema(
             org.apache.avro.SchemaBuilder.builder().intType()
         ),
-        Schema.OPTIONAL_INT32_SCHEMA
+        OPTIONAL_TIME_SCHEMA
     );
   }
 
@@ -230,12 +233,12 @@ public void shouldInferTimeMicrosAsBigint() {
   }
 
   @Test
-  public void shouldInferTimestampMillisAsBigint() {
+  public void shouldInferTimestampMillisAsTimestamp() {
     shouldInferType(
         org.apache.avro.LogicalTypes.timestampMillis().addToSchema(
             org.apache.avro.SchemaBuilder.builder().longType()
         ),
-        Schema.OPTIONAL_INT64_SCHEMA
+        OPTIONAL_TIMESTAMP_SCHEMA
     );
 
   }
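Note on what the updated assertions mean: the Avro date, time-millis, and timestamp-millis logical types are now translated to Connect's Date, Time, and Timestamp logical schemas (which ksqlDB surfaces as the SQL DATE, TIME, and TIMESTAMP column types) instead of the bare INT32/INT64 schemas asserted before. The sketch below is not part of the commit; it is a minimal illustration using only the public Avro and Kafka Connect APIs, with illustrative class and variable names, and it assumes ksqlDB's OPTIONAL_DATE/TIME/TIMESTAMP_SCHEMA constants wrap these standard Connect logical schemas.

import org.apache.avro.LogicalTypes;
import org.apache.avro.SchemaBuilder;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.data.Timestamp;

public class LogicalTypeMappingSketch {

  public static void main(final String[] args) {
    // Avro "date": an int counting days since the Unix epoch.
    final org.apache.avro.Schema avroDate =
        LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType());
    System.out.println(avroDate); // {"type":"int","logicalType":"date"}

    // Old expectation: the raw primitive schema, which loses the logical type.
    final Schema oldInference = Schema.OPTIONAL_INT32_SCHEMA;
    System.out.println(oldInference.type()); // INT32

    // New expectation: the Connect Date logical schema -- still INT32 on the
    // wire, but named org.apache.kafka.connect.data.Date, which ksqlDB maps
    // to the SQL DATE type.
    final Schema newInference = Date.builder().optional().build();
    System.out.println(newInference.type()); // INT32
    System.out.println(newInference.name()); // org.apache.kafka.connect.data.Date

    // time-millis and timestamp-millis follow the same pattern.
    System.out.println(Time.builder().optional().build().name());      // org.apache.kafka.connect.data.Time
    System.out.println(Timestamp.builder().optional().build().name()); // org.apache.kafka.connect.data.Timestamp
  }
}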
@@ -0,0 +1,152 @@
{
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM INPUT (C1 DATE) WITH (KAFKA_TOPIC='input', KEY_FORMAT='KAFKA', VALUE_FORMAT='AVRO', VALUE_SCHEMA_ID=1);",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "INPUT",
"schema" : "`C1` DATE",
"topicName" : "input",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "AVRO"
}
},
"orReplace" : false
}
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM OUTPUT AS SELECT CAST(INPUT.C1 AS STRING) C2\nFROM INPUT INPUT\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "OUTPUT",
"schema" : "`C2` STRING",
"topicName" : "OUTPUT",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "AVRO"
}
},
"orReplace" : false
},
"queryPlan" : {
"sources" : [ "INPUT" ],
"sink" : "OUTPUT",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "OUTPUT"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "input",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "AVRO"
}
},
"sourceSchema" : "`C1` DATE"
},
"selectExpressions" : [ "CAST(C1 AS STRING) AS C2" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "AVRO"
}
},
"topicName" : "OUTPUT"
},
"queryId" : "CSAS_OUTPUT_0"
}
} ],
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"metric.reporters" : "",
"ksql.transient.prefix" : "transient_",
"ksql.query.status.running.threshold.seconds" : "300",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.persistence.default.format.key" : "KAFKA",
"ksql.query.persistent.max.bytes.buffering.total" : "-1",
"ksql.queryanonymizer.logs_enabled" : "true",
"ksql.query.error.max.queue.size" : "10",
"ksql.variable.substitution.enable" : "true",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.query.pull.max.qps" : "2147483647",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.queryanonymizer.cluster_namespace" : null,
"ksql.query.pull.metrics.enabled" : "true",
"ksql.create.or.replace.enabled" : "true",
"ksql.metrics.extension" : null,
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.cast.strings.preserve.nulls" : "true",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.pull.queries.enable" : "true",
"ksql.lambdas.enabled" : "true",
"ksql.suppress.enabled" : "false",
"ksql.query.push.scalable.enabled" : "false",
"ksql.query.push.scalable.interpreter.enabled" : "true",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.query.persistent.active.limit" : "2147483647",
"ksql.persistence.wrap.single.values" : null,
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.query.retry.backoff.initial.ms" : "15000",
"ksql.query.transient.max.bytes.buffering.total" : "-1",
"ksql.schema.registry.url" : "",
"ksql.properties.overrides.denylist" : "",
"ksql.query.pull.max.concurrent.requests" : "2147483647",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.query.pull.interpreter.enabled" : "true",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.query.pull.table.scan.enabled" : "false",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.streams.topology.optimization" : "all",
"ksql.query.retry.backoff.max.ms" : "900000",
"ksql.streams.num.stream.threads" : "4",
"ksql.timestamp.throw.on.invalid" : "false",
"ksql.metrics.tags.custom" : "",
"ksql.persistence.default.format.value" : null,
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.nested.error.set.null" : "true",
"ksql.udf.collect.metrics" : "false",
"ksql.query.pull.thread.pool.size" : "100",
"ksql.persistent.prefix" : "query_",
"ksql.metastore.backup.location" : "",
"ksql.error.classifier.regex" : "",
"ksql.suppress.buffer.size.bytes" : "-1"
}
}
@@ -0,0 +1,134 @@
{
"version" : "7.0.0",
"timestamp" : 1624942593300,
"path" : "query-validation-tests/date.json",
"schemas" : {
"CSAS_OUTPUT_0.KsqlTopic.Source" : {
"schema" : "`C1` DATE",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "AVRO"
}
},
"CSAS_OUTPUT_0.OUTPUT" : {
"schema" : "`C2` STRING",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "AVRO"
}
}
},
"testCase" : {
"name" : "date schema inference",
"inputs" : [ {
"topic" : "input",
"key" : null,
"value" : {
"c1" : 4
}
} ],
"outputs" : [ {
"topic" : "OUTPUT",
"key" : null,
"value" : {
"C2" : "1970-01-05"
}
} ],
"topics" : [ {
"name" : "input",
"valueSchema" : {
"type" : "record",
"name" : "blah",
"fields" : [ {
"name" : "c1",
"type" : {
"type" : "int",
"connect.name" : "org.apache.kafka.connect.data.Date",
"connect.version" : 1,
"logicalType" : "date"
}
} ]
},
"valueFormat" : "AVRO",
"replicas" : 1,
"numPartitions" : 1
}, {
"name" : "OUTPUT",
"replicas" : 1,
"numPartitions" : 4
} ],
"statements" : [ "CREATE STREAM INPUT WITH (kafka_topic='input', value_format='AVRO');", "CREATE STREAM OUTPUT as SELECT CAST(c1 AS STRING) AS C2 FROM input;" ],
"post" : {
"sources" : [ {
"name" : "INPUT",
"type" : "STREAM",
"schema" : "`C1` DATE",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : "AVRO",
"keyFeatures" : [ ],
"valueFeatures" : [ ]
}, {
"name" : "OUTPUT",
"type" : "STREAM",
"schema" : "`C2` STRING",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : "AVRO",
"keyFeatures" : [ ],
"valueFeatures" : [ ]
} ],
"topics" : {
"topics" : [ {
"name" : "input",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "AVRO"
},
"partitions" : 1,
"valueSchema" : {
"type" : "record",
"name" : "blah",
"fields" : [ {
"name" : "c1",
"type" : {
"type" : "int",
"connect.version" : 1,
"connect.name" : "org.apache.kafka.connect.data.Date",
"logicalType" : "date"
}
} ],
"connect.name" : "blah"
}
}, {
"name" : "OUTPUT",
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "AVRO"
},
"partitions" : 4,
"valueSchema" : {
"type" : "record",
"name" : "KsqlDataSourceSchema",
"namespace" : "io.confluent.ksql.avro_schemas",
"fields" : [ {
"name" : "C2",
"type" : [ "null", "string" ],
"default" : null
} ]
}
} ]
}
}
}
}
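The expected value above follows from how DATE columns are encoded: an integer counting days since the Unix epoch, so an input of 4 casts to the string "1970-01-05". A minimal JDK-only sketch of that arithmetic (illustrative, not code from the commit):

import java.time.LocalDate;

public class EpochDaySketch {

  public static void main(final String[] args) {
    // DATE values count days since 1970-01-01 (day 0), so day 4 is 1970-01-05.
    System.out.println(LocalDate.ofEpochDay(4)); // 1970-01-05
  }
}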
@@ -0,0 +1,13 @@
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [input])
      --> KSTREAM-TRANSFORMVALUES-0000000001
    Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: [])
      --> Project
      <-- KSTREAM-SOURCE-0000000000
    Processor: Project (stores: [])
      --> KSTREAM-SINK-0000000003
      <-- KSTREAM-TRANSFORMVALUES-0000000001
    Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT)
      <-- Project
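For orientation, the topology above has the usual shape of a ksqlDB CSAS query: a source, a value transform for the source step, a projection node, and a sink. The sketch below is a hypothetical Kafka Streams DSL reconstruction of that shape only; string values, an identity transformer, and default generated node names stand in for ksqlDB's real serdes, transformer, and explicit node naming (e.g. "Project").

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;

public class TopologyShapeSketch {

  public static void main(final String[] args) {
    final StreamsBuilder builder = new StreamsBuilder();
    final ValueTransformerSupplier<String, String> passThrough = PassThrough::new;

    builder.<byte[], String>stream("input")   // KSTREAM-SOURCE-0000000000
        .transformValues(passThrough)         // KSTREAM-TRANSFORMVALUES-0000000001
        .mapValues(v -> v)                    // the "Project" step (CAST(C1 AS STRING) AS C2 in the real plan)
        .to("OUTPUT");                        // KSTREAM-SINK-0000000003

    final Topology topology = builder.build();
    System.out.println(topology.describe());
  }

  // Stand-in for ksqlDB's source-value transformer; it only shapes the
  // topology and passes values through unchanged.
  private static final class PassThrough implements ValueTransformer<String, String> {
    @Override
    public void init(final ProcessorContext context) {
    }

    @Override
    public String transform(final String value) {
      return value;
    }

    @Override
    public void close() {
    }
  }
}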
