chore: tie up some loose ends around the new formats #4651

Merged · 3 commits · Mar 2, 2020

Changes from all commits
@@ -19,28 +19,30 @@
 import io.confluent.ksql.cli.console.table.Table;
 import io.confluent.ksql.cli.console.table.Table.Builder;
 import io.confluent.ksql.rest.entity.TopicDescription;
+import io.confluent.ksql.serde.FormatFactory;
 import java.util.ArrayList;
 import java.util.List;
 
 public class TopicDescriptionTableBuilder implements TableBuilder<TopicDescription> {
 
-  private static final List<String> NON_AVRO_HEADERS =
+  private static final List<String> NON_SCHEMA_HEADERS =
       ImmutableList.of("Table Name", "Kafka Topic", "Type");
 
-  private static final List<String> AVRO_HEADERS =
-      ImmutableList.of("Table Name", "Kafka Topic", "Type", "AvroSchema");
+  private static final List<String> SCHEMA_HEADERS =
+      ImmutableList.of("Table Name", "Kafka Topic", "Type", "Schema");
 
   @Override
   public Table buildTable(final TopicDescription topicDescription) {
-    final boolean avro = topicDescription.getFormat().equalsIgnoreCase("AVRO");
+    final String format = topicDescription.getFormat();
+    final boolean supportsSchema = FormatFactory.fromName(format).supportsSchemaInference();
 
-    final List<String> headings = avro ? AVRO_HEADERS : NON_AVRO_HEADERS;
+    final List<String> headings = supportsSchema ? SCHEMA_HEADERS : NON_SCHEMA_HEADERS;
 
     final List<String> row = new ArrayList<>(4);
     row.add(topicDescription.getName());
     row.add(topicDescription.getKafkaTopic());
-    row.add(topicDescription.getFormat());
-    if (avro) {
+    row.add(format);
+    if (supportsSchema) {
      row.add(topicDescription.getSchemaString());
    }

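Note: the pattern in this hunk recurs through the rest of the PR — hard-coded AVRO checks become capability queries against the format. A minimal, self-contained sketch of that capability check (the Format/FormatFactory shapes here are assumed from the diff, not copied from ksqlDB):

// Sketch only: a format advertises whether the Schema Registry backs it,
// so call sites no longer need to name AVRO explicitly.
interface Format {
  String name();
  boolean supportsSchemaInference();
}

final class FormatFactorySketch {
  static Format fromName(final String name) {
    // Assumption: AVRO, PROTOBUF and JSON_SR are SR-backed in this PR;
    // JSON, DELIMITED and KAFKA are not.
    final boolean srBacked = name.equalsIgnoreCase("AVRO")
        || name.equalsIgnoreCase("PROTOBUF")
        || name.equalsIgnoreCase("JSON_SR");
    final String upper = name.toUpperCase();
    return new Format() {
      @Override
      public String name() {
        return upper;
      }

      @Override
      public boolean supportsSchemaInference() {
        return srBacked;
      }
    };
  }
}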
@@ -541,7 +541,7 @@ public void testPrintTopicDescription() {
           + "} ]\n"));
     } else {
       assertThat(output, is("\n"
-          + " Table Name | Kafka Topic | Type | AvroSchema \n"
+          + " Table Name | Kafka Topic | Type | Schema \n"
           + "---------------------------------------------------\n"
           + " TestTopic | TestKafkaTopic | AVRO | schemaString \n"
           + "---------------------------------------------------\n"));
@@ -33,6 +33,7 @@ public final class CreateConfigs {
   public static final String WINDOW_TYPE_PROPERTY = "WINDOW_TYPE";
   public static final String WINDOW_SIZE_PROPERTY = "WINDOW_SIZE";
   public static final String AVRO_SCHEMA_ID = "AVRO_SCHEMA_ID";
+  public static final String SCHEMA_ID = "SCHEMA_ID";
   public static final String SOURCE_CONNECTOR = "SOURCE_CONNECTOR";
 
   private static final ConfigDef CONFIG_DEF = new ConfigDef()
@@ -63,11 +64,17 @@ public final class CreateConfigs {
           + "then the property should be used to provide the window size, "
           + "for example: '20 SECONDS'."
       ).define(
-          AVRO_SCHEMA_ID,
+          SCHEMA_ID,
           ConfigDef.Type.INT,
           null,
           Importance.LOW,
           "Undocumented feature"
+      ).define(
+          AVRO_SCHEMA_ID,
+          ConfigDef.Type.INT,
+          null,
+          Importance.LOW,
+          "Undocumented feature (deprecated - use SCHEMA_ID instead)"
       ).define(
           SOURCE_CONNECTOR,
           Type.STRING,
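Defining both keys keeps old statements working: AVRO_SCHEMA_ID still parses, it is just flagged as deprecated, while SCHEMA_ID is format-agnostic. E.g. both of these statement strings should resolve a schema id the same way (hypothetical examples modeled on the tests further down, not taken verbatim from the PR):

// Hypothetical example statements: SCHEMA_ID also works for PROTOBUF and
// JSON_SR sources, while AVRO_SCHEMA_ID is the Avro-era spelling.
final class SchemaIdExamples {
  static final String LEGACY =
      "CREATE STREAM S (C1 INT) WITH ("
          + "KAFKA_TOPIC='input', AVRO_SCHEMA_ID=1, VALUE_FORMAT='AVRO');";

  static final String CURRENT =
      "CREATE STREAM S (C1 INT) WITH ("
          + "KAFKA_TOPIC='input', SCHEMA_ID=1, VALUE_FORMAT='PROTOBUF');";
}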
@@ -43,7 +43,6 @@
 import io.confluent.ksql.schema.ksql.SqlBaseType;
 import io.confluent.ksql.schema.ksql.SqlValueCoercer;
 import io.confluent.ksql.schema.ksql.types.SqlType;
-import io.confluent.ksql.serde.FormatFactory;
 import io.confluent.ksql.serde.GenericKeySerDe;
 import io.confluent.ksql.serde.GenericRowSerDe;
 import io.confluent.ksql.serde.KeySerdeFactory;
@@ -440,7 +439,7 @@ private byte[] serializeValue(
     try {
       return valueSerde.serializer().serialize(topicName, row);
     } catch (final Exception e) {
-      if (dataSource.getKsqlTopic().getValueFormat().getFormat() == FormatFactory.AVRO) {
+      if (dataSource.getKsqlTopic().getValueFormat().getFormat().supportsSchemaInference()) {
        final Throwable rootCause = ExceptionUtils.getRootCause(e);
        if (rootCause instanceof RestClientException) {
          switch (((RestClientException) rootCause).getStatus()) {
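With this change, the friendlier Schema Registry error handling kicks in for every SR-backed value format rather than Avro alone. A self-contained sketch of the shape of that handling (the concrete status codes are an assumption — the diff cuts off before the switch body):

import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import org.apache.commons.lang3.exception.ExceptionUtils;

final class SerializeErrorSketch {
  // Inspect the root cause of a serialization failure; for SR-backed formats,
  // translate REST auth failures into something actionable for the user.
  static String describe(final Exception e, final boolean supportsSchemaInference) {
    if (supportsSchemaInference) {
      final Throwable rootCause = ExceptionUtils.getRootCause(e);
      if (rootCause instanceof RestClientException) {
        final int status = ((RestClientException) rootCause).getStatus();
        if (status == 401 || status == 403) {
          return "Not authorized to access the Schema Registry subject";
        }
      }
    }
    return "Failed to serialize value: " + e.getMessage();
  }
}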
@@ -276,7 +276,7 @@ private void unregisterQuery(final ServiceContext serviceContext, final QueryMet
 
     if (query.hasEverBeenStarted()) {
       SchemaRegistryUtil
-          .cleanUpInternalTopicAvroSchemas(applicationId, serviceContext.getSchemaRegistryClient());
+          .cleanupInternalTopicSchemas(applicationId, serviceContext.getSchemaRegistryClient());
       serviceContext.getTopicClient().deleteInternalTopics(applicationId);
     }
 
@@ -114,7 +114,7 @@ private SchemaAndId getValueSchema(
   ) {
     final String topicName = statement.getStatement().getProperties().getKafkaTopic();
 
-    final SchemaResult result = statement.getStatement().getProperties().getAvroSchemaId()
+    final SchemaResult result = statement.getStatement().getProperties().getSchemaId()
         .map(id -> schemaSupplier.getValueSchema(topicName, Optional.of(id)))
         .orElseGet(() -> schemaSupplier.getValueSchema(topicName, Optional.empty()));
 
@@ -145,7 +145,7 @@ private static CreateSource addSchemaFields(
     final CreateSource statement = preparedStatement.getStatement();
     final CreateSourceProperties properties = statement.getProperties();
 
-    if (properties.getAvroSchemaId().isPresent()) {
+    if (properties.getSchemaId().isPresent()) {
       return statement.copyWith(elements, properties);
     }
     return statement.copyWith(elements, properties.withSchemaId(schema.id));
@@ -15,6 +15,7 @@
 
 package io.confluent.ksql.schema.registry;
 
+import com.google.common.collect.ImmutableList;
 import io.confluent.kafka.schemaregistry.ParsedSchema;
 import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
@@ -144,12 +145,12 @@ public int getId(final String s, final ParsedSchema parsedSchema) {
 
   @Override
   public List<Integer> deleteSubject(final String s) {
-    throw configException;
+    return ImmutableList.of();
   }
 
   @Override
   public List<Integer> deleteSubject(final Map<String, String> map, final String s) {
-    throw configException;
+    return ImmutableList.of();
   }
 
   @Override
@@ -37,7 +37,7 @@ public final class SchemaRegistryUtil {
   private SchemaRegistryUtil() {
   }
 
-  public static void cleanUpInternalTopicAvroSchemas(
+  public static void cleanupInternalTopicSchemas(
       final String applicationId,
       final SchemaRegistryClient schemaRegistryClient
   ) {
@@ -23,7 +23,6 @@
 import io.confluent.ksql.util.LimitedProxyBuilder;
 import java.util.Collections;
 import java.util.Objects;
-import org.apache.avro.Schema;
 
 /**
  * SchemaRegistryClient used when trying out operations.
@@ -42,8 +41,6 @@ static SchemaRegistryClient createProxy(final SchemaRegistryClient delegate) {
       .swallow("register", anyParams(), 123)
       .forward("getLatestSchemaMetadata", methodParams(String.class), delegate)
       .forward("getSchemaBySubjectAndId", methodParams(String.class, int.class), delegate)
-      .forward("testCompatibility",
-          methodParams(String.class, Schema.class), delegate)
       .forward("testCompatibility",
           methodParams(String.class, ParsedSchema.class), delegate)
       .swallow("deleteSubject", methodParams(String.class), Collections.emptyList())
@@ -108,7 +108,7 @@ public <T extends Statement> ConfiguredStatement<T> inject(
     }
 
     try {
-      if (source.getKsqlTopic().getValueFormat().getFormat() == FormatFactory.AVRO) {
+      if (source.getKsqlTopic().getValueFormat().getFormat().supportsSchemaInference()) {
        SchemaRegistryUtil.deleteSubjectWithRetries(
            schemaRegistryClient,
            source.getKafkaTopicName() + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX);
@@ -286,7 +286,7 @@ public void shouldBuildNewCsStatementText() {
         + "MAPFIELD MAP<STRING, BIGINT>, "
         + "STRUCTFIELD STRUCT<S0 BIGINT>, "
         + "DECIMALFIELD DECIMAL(4, 2)) "
-        + "WITH (AVRO_SCHEMA_ID=5, KAFKA_TOPIC='some-topic', VALUE_FORMAT='avro');"
+        + "WITH (KAFKA_TOPIC='some-topic', SCHEMA_ID=5, VALUE_FORMAT='avro');"
     ));
   }
 
@@ -311,14 +311,14 @@ public void shouldBuildNewCtStatementText() {
         + "MAPFIELD MAP<STRING, BIGINT>, "
         + "STRUCTFIELD STRUCT<S0 BIGINT>, "
         + "DECIMALFIELD DECIMAL(4, 2)) "
-        + "WITH (AVRO_SCHEMA_ID=5, KAFKA_TOPIC='some-topic', VALUE_FORMAT='avro');"
+        + "WITH (KAFKA_TOPIC='some-topic', SCHEMA_ID=5, VALUE_FORMAT='avro');"
     ));
   }
 
   @Test
   public void shouldBuildNewCsStatementTextFromId() {
     // Given:
-    when(cs.getProperties()).thenReturn(supportedPropsWith("AVRO_SCHEMA_ID", "42"));
+    when(cs.getProperties()).thenReturn(supportedPropsWith("SCHEMA_ID", "42"));
 
     when(schemaSupplier.getValueSchema(KAFKA_TOPIC, Optional.of(42)))
         .thenReturn(SchemaResult.success(schemaAndId(SUPPORTED_SCHEMA, SCHEMA_ID)));
@@ -338,14 +338,14 @@ public void shouldBuildNewCsStatementTextFromId() {
         + "MAPFIELD MAP<STRING, BIGINT>, "
         + "STRUCTFIELD STRUCT<S0 BIGINT>, "
         + "DECIMALFIELD DECIMAL(4, 2)) "
-        + "WITH (AVRO_SCHEMA_ID='42', KAFKA_TOPIC='some-topic', VALUE_FORMAT='avro');"
+        + "WITH (KAFKA_TOPIC='some-topic', SCHEMA_ID='42', VALUE_FORMAT='avro');"
     ));
   }
 
   @Test
   public void shouldBuildNewCtStatementTextFromId() {
     // Given:
-    when(ct.getProperties()).thenReturn(supportedPropsWith("AVRO_SCHEMA_ID", "42"));
+    when(ct.getProperties()).thenReturn(supportedPropsWith("SCHEMA_ID", "42"));
 
     when(schemaSupplier.getValueSchema(KAFKA_TOPIC, Optional.of(42)))
         .thenReturn(SchemaResult.success(schemaAndId(SUPPORTED_SCHEMA, SCHEMA_ID)));
@@ -365,7 +365,7 @@ public void shouldBuildNewCtStatementTextFromId() {
         + "MAPFIELD MAP<STRING, BIGINT>, "
         + "STRUCTFIELD STRUCT<S0 BIGINT>, "
         + "DECIMALFIELD DECIMAL(4, 2)) "
-        + "WITH (AVRO_SCHEMA_ID='42', KAFKA_TOPIC='some-topic', VALUE_FORMAT='avro');"
+        + "WITH (KAFKA_TOPIC='some-topic', SCHEMA_ID='42', VALUE_FORMAT='avro');"
     ));
   }
 
@@ -379,23 +379,23 @@ public void shouldAddSchemaIdIfNotPresentAlready() {
     final ConfiguredStatement<CreateStream> result = injector.inject(csStatement);
 
     // Then:
-    assertThat(result.getStatement().getProperties().getAvroSchemaId(), is(Optional.of(SCHEMA_ID)));
+    assertThat(result.getStatement().getProperties().getSchemaId(), is(Optional.of(SCHEMA_ID)));
 
-    assertThat(result.getStatementText(), containsString("AVRO_SCHEMA_ID=5"));
+    assertThat(result.getStatementText(), containsString("SCHEMA_ID=5"));
   }
 
   @Test
   public void shouldNotOverwriteExistingSchemaId() {
     // Given:
-    when(cs.getProperties()).thenReturn(supportedPropsWith("AVRO_SCHEMA_ID", "42"));
+    when(cs.getProperties()).thenReturn(supportedPropsWith("SCHEMA_ID", "42"));
 
     // When:
     final ConfiguredStatement<CreateStream> result = injector.inject(csStatement);
 
     // Then:
-    assertThat(result.getStatement().getProperties().getAvroSchemaId(), is(Optional.of(42)));
+    assertThat(result.getStatement().getProperties().getSchemaId(), is(Optional.of(42)));
 
-    assertThat(result.getStatementText(), containsString("AVRO_SCHEMA_ID='42'"));
+    assertThat(result.getStatementText(), containsString("SCHEMA_ID='42'"));
   }
 
   @Test
@@ -44,7 +44,7 @@ public void shouldDeleteChangeLogTopicSchema() throws Exception {
     ));
 
     // When:
-    SchemaRegistryUtil.cleanUpInternalTopicAvroSchemas(APP_ID, schemaRegistryClient);
+    SchemaRegistryUtil.cleanupInternalTopicSchemas(APP_ID, schemaRegistryClient);
 
     // Then not exception:
     verify(schemaRegistryClient).deleteSubject(APP_ID + "SOME-changelog-value");
@@ -58,7 +58,7 @@ public void shouldDeleteRepartitionTopicSchema() throws Exception {
     ));
 
     // When:
-    SchemaRegistryUtil.cleanUpInternalTopicAvroSchemas(APP_ID, schemaRegistryClient);
+    SchemaRegistryUtil.cleanupInternalTopicSchemas(APP_ID, schemaRegistryClient);
 
     // Then not exception:
     verify(schemaRegistryClient).deleteSubject(APP_ID + "SOME-repartition-value");
@@ -72,7 +72,7 @@ public void shouldNotDeleteOtherSchemasForThisApplicationId() throws Exception {
     ));
 
     // When:
-    SchemaRegistryUtil.cleanUpInternalTopicAvroSchemas(APP_ID, schemaRegistryClient);
+    SchemaRegistryUtil.cleanupInternalTopicSchemas(APP_ID, schemaRegistryClient);
 
     // Then not exception:
     verify(schemaRegistryClient, never()).deleteSubject(any());
@@ -86,7 +86,7 @@ public void shouldNotDeleteOtherSchemas() throws Exception {
     ));
 
     // When:
-    SchemaRegistryUtil.cleanUpInternalTopicAvroSchemas(APP_ID, schemaRegistryClient);
+    SchemaRegistryUtil.cleanupInternalTopicSchemas(APP_ID, schemaRegistryClient);
 
     // Then not exception:
     verify(schemaRegistryClient, never()).deleteSubject(any());
@@ -98,7 +98,7 @@ public void shouldNotThrowIfAllSubjectsThrows() throws Exception {
     when(schemaRegistryClient.getAllSubjects()).thenThrow(new RuntimeException("Boom!"));
 
     // When:
-    SchemaRegistryUtil.cleanUpInternalTopicAvroSchemas(APP_ID, schemaRegistryClient);
+    SchemaRegistryUtil.cleanupInternalTopicSchemas(APP_ID, schemaRegistryClient);
 
     // Then not exception:
     verify(schemaRegistryClient).getAllSubjects();
@@ -114,7 +114,7 @@ public void shouldNotThrowIfDeleteSubjectThrows() throws Exception {
     when(schemaRegistryClient.deleteSubject(any())).thenThrow(new RuntimeException("Boom!"));
 
     // When:
-    SchemaRegistryUtil.cleanUpInternalTopicAvroSchemas(APP_ID, schemaRegistryClient);
+    SchemaRegistryUtil.cleanupInternalTopicSchemas(APP_ID, schemaRegistryClient);
 
     // Then not exception:
     verify(schemaRegistryClient, times(5)).deleteSubject(any());
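Taken together, these tests pin down the renamed helper's contract: delete only this application's internal "-changelog"/"-repartition" value subjects, and swallow every Schema Registry failure so cleanup can never break query shutdown. A sketch consistent with that contract (reconstructed from the tests, not from the PR's source):

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

final class CleanupSketch {
  static void cleanupInternalTopicSchemas(
      final String applicationId,
      final SchemaRegistryClient client
  ) {
    try {
      for (final String subject : client.getAllSubjects()) {
        // Only this app's internal changelog/repartition value subjects.
        final boolean internal = subject.startsWith(applicationId)
            && (subject.endsWith("-changelog-value")
                || subject.endsWith("-repartition-value"));
        if (!internal) {
          continue;
        }
        try {
          client.deleteSubject(subject);
        } catch (final Exception e) {
          // Best effort: keep deleting the remaining subjects.
        }
      }
    } catch (final Exception e) {
      // Best effort: listing subjects failed; cleanup must never throw.
    }
  }
}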
@@ -140,7 +140,7 @@ public void shouldDeleteTopic() {
   }
 
   @Test
-  public void shouldDeleteSchemaInSR() throws IOException, RestClientException {
+  public void shouldDeleteAvroSchemaInSR() throws IOException, RestClientException {
     // Given:
     when(topic.getValueFormat()).thenReturn(ValueFormat.of(FormatInfo.of(FormatFactory.AVRO.name())));
 
@@ -152,7 +152,22 @@ public void shouldDeleteSchemaInSR() throws IOException, RestClientException {
   }
 
   @Test
-  public void shouldNotDeleteSchemaInSRIfNotAvro() throws IOException, RestClientException {
+  public void shouldDeleteProtoSchemaInSR() throws IOException, RestClientException {
+    // Given:
+    when(topic.getValueFormat()).thenReturn(ValueFormat.of(FormatInfo.of(FormatFactory.PROTOBUF.name())));
+
+    // When:
+    deleteInjector.inject(DROP_WITH_DELETE_TOPIC);
+
+    // Then:
+    verify(registryClient).deleteSubject("something" + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX);
+  }
+
+  @Test
+  public void shouldNotDeleteSchemaInSRIfNotSRSupported() throws IOException, RestClientException {
     // Given:
     when(topic.getValueFormat()).thenReturn(ValueFormat.of(FormatInfo.of(FormatFactory.DELIMITED.name())));
 
     // When:
     deleteInjector.inject(DROP_WITH_DELETE_TOPIC);
 
@@ -37,6 +37,7 @@ static Optional<SchemaRegistryClient> getSrClient(
       final Format valueFormat,
       final KsqlConfig ksqlConfig
   ) {
+    // the ksql datagen at the moment only supports AVRO, not JSON/PROTOBUF
     if (keyFormat != FormatFactory.AVRO && valueFormat != FormatFactory.AVRO) {
       return Optional.empty();
     }
@@ -23,7 +23,7 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
-import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
 import io.confluent.ksql.KsqlExecutionContext;
 import io.confluent.ksql.parser.DurationParser;
 import io.confluent.ksql.query.QueryId;
@@ -165,7 +165,7 @@ public Serializer<Object> getKeySerializer() {
     final Serializer<?> serializer = keySerdeSupplier.getSerializer(srClient);
 
     serializer.configure(ImmutableMap.of(
-        KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "something"
+        AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "something"
     ), true);
 
     return (Serializer) serializer;
@@ -179,7 +179,7 @@ public Serializer<Object> getValueSerializer() {
     final Serializer<?> serializer = valueSerdeSupplier.getSerializer(srClient);
 
     serializer.configure(ImmutableMap.of(
-        KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "something"
+        AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "something"
     ), false);
 
     return (Serializer) serializer;
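KafkaAvroSerializerConfig inherits SCHEMA_REGISTRY_URL_CONFIG from AbstractKafkaSchemaSerDeConfig, so switching to the base class is behavior-neutral for Avro while also reading correctly for the Protobuf and JSON Schema serdes. For instance (a standalone usage sketch, not code from the PR):

import com.google.common.collect.ImmutableMap;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.common.serialization.Serializer;

final class ConfigureSketch {
  // The same config key configures Avro, Protobuf and JSON Schema serializers,
  // since their config classes all extend AbstractKafkaSchemaSerDeConfig.
  static Serializer<Object> avroValueSerializer(final String schemaRegistryUrl) {
    final KafkaAvroSerializer serializer = new KafkaAvroSerializer();
    serializer.configure(ImmutableMap.of(
        AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl
    ), false);
    return serializer;
  }
}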
@@ -24,7 +24,7 @@
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.json.JsonSchema;
 import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
-import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
 import io.confluent.ksql.model.WindowType;
 import io.confluent.ksql.schema.ksql.LogicalSchema;
 import io.confluent.ksql.serde.Format;
@@ -129,7 +129,7 @@ public static <T> SerdeSupplier<?> getKeySerdeSupplier(
       public Serializer<Windowed<T>> getSerializer(final SchemaRegistryClient srClient) {
         final Serializer<T> serializer = inner.getSerializer(srClient);
         serializer.configure(ImmutableMap.of(
-            KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "something"
+            AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "something"
         ), true);
         return new SessionWindowedSerializer<>(serializer);
       }
@@ -148,7 +148,7 @@ public Deserializer<Windowed<T>> getDeserializer(final SchemaRegistryClient srCl
       public Serializer<Windowed<T>> getSerializer(final SchemaRegistryClient srClient) {
         final Serializer<T> serializer = inner.getSerializer(srClient);
         serializer.configure(ImmutableMap.of(
-            KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "something"
+            AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "something"
         ), true);
         return new TimeWindowedSerializer<>(serializer);
       }
@@ -153,7 +153,7 @@ private static TestCasePlan parseSpec(final PlannedTestPath versionDir) {
           slurp(topologyPath)
       );
     } catch (final IOException e) {
-      throw new RuntimeException(e);
+      throw new RuntimeException("Failed to parse spec at " + versionDir.path(), e);
     }
   }
 
@@ -0,0 +1,150 @@
{
"version" : "5.5.0",
"timestamp" : 1583183630437,
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM INPUT (C1 INTEGER) WITH (KAFKA_TOPIC='input', SCHEMA_ID=1, VALUE_FORMAT='AvRo');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "INPUT",
"schema" : "`ROWKEY` STRING KEY, `C1` INTEGER",
"keyField" : null,
"timestampColumn" : null,
"topicName" : "input",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "AVRO",
"properties" : { }
},
"options" : [ ]
},
"windowInfo" : null
},
"queryPlan" : null
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM OUTPUT WITH (PARTITIONS=4) AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "OUTPUT",
"schema" : "`ROWKEY` STRING KEY, `C1` INTEGER",
"keyField" : null,
"timestampColumn" : null,
"topicName" : "OUTPUT",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "AVRO",
"properties" : { }
},
"options" : [ ]
},
"windowInfo" : null
},
"queryPlan" : {
"sources" : [ "INPUT" ],
"sink" : "OUTPUT",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "OUTPUT"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "input",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "AVRO",
"properties" : { }
},
"options" : [ ]
},
"timestampColumn" : null,
"sourceSchema" : "`ROWKEY` STRING KEY, `C1` INTEGER"
},
"selectExpressions" : [ "C1 AS C1" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "AVRO",
"properties" : { }
},
"options" : [ ]
},
"topicName" : "OUTPUT"
},
"queryId" : "CSAS_OUTPUT_0"
}
} ],
"schemas" : {
"CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT<C1 INT> NOT NULL",
"CSAS_OUTPUT_0.OUTPUT" : "STRUCT<C1 INT> NOT NULL"
},
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"ksql.transient.prefix" : "transient_",
"ksql.persistence.wrap.single.values" : "true",
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.schema.registry.url" : "",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.new.api.enabled" : "false",
"ksql.streams.state.dir" : "/var/folders/bz/qnz23q_j6v12b3b_tm9ztv700000gn/T/confluent501665998288149869",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.metric.reporters" : "",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.metrics.extension" : null,
"ksql.streams.topology.optimization" : "all",
"ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000",
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.streams.num.stream.threads" : "4",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.metrics.tags.custom" : "",
"ksql.pull.queries.enable" : "true",
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.udf.collect.metrics" : "false",
"ksql.persistent.prefix" : "query_",
"ksql.query.persistent.active.limit" : "2147483647"
}
}
@@ -0,0 +1,13 @@
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [input])
--> KSTREAM-TRANSFORMVALUES-0000000001
Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: [])
--> Project
<-- KSTREAM-SOURCE-0000000000
Processor: Project (stores: [])
--> KSTREAM-SINK-0000000003
<-- KSTREAM-TRANSFORMVALUES-0000000001
Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT)
<-- Project

@@ -0,0 +1,150 @@
{
"version" : "5.5.0",
"timestamp" : 1583183630506,
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM INPUT (C1 BIGINT) WITH (KAFKA_TOPIC='input', SCHEMA_ID=1, VALUE_FORMAT='JSON');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "INPUT",
"schema" : "`ROWKEY` STRING KEY, `C1` BIGINT",
"keyField" : null,
"timestampColumn" : null,
"topicName" : "input",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON",
"properties" : { }
},
"options" : [ ]
},
"windowInfo" : null
},
"queryPlan" : null
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM OUTPUT WITH (PARTITIONS=4) AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "OUTPUT",
"schema" : "`ROWKEY` STRING KEY, `C1` BIGINT",
"keyField" : null,
"timestampColumn" : null,
"topicName" : "OUTPUT",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON",
"properties" : { }
},
"options" : [ ]
},
"windowInfo" : null
},
"queryPlan" : {
"sources" : [ "INPUT" ],
"sink" : "OUTPUT",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "OUTPUT"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "input",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON",
"properties" : { }
},
"options" : [ ]
},
"timestampColumn" : null,
"sourceSchema" : "`ROWKEY` STRING KEY, `C1` BIGINT"
},
"selectExpressions" : [ "C1 AS C1" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON",
"properties" : { }
},
"options" : [ ]
},
"topicName" : "OUTPUT"
},
"queryId" : "CSAS_OUTPUT_0"
}
} ],
"schemas" : {
"CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT<C1 BIGINT> NOT NULL",
"CSAS_OUTPUT_0.OUTPUT" : "STRUCT<C1 BIGINT> NOT NULL"
},
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"ksql.transient.prefix" : "transient_",
"ksql.persistence.wrap.single.values" : "true",
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.schema.registry.url" : "",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.new.api.enabled" : "false",
"ksql.streams.state.dir" : "/var/folders/bz/qnz23q_j6v12b3b_tm9ztv700000gn/T/confluent501665998288149869",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.metric.reporters" : "",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.metrics.extension" : null,
"ksql.streams.topology.optimization" : "all",
"ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000",
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.streams.num.stream.threads" : "4",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.metrics.tags.custom" : "",
"ksql.pull.queries.enable" : "true",
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.udf.collect.metrics" : "false",
"ksql.persistent.prefix" : "query_",
"ksql.query.persistent.active.limit" : "2147483647"
}
}
@@ -0,0 +1,13 @@
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [input])
--> KSTREAM-TRANSFORMVALUES-0000000001
Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: [])
--> Project
<-- KSTREAM-SOURCE-0000000000
Processor: Project (stores: [])
--> KSTREAM-SINK-0000000003
<-- KSTREAM-TRANSFORMVALUES-0000000001
Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT)
<-- Project

@@ -0,0 +1,150 @@
{
"version" : "5.5.0",
"timestamp" : 1583183630472,
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM INPUT (C1 INTEGER) WITH (KAFKA_TOPIC='input', SCHEMA_ID=1, VALUE_FORMAT='PROTOBUF');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "INPUT",
"schema" : "`ROWKEY` STRING KEY, `C1` INTEGER",
"keyField" : null,
"timestampColumn" : null,
"topicName" : "input",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "PROTOBUF",
"properties" : { }
},
"options" : [ ]
},
"windowInfo" : null
},
"queryPlan" : null
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM OUTPUT WITH (PARTITIONS=4) AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "OUTPUT",
"schema" : "`ROWKEY` STRING KEY, `C1` INTEGER",
"keyField" : null,
"timestampColumn" : null,
"topicName" : "OUTPUT",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "PROTOBUF",
"properties" : { }
},
"options" : [ ]
},
"windowInfo" : null
},
"queryPlan" : {
"sources" : [ "INPUT" ],
"sink" : "OUTPUT",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "OUTPUT"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "input",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "PROTOBUF",
"properties" : { }
},
"options" : [ ]
},
"timestampColumn" : null,
"sourceSchema" : "`ROWKEY` STRING KEY, `C1` INTEGER"
},
"selectExpressions" : [ "C1 AS C1" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "PROTOBUF",
"properties" : { }
},
"options" : [ ]
},
"topicName" : "OUTPUT"
},
"queryId" : "CSAS_OUTPUT_0"
}
} ],
"schemas" : {
"CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT<C1 INT> NOT NULL",
"CSAS_OUTPUT_0.OUTPUT" : "STRUCT<C1 INT> NOT NULL"
},
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"ksql.transient.prefix" : "transient_",
"ksql.persistence.wrap.single.values" : "true",
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.schema.registry.url" : "",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.new.api.enabled" : "false",
"ksql.streams.state.dir" : "/var/folders/bz/qnz23q_j6v12b3b_tm9ztv700000gn/T/confluent501665998288149869",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.metric.reporters" : "",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.metrics.extension" : null,
"ksql.streams.topology.optimization" : "all",
"ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000",
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.streams.num.stream.threads" : "4",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.metrics.tags.custom" : "",
"ksql.pull.queries.enable" : "true",
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.udf.collect.metrics" : "false",
"ksql.persistent.prefix" : "query_",
"ksql.query.persistent.active.limit" : "2147483647"
}
}
@@ -0,0 +1,13 @@
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [input])
--> KSTREAM-TRANSFORMVALUES-0000000001
Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: [])
--> Project
<-- KSTREAM-SOURCE-0000000000
Processor: Project (stores: [])
--> KSTREAM-SINK-0000000003
<-- KSTREAM-TRANSFORMVALUES-0000000001
Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT)
<-- Project

@@ -0,0 +1,150 @@
{
"version" : "5.5.0",
"timestamp" : 1583183630576,
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM INPUT (ROWKEY INTEGER KEY, C1 INTEGER) WITH (KAFKA_TOPIC='input', SCHEMA_ID=1, VALUE_FORMAT='AvRo');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "INPUT",
"schema" : "`ROWKEY` INTEGER KEY, `C1` INTEGER",
"keyField" : null,
"timestampColumn" : null,
"topicName" : "input",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "AVRO",
"properties" : { }
},
"options" : [ ]
},
"windowInfo" : null
},
"queryPlan" : null
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "OUTPUT",
"schema" : "`ROWKEY` INTEGER KEY, `C1` INTEGER",
"keyField" : null,
"timestampColumn" : null,
"topicName" : "OUTPUT",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "AVRO",
"properties" : { }
},
"options" : [ ]
},
"windowInfo" : null
},
"queryPlan" : {
"sources" : [ "INPUT" ],
"sink" : "OUTPUT",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "OUTPUT"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "input",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "AVRO",
"properties" : { }
},
"options" : [ ]
},
"timestampColumn" : null,
"sourceSchema" : "`ROWKEY` INTEGER KEY, `C1` INTEGER"
},
"selectExpressions" : [ "C1 AS C1" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "AVRO",
"properties" : { }
},
"options" : [ ]
},
"topicName" : "OUTPUT"
},
"queryId" : "CSAS_OUTPUT_0"
}
} ],
"schemas" : {
"CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT<C1 INT> NOT NULL",
"CSAS_OUTPUT_0.OUTPUT" : "STRUCT<C1 INT> NOT NULL"
},
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"ksql.transient.prefix" : "transient_",
"ksql.persistence.wrap.single.values" : "true",
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.schema.registry.url" : "",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.new.api.enabled" : "false",
"ksql.streams.state.dir" : "/var/folders/bz/qnz23q_j6v12b3b_tm9ztv700000gn/T/confluent501665998288149869",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.metric.reporters" : "",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.metrics.extension" : null,
"ksql.streams.topology.optimization" : "all",
"ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000",
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.streams.num.stream.threads" : "4",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.metrics.tags.custom" : "",
"ksql.pull.queries.enable" : "true",
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.udf.collect.metrics" : "false",
"ksql.persistent.prefix" : "query_",
"ksql.query.persistent.active.limit" : "2147483647"
}
}
@@ -0,0 +1,13 @@
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [input])
--> KSTREAM-TRANSFORMVALUES-0000000001
Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: [])
--> Project
<-- KSTREAM-SOURCE-0000000000
Processor: Project (stores: [])
--> KSTREAM-SINK-0000000003
<-- KSTREAM-TRANSFORMVALUES-0000000001
Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT)
<-- Project

@@ -0,0 +1,150 @@
{
"version" : "5.5.0",
"timestamp" : 1583183630616,
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM INPUT (ROWKEY INTEGER KEY, C1 BIGINT) WITH (KAFKA_TOPIC='input', SCHEMA_ID=1, VALUE_FORMAT='JSON');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "INPUT",
"schema" : "`ROWKEY` INTEGER KEY, `C1` BIGINT",
"keyField" : null,
"timestampColumn" : null,
"topicName" : "input",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON",
"properties" : { }
},
"options" : [ ]
},
"windowInfo" : null
},
"queryPlan" : null
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "OUTPUT",
"schema" : "`ROWKEY` INTEGER KEY, `C1` BIGINT",
"keyField" : null,
"timestampColumn" : null,
"topicName" : "OUTPUT",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON",
"properties" : { }
},
"options" : [ ]
},
"windowInfo" : null
},
"queryPlan" : {
"sources" : [ "INPUT" ],
"sink" : "OUTPUT",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "OUTPUT"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "input",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON",
"properties" : { }
},
"options" : [ ]
},
"timestampColumn" : null,
"sourceSchema" : "`ROWKEY` INTEGER KEY, `C1` BIGINT"
},
"selectExpressions" : [ "C1 AS C1" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON",
"properties" : { }
},
"options" : [ ]
},
"topicName" : "OUTPUT"
},
"queryId" : "CSAS_OUTPUT_0"
}
} ],
"schemas" : {
"CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT<C1 BIGINT> NOT NULL",
"CSAS_OUTPUT_0.OUTPUT" : "STRUCT<C1 BIGINT> NOT NULL"
},
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"ksql.transient.prefix" : "transient_",
"ksql.persistence.wrap.single.values" : "true",
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.schema.registry.url" : "",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.new.api.enabled" : "false",
"ksql.streams.state.dir" : "/var/folders/bz/qnz23q_j6v12b3b_tm9ztv700000gn/T/confluent501665998288149869",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.metric.reporters" : "",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.metrics.extension" : null,
"ksql.streams.topology.optimization" : "all",
"ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000",
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.streams.num.stream.threads" : "4",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.metrics.tags.custom" : "",
"ksql.pull.queries.enable" : "true",
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.udf.collect.metrics" : "false",
"ksql.persistent.prefix" : "query_",
"ksql.query.persistent.active.limit" : "2147483647"
}
}
@@ -0,0 +1,13 @@
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [input])
--> KSTREAM-TRANSFORMVALUES-0000000001
Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: [])
--> Project
<-- KSTREAM-SOURCE-0000000000
Processor: Project (stores: [])
--> KSTREAM-SINK-0000000003
<-- KSTREAM-TRANSFORMVALUES-0000000001
Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT)
<-- Project

@@ -0,0 +1,150 @@
{
"version" : "5.5.0",
"timestamp" : 1583183630643,
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM INPUT (ROWKEY INTEGER KEY, C1 BIGINT) WITH (KAFKA_TOPIC='input', SCHEMA_ID=1, VALUE_FORMAT='JSON_SR');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "INPUT",
"schema" : "`ROWKEY` INTEGER KEY, `C1` BIGINT",
"keyField" : null,
"timestampColumn" : null,
"topicName" : "input",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON_SR",
"properties" : { }
},
"options" : [ ]
},
"windowInfo" : null
},
"queryPlan" : null
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "OUTPUT",
"schema" : "`ROWKEY` INTEGER KEY, `C1` BIGINT",
"keyField" : null,
"timestampColumn" : null,
"topicName" : "OUTPUT",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON_SR",
"properties" : { }
},
"options" : [ ]
},
"windowInfo" : null
},
"queryPlan" : {
"sources" : [ "INPUT" ],
"sink" : "OUTPUT",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "OUTPUT"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "input",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON_SR",
"properties" : { }
},
"options" : [ ]
},
"timestampColumn" : null,
"sourceSchema" : "`ROWKEY` INTEGER KEY, `C1` BIGINT"
},
"selectExpressions" : [ "C1 AS C1" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON_SR",
"properties" : { }
},
"options" : [ ]
},
"topicName" : "OUTPUT"
},
"queryId" : "CSAS_OUTPUT_0"
}
} ],
"schemas" : {
"CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT<C1 BIGINT> NOT NULL",
"CSAS_OUTPUT_0.OUTPUT" : "STRUCT<C1 BIGINT> NOT NULL"
},
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"ksql.transient.prefix" : "transient_",
"ksql.persistence.wrap.single.values" : "true",
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.schema.registry.url" : "",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.new.api.enabled" : "false",
"ksql.streams.state.dir" : "/var/folders/bz/qnz23q_j6v12b3b_tm9ztv700000gn/T/confluent501665998288149869",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.metric.reporters" : "",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.metrics.extension" : null,
"ksql.streams.topology.optimization" : "all",
"ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000",
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.streams.num.stream.threads" : "4",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.metrics.tags.custom" : "",
"ksql.pull.queries.enable" : "true",
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.udf.collect.metrics" : "false",
"ksql.persistent.prefix" : "query_",
"ksql.query.persistent.active.limit" : "2147483647"
}
}
@@ -0,0 +1,13 @@
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [input])
--> KSTREAM-TRANSFORMVALUES-0000000001
Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: [])
--> Project
<-- KSTREAM-SOURCE-0000000000
Processor: Project (stores: [])
--> KSTREAM-SINK-0000000003
<-- KSTREAM-TRANSFORMVALUES-0000000001
Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT)
<-- Project

@@ -0,0 +1,150 @@
{
"version" : "5.5.0",
"timestamp" : 1583183630594,
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM INPUT (ROWKEY INTEGER KEY, C1 INTEGER) WITH (KAFKA_TOPIC='input', SCHEMA_ID=1, VALUE_FORMAT='PROTOBUF');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "INPUT",
"schema" : "`ROWKEY` INTEGER KEY, `C1` INTEGER",
"keyField" : null,
"timestampColumn" : null,
"topicName" : "input",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "PROTOBUF",
"properties" : { }
},
"options" : [ ]
},
"windowInfo" : null
},
"queryPlan" : null
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "OUTPUT",
"schema" : "`ROWKEY` INTEGER KEY, `C1` INTEGER",
"keyField" : null,
"timestampColumn" : null,
"topicName" : "OUTPUT",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "PROTOBUF",
"properties" : { }
},
"options" : [ ]
},
"windowInfo" : null
},
"queryPlan" : {
"sources" : [ "INPUT" ],
"sink" : "OUTPUT",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "OUTPUT"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "input",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "PROTOBUF",
"properties" : { }
},
"options" : [ ]
},
"timestampColumn" : null,
"sourceSchema" : "`ROWKEY` INTEGER KEY, `C1` INTEGER"
},
"selectExpressions" : [ "C1 AS C1" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "PROTOBUF",
"properties" : { }
},
"options" : [ ]
},
"topicName" : "OUTPUT"
},
"queryId" : "CSAS_OUTPUT_0"
}
} ],
"schemas" : {
"CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT<C1 INT> NOT NULL",
"CSAS_OUTPUT_0.OUTPUT" : "STRUCT<C1 INT> NOT NULL"
},
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"ksql.transient.prefix" : "transient_",
"ksql.persistence.wrap.single.values" : "true",
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.schema.registry.url" : "",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.new.api.enabled" : "false",
"ksql.streams.state.dir" : "/var/folders/bz/qnz23q_j6v12b3b_tm9ztv700000gn/T/confluent501665998288149869",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.metric.reporters" : "",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.metrics.extension" : null,
"ksql.streams.topology.optimization" : "all",
"ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000",
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.streams.num.stream.threads" : "4",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.metrics.tags.custom" : "",
"ksql.pull.queries.enable" : "true",
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.udf.collect.metrics" : "false",
"ksql.persistent.prefix" : "query_",
"ksql.query.persistent.active.limit" : "2147483647"
}
}
@@ -0,0 +1,13 @@
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [input])
--> KSTREAM-TRANSFORMVALUES-0000000001
Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: [])
--> Project
<-- KSTREAM-SOURCE-0000000000
Processor: Project (stores: [])
--> KSTREAM-SINK-0000000003
<-- KSTREAM-TRANSFORMVALUES-0000000001
Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT)
<-- Project

@@ -0,0 +1,150 @@
{
"version" : "5.5.0",
"timestamp" : 1583183630550,
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM INPUT (`@TIMESTAMP` BIGINT, `FROM` BIGINT) WITH (KAFKA_TOPIC='input', SCHEMA_ID=1, VALUE_FORMAT='JSON');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "INPUT",
"schema" : "`ROWKEY` STRING KEY, `@TIMESTAMP` BIGINT, `FROM` BIGINT",
"keyField" : null,
"timestampColumn" : null,
"topicName" : "input",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON",
"properties" : { }
},
"options" : [ ]
},
"windowInfo" : null
},
"queryPlan" : null
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "OUTPUT",
"schema" : "`ROWKEY` STRING KEY, `@TIMESTAMP` BIGINT, `FROM` BIGINT",
"keyField" : null,
"timestampColumn" : null,
"topicName" : "OUTPUT",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON",
"properties" : { }
},
"options" : [ ]
},
"windowInfo" : null
},
"queryPlan" : {
"sources" : [ "INPUT" ],
"sink" : "OUTPUT",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "OUTPUT"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "input",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON",
"properties" : { }
},
"options" : [ ]
},
"timestampColumn" : null,
"sourceSchema" : "`ROWKEY` STRING KEY, `@TIMESTAMP` BIGINT, `FROM` BIGINT"
},
"selectExpressions" : [ "`@TIMESTAMP` AS `@TIMESTAMP`", "`FROM` AS `FROM`" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON",
"properties" : { }
},
"options" : [ ]
},
"topicName" : "OUTPUT"
},
"queryId" : "CSAS_OUTPUT_0"
}
} ],
"schemas" : {
"CSAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT<@TIMESTAMP BIGINT, FROM BIGINT> NOT NULL",
"CSAS_OUTPUT_0.OUTPUT" : "STRUCT<@TIMESTAMP BIGINT, FROM BIGINT> NOT NULL"
},
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"ksql.transient.prefix" : "transient_",
"ksql.persistence.wrap.single.values" : "true",
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.schema.registry.url" : "",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.new.api.enabled" : "false",
"ksql.streams.state.dir" : "/var/folders/bz/qnz23q_j6v12b3b_tm9ztv700000gn/T/confluent501665998288149869",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.metric.reporters" : "",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.metrics.extension" : null,
"ksql.streams.topology.optimization" : "all",
"ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000",
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.streams.num.stream.threads" : "4",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.metrics.tags.custom" : "",
"ksql.pull.queries.enable" : "true",
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.udf.collect.metrics" : "false",
"ksql.persistent.prefix" : "query_",
"ksql.query.persistent.active.limit" : "2147483647"
}
}
@@ -0,0 +1,13 @@
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [input])
--> KSTREAM-TRANSFORMVALUES-0000000001
Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: [])
--> Project
<-- KSTREAM-SOURCE-0000000000
Processor: Project (stores: [])
--> KSTREAM-SINK-0000000003
<-- KSTREAM-TRANSFORMVALUES-0000000001
Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT)
<-- Project

@@ -0,0 +1,150 @@
{
"version" : "5.5.0",
"timestamp" : 1583183643976,
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM TEST (C1 INTEGER) WITH (KAFKA_TOPIC='input', SCHEMA_ID=1, VALUE_FORMAT='AVRO');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "TEST",
"schema" : "`ROWKEY` STRING KEY, `C1` INTEGER",
"keyField" : null,
"timestampColumn" : null,
"topicName" : "input",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "AVRO",
"properties" : { }
},
"options" : [ ]
},
"windowInfo" : null
},
"queryPlan" : null
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM S AS SELECT *\nFROM TEST TEST\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "S",
"schema" : "`ROWKEY` STRING KEY, `C1` INTEGER",
"keyField" : null,
"timestampColumn" : null,
"topicName" : "S",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "AVRO",
"properties" : { }
},
"options" : [ ]
},
"windowInfo" : null
},
"queryPlan" : {
"sources" : [ "TEST" ],
"sink" : "S",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "S"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "input",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "AVRO",
"properties" : { }
},
"options" : [ ]
},
"timestampColumn" : null,
"sourceSchema" : "`ROWKEY` STRING KEY, `C1` INTEGER"
},
"selectExpressions" : [ "C1 AS C1" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "AVRO",
"properties" : { }
},
"options" : [ ]
},
"topicName" : "S"
},
"queryId" : "CSAS_S_0"
}
} ],
"schemas" : {
"CSAS_S_0.KsqlTopic.Source" : "STRUCT<C1 INT> NOT NULL",
"CSAS_S_0.S" : "STRUCT<C1 INT> NOT NULL"
},
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"ksql.transient.prefix" : "transient_",
"ksql.persistence.wrap.single.values" : "true",
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.schema.registry.url" : "",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.new.api.enabled" : "false",
"ksql.streams.state.dir" : "/var/folders/bz/qnz23q_j6v12b3b_tm9ztv700000gn/T/confluent501665998288149869",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.metric.reporters" : "",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.metrics.extension" : null,
"ksql.streams.topology.optimization" : "all",
"ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000",
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.streams.num.stream.threads" : "4",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.metrics.tags.custom" : "",
"ksql.pull.queries.enable" : "true",
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.udf.collect.metrics" : "false",
"ksql.persistent.prefix" : "query_",
"ksql.query.persistent.active.limit" : "2147483647"
}
}
@@ -0,0 +1,13 @@
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [input])
--> KSTREAM-TRANSFORMVALUES-0000000001
Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: [])
--> Project
<-- KSTREAM-SOURCE-0000000000
Processor: Project (stores: [])
--> KSTREAM-SINK-0000000003
<-- KSTREAM-TRANSFORMVALUES-0000000001
Sink: KSTREAM-SINK-0000000003 (topic: S)
<-- Project

@@ -0,0 +1,150 @@
{
"version" : "5.5.0",
"timestamp" : 1583183643966,
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM TEST (C1 INTEGER) WITH (KAFKA_TOPIC='input', SCHEMA_ID=1, VALUE_FORMAT='AVRO');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "TEST",
"schema" : "`ROWKEY` STRING KEY, `C1` INTEGER",
"keyField" : null,
"timestampColumn" : null,
"topicName" : "input",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "AVRO",
"properties" : { }
},
"options" : [ ]
},
"windowInfo" : null
},
"queryPlan" : null
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM S AS SELECT *\nFROM TEST TEST\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "S",
"schema" : "`ROWKEY` STRING KEY, `C1` INTEGER",
"keyField" : null,
"timestampColumn" : null,
"topicName" : "S",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "AVRO",
"properties" : { }
},
"options" : [ ]
},
"windowInfo" : null
},
"queryPlan" : {
"sources" : [ "TEST" ],
"sink" : "S",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "S"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "input",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "AVRO",
"properties" : { }
},
"options" : [ ]
},
"timestampColumn" : null,
"sourceSchema" : "`ROWKEY` STRING KEY, `C1` INTEGER"
},
"selectExpressions" : [ "C1 AS C1" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "AVRO",
"properties" : { }
},
"options" : [ ]
},
"topicName" : "S"
},
"queryId" : "CSAS_S_0"
}
} ],
"schemas" : {
"CSAS_S_0.KsqlTopic.Source" : "STRUCT<C1 INT> NOT NULL",
"CSAS_S_0.S" : "STRUCT<C1 INT> NOT NULL"
},
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"ksql.transient.prefix" : "transient_",
"ksql.persistence.wrap.single.values" : "true",
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.schema.registry.url" : "",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.new.api.enabled" : "false",
"ksql.streams.state.dir" : "/var/folders/bz/qnz23q_j6v12b3b_tm9ztv700000gn/T/confluent501665998288149869",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.metric.reporters" : "",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.metrics.extension" : null,
"ksql.streams.topology.optimization" : "all",
"ksql.query.pull.streamsstore.rebalancing.timeout.ms" : "10000",
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.streams.num.stream.threads" : "4",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.metrics.tags.custom" : "",
"ksql.pull.queries.enable" : "true",
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.udf.collect.metrics" : "false",
"ksql.persistent.prefix" : "query_",
"ksql.query.persistent.active.limit" : "2147483647"
}
}
@@ -0,0 +1,13 @@
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [input])
--> KSTREAM-TRANSFORMVALUES-0000000001
Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: [])
--> Project
<-- KSTREAM-SOURCE-0000000000
Processor: Project (stores: [])
--> KSTREAM-SINK-0000000003
<-- KSTREAM-TRANSFORMVALUES-0000000001
Sink: KSTREAM-SINK-0000000003 (topic: S)
<-- Project

@@ -129,8 +129,13 @@ public Optional<String> getTimestampFormat() {
return Optional.ofNullable(props.getString(CommonCreateConfigs.TIMESTAMP_FORMAT_PROPERTY));
}

public Optional<Integer> getAvroSchemaId() {
return Optional.ofNullable(props.getInt(CreateConfigs.AVRO_SCHEMA_ID));
public Optional<Integer> getSchemaId() {
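// prefer the new, format-agnostic SCHEMA_ID property and fall back to the
// deprecated AVRO_SCHEMA_ID only for backwards compatibility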
Integer schemaId = props.getInt(CreateConfigs.SCHEMA_ID);
if (schemaId == null) {
schemaId = props.getInt(CreateConfigs.AVRO_SCHEMA_ID);
}

return Optional.ofNullable(schemaId);
}

public FormatInfo getFormatInfo() {
@@ -161,7 +166,7 @@ public Optional<Boolean> getWrapSingleValues() {

public CreateSourceProperties withSchemaId(final int id) {
final Map<String, Literal> originals = props.copyOfOriginalLiterals();
originals.put(CreateConfigs.AVRO_SCHEMA_ID, new IntegerLiteral(id));
originals.put(CreateConfigs.SCHEMA_ID, new IntegerLiteral(id));

return new CreateSourceProperties(originals, durationParser);
}
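A minimal sketch of the precedence this gives (a hypothetical test, not part of this change, assuming both properties may be supplied together; MINIMUM_VALID_PROPS, CreateConfigs, StringLiteral, and the matchers are as used in CreateSourcePropertiesTest below):

@Test
public void shouldPreferNewSchemaIdOverLegacy() {
  // hypothetical: SCHEMA_ID wins over the deprecated AVRO_SCHEMA_ID when
  // both are set, per the fallback in getSchemaId() above
  final CreateSourceProperties properties = CreateSourceProperties.from(
      ImmutableMap.<String, Literal>builder()
          .putAll(MINIMUM_VALID_PROPS)
          .put(CreateConfigs.SCHEMA_ID, new StringLiteral("2"))
          .put(CreateConfigs.AVRO_SCHEMA_ID, new StringLiteral("1"))
          .build());

  assertThat(properties.getSchemaId(), is(Optional.of(2)));
}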
@@ -81,7 +81,7 @@ public void shouldReturnOptionalEmptyForMissingProps() {
assertThat(properties.getTimestampColumnName(), is(Optional.empty()));
assertThat(properties.getTimestampFormat(), is(Optional.empty()));
assertThat(properties.getWindowType(), is(Optional.empty()));
assertThat(properties.getAvroSchemaId(), is(Optional.empty()));
assertThat(properties.getSchemaId(), is(Optional.empty()));
assertThat(properties.getFormatInfo(), is(FormatInfo.of("AvRo")));
assertThat(properties.getReplicas(), is(Optional.empty()));
assertThat(properties.getPartitions(), is(Optional.empty()));
@@ -275,6 +275,19 @@ public void shouldThrowOnSessionWindowWithSize() {

@Test
public void shouldSetValidSchemaId() {
// When:
final CreateSourceProperties properties = CreateSourceProperties.from(
ImmutableMap.<String, Literal>builder()
.putAll(MINIMUM_VALID_PROPS)
.put(CreateConfigs.SCHEMA_ID, new StringLiteral("1"))
.build());

// Then:
assertThat(properties.getSchemaId(), is(Optional.of(1)));
}

@Test
public void shouldSetValidLegacySchemaId() {
// When:
final CreateSourceProperties properties = CreateSourceProperties.from(
ImmutableMap.<String, Literal>builder()
@@ -283,7 +296,7 @@ public void shouldSetValidSchemaId() {
.build());

// Then:
assertThat(properties.getAvroSchemaId(), is(Optional.of(1)));
assertThat(properties.getSchemaId(), is(Optional.of(1)));
}

@Test
@@ -21,7 +21,6 @@
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.schema.registry.SchemaRegistryUtil;
import io.confluent.ksql.serde.FormatFactory;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.ExecutorUtil;
import io.confluent.ksql.util.KsqlConstants;
@@ -82,7 +81,7 @@ private void deleteSinkTopics(final List<String> deleteTopicPatterns) {
final List<DataSource> toDelete = getSourcesToDelete(patterns, ksqlEngine.getMetaStore());

deleteTopics(topicNames(toDelete));
cleanUpSinkAvroSchemas(subjectNames(toDelete));
cleanUpSinkSchemas(subjectNames(toDelete));
}

private List<String> filterNonExistingTopics(final Collection<String> topicList) {
@@ -104,7 +103,7 @@ private void deleteTopics(final Collection<String> topicsToBeDeleted) {
}
}

private void cleanUpSinkAvroSchemas(final Collection<String> subjectsToDelete) {
private void cleanUpSinkSchemas(final Collection<String> subjectsToDelete) {
final Set<String> knownSubject = SchemaRegistryUtil
.getSubjectNames(serviceContext.getSchemaRegistryClient())
.collect(Collectors.toSet());
@@ -145,7 +144,7 @@ private static Set<String> topicNames(final List<DataSource> sources) {

private static Set<String> subjectNames(final List<DataSource> sources) {
return sources.stream()
.filter(s -> s.getKsqlTopic().getValueFormat().getFormat() == FormatFactory.AVRO)
.filter(s -> s.getKsqlTopic().getValueFormat().getFormat().supportsSchemaInference())
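// only value formats that support schema inference have subjects registered in the Schema Registry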
.map(DataSource::getKsqlTopic)
.map(KsqlTopic::getKafkaTopicName)
.map(topicName -> topicName + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX)
@@ -171,7 +171,7 @@ public void shouldDeleteTopicListWithExplicitTopicName() {
}

@Test
public void shouldCleanUpSchemasForExplicitTopicList() throws Exception {
public void shouldCleanUpSchemasForExplicitTopicListAvro() throws Exception {
// Given:
givenTopicsExistInKafka("K_Foo");
givenSinkTopicsExistInMetastore(FormatFactory.AVRO, "K_Foo");
@@ -184,6 +184,20 @@ public void shouldCleanUpSchemasForExplicitTopicList() throws Exception {
verifySchemaDeletedForTopics("K_Foo");
}

@Test
public void shouldCleanUpSchemasForExplicitTopicListProtobuf() throws Exception {
// Given:
givenTopicsExistInKafka("K_Foo");
givenSinkTopicsExistInMetastore(FormatFactory.PROTOBUF, "K_Foo");
givenSchemasForTopicsExistInSchemaRegistry("K_Foo");

// When:
clusterTerminator.terminateCluster(ImmutableList.of("K_Foo"));

// Then:
verifySchemaDeletedForTopics("K_Foo");
}

@Test
public void shouldOnlyDeleteExistingTopics() {
// Given:
@@ -409,10 +423,10 @@ public void shouldNotThrowIfCannotCleanUpSchema() throws Exception {
}

@Test
public void shouldNotCleanUpSchemaForNonAvroTopic() throws Exception {
public void shouldNotCleanUpSchemaForNonSchemaInferenceSupportedTopic() throws Exception {
// Given:
givenTopicsExistInKafka("K_Foo");
givenSinkTopicsExistInMetastore(FormatFactory.JSON,"K_Foo");
givenSinkTopicsExistInMetastore(FormatFactory.DELIMITED, "K_Foo");
givenSchemasForTopicsExistInSchemaRegistry("K_Foo");

// When:
@@ -82,7 +82,9 @@ public KsqlJsonDeserializer(
final PersistenceSchema physicalSchema,
final boolean isJsonSchema
) {
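// fail fast with a descriptive message if physicalSchema is null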
this.physicalSchema = JsonSerdeUtils.validateSchema(physicalSchema);
this.physicalSchema = JsonSerdeUtils.validateSchema(
Objects.requireNonNull(physicalSchema, "physicalSchema")
);
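// isJsonSchema is true for the Schema Registry-backed JSON_SR format, false for plain JSON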
this.isJsonSchema = isJsonSchema;
}