Skip to content

Commit

Permalink
chore: tie up some loose ends around the new formats (#4651)
Browse files Browse the repository at this point in the history
  • Loading branch information
agavra authored Mar 2, 2020
1 parent 7e625ec commit 43db241
Show file tree
Hide file tree
Showing 42 changed files with 1,745 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,30 @@
import io.confluent.ksql.cli.console.table.Table;
import io.confluent.ksql.cli.console.table.Table.Builder;
import io.confluent.ksql.rest.entity.TopicDescription;
import io.confluent.ksql.serde.FormatFactory;
import java.util.ArrayList;
import java.util.List;

public class TopicDescriptionTableBuilder implements TableBuilder<TopicDescription> {

private static final List<String> NON_AVRO_HEADERS =
private static final List<String> NON_SCHEMA_HEADERS =
ImmutableList.of("Table Name", "Kafka Topic", "Type");

private static final List<String> AVRO_HEADERS =
ImmutableList.of("Table Name", "Kafka Topic", "Type", "AvroSchema");
private static final List<String> SCHEMA_HEADERS =
ImmutableList.of("Table Name", "Kafka Topic", "Type", "Schema");

@Override
public Table buildTable(final TopicDescription topicDescription) {
final boolean avro = topicDescription.getFormat().equalsIgnoreCase("AVRO");
final String format = topicDescription.getFormat();
final boolean supportsSchema = FormatFactory.fromName(format).supportsSchemaInference();

final List<String> headings = avro ? AVRO_HEADERS : NON_AVRO_HEADERS;
final List<String> headings = supportsSchema ? SCHEMA_HEADERS : NON_SCHEMA_HEADERS;

final List<String> row = new ArrayList<>(4);
row.add(topicDescription.getName());
row.add(topicDescription.getKafkaTopic());
row.add(topicDescription.getFormat());
if (avro) {
row.add(format);
if (supportsSchema) {
row.add(topicDescription.getSchemaString());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ public void testPrintTopicDescription() {
+ "} ]\n"));
} else {
assertThat(output, is("\n"
+ " Table Name | Kafka Topic | Type | AvroSchema \n"
+ " Table Name | Kafka Topic | Type | Schema \n"
+ "---------------------------------------------------\n"
+ " TestTopic | TestKafkaTopic | AVRO | schemaString \n"
+ "---------------------------------------------------\n"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public final class CreateConfigs {
public static final String WINDOW_TYPE_PROPERTY = "WINDOW_TYPE";
public static final String WINDOW_SIZE_PROPERTY = "WINDOW_SIZE";
public static final String AVRO_SCHEMA_ID = "AVRO_SCHEMA_ID";
public static final String SCHEMA_ID = "SCHEMA_ID";
public static final String SOURCE_CONNECTOR = "SOURCE_CONNECTOR";

private static final ConfigDef CONFIG_DEF = new ConfigDef()
Expand Down Expand Up @@ -63,11 +64,17 @@ public final class CreateConfigs {
+ "then the property should be used to provide the window size, "
+ "for example: '20 SECONDS'."
).define(
AVRO_SCHEMA_ID,
SCHEMA_ID,
ConfigDef.Type.INT,
null,
Importance.LOW,
"Undocumented feature"
).define(
AVRO_SCHEMA_ID,
ConfigDef.Type.INT,
null,
Importance.LOW,
"Undocumented feature (deprecated - use SCHEMA_ID instead)"
).define(
SOURCE_CONNECTOR,
Type.STRING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import io.confluent.ksql.schema.ksql.SqlBaseType;
import io.confluent.ksql.schema.ksql.SqlValueCoercer;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.serde.FormatFactory;
import io.confluent.ksql.serde.GenericKeySerDe;
import io.confluent.ksql.serde.GenericRowSerDe;
import io.confluent.ksql.serde.KeySerdeFactory;
Expand Down Expand Up @@ -440,7 +439,7 @@ private byte[] serializeValue(
try {
return valueSerde.serializer().serialize(topicName, row);
} catch (final Exception e) {
if (dataSource.getKsqlTopic().getValueFormat().getFormat() == FormatFactory.AVRO) {
if (dataSource.getKsqlTopic().getValueFormat().getFormat().supportsSchemaInference()) {
final Throwable rootCause = ExceptionUtils.getRootCause(e);
if (rootCause instanceof RestClientException) {
switch (((RestClientException) rootCause).getStatus()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ private void unregisterQuery(final ServiceContext serviceContext, final QueryMet

if (query.hasEverBeenStarted()) {
SchemaRegistryUtil
.cleanUpInternalTopicAvroSchemas(applicationId, serviceContext.getSchemaRegistryClient());
.cleanupInternalTopicSchemas(applicationId, serviceContext.getSchemaRegistryClient());
serviceContext.getTopicClient().deleteInternalTopics(applicationId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ private SchemaAndId getValueSchema(
) {
final String topicName = statement.getStatement().getProperties().getKafkaTopic();

final SchemaResult result = statement.getStatement().getProperties().getAvroSchemaId()
final SchemaResult result = statement.getStatement().getProperties().getSchemaId()
.map(id -> schemaSupplier.getValueSchema(topicName, Optional.of(id)))
.orElseGet(() -> schemaSupplier.getValueSchema(topicName, Optional.empty()));

Expand Down Expand Up @@ -145,7 +145,7 @@ private static CreateSource addSchemaFields(
final CreateSource statement = preparedStatement.getStatement();
final CreateSourceProperties properties = statement.getProperties();

if (properties.getAvroSchemaId().isPresent()) {
if (properties.getSchemaId().isPresent()) {
return statement.copyWith(elements, properties);
}
return statement.copyWith(elements, properties.withSchemaId(schema.id));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package io.confluent.ksql.schema.registry;

import com.google.common.collect.ImmutableList;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
Expand Down Expand Up @@ -144,12 +145,12 @@ public int getId(final String s, final ParsedSchema parsedSchema) {

@Override
public List<Integer> deleteSubject(final String s) {
throw configException;
return ImmutableList.of();
}

@Override
public List<Integer> deleteSubject(final Map<String, String> map, final String s) {
throw configException;
return ImmutableList.of();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public final class SchemaRegistryUtil {
private SchemaRegistryUtil() {
}

public static void cleanUpInternalTopicAvroSchemas(
public static void cleanupInternalTopicSchemas(
final String applicationId,
final SchemaRegistryClient schemaRegistryClient
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import io.confluent.ksql.util.LimitedProxyBuilder;
import java.util.Collections;
import java.util.Objects;
import org.apache.avro.Schema;

/**
* SchemaRegistryClient used when trying out operations.
Expand All @@ -42,8 +41,6 @@ static SchemaRegistryClient createProxy(final SchemaRegistryClient delegate) {
.swallow("register", anyParams(), 123)
.forward("getLatestSchemaMetadata", methodParams(String.class), delegate)
.forward("getSchemaBySubjectAndId", methodParams(String.class, int.class), delegate)
.forward("testCompatibility",
methodParams(String.class, Schema.class), delegate)
.forward("testCompatibility",
methodParams(String.class, ParsedSchema.class), delegate)
.swallow("deleteSubject", methodParams(String.class), Collections.emptyList())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public <T extends Statement> ConfiguredStatement<T> inject(
}

try {
if (source.getKsqlTopic().getValueFormat().getFormat() == FormatFactory.AVRO) {
if (source.getKsqlTopic().getValueFormat().getFormat().supportsSchemaInference()) {
SchemaRegistryUtil.deleteSubjectWithRetries(
schemaRegistryClient,
source.getKafkaTopicName() + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ public void shouldBuildNewCsStatementText() {
+ "MAPFIELD MAP<STRING, BIGINT>, "
+ "STRUCTFIELD STRUCT<S0 BIGINT>, "
+ "DECIMALFIELD DECIMAL(4, 2)) "
+ "WITH (AVRO_SCHEMA_ID=5, KAFKA_TOPIC='some-topic', VALUE_FORMAT='avro');"
+ "WITH (KAFKA_TOPIC='some-topic', SCHEMA_ID=5, VALUE_FORMAT='avro');"
));
}

Expand All @@ -311,14 +311,14 @@ public void shouldBuildNewCtStatementText() {
+ "MAPFIELD MAP<STRING, BIGINT>, "
+ "STRUCTFIELD STRUCT<S0 BIGINT>, "
+ "DECIMALFIELD DECIMAL(4, 2)) "
+ "WITH (AVRO_SCHEMA_ID=5, KAFKA_TOPIC='some-topic', VALUE_FORMAT='avro');"
+ "WITH (KAFKA_TOPIC='some-topic', SCHEMA_ID=5, VALUE_FORMAT='avro');"
));
}

@Test
public void shouldBuildNewCsStatementTextFromId() {
// Given:
when(cs.getProperties()).thenReturn(supportedPropsWith("AVRO_SCHEMA_ID", "42"));
when(cs.getProperties()).thenReturn(supportedPropsWith("SCHEMA_ID", "42"));

when(schemaSupplier.getValueSchema(KAFKA_TOPIC, Optional.of(42)))
.thenReturn(SchemaResult.success(schemaAndId(SUPPORTED_SCHEMA, SCHEMA_ID)));
Expand All @@ -338,14 +338,14 @@ public void shouldBuildNewCsStatementTextFromId() {
+ "MAPFIELD MAP<STRING, BIGINT>, "
+ "STRUCTFIELD STRUCT<S0 BIGINT>, "
+ "DECIMALFIELD DECIMAL(4, 2)) "
+ "WITH (AVRO_SCHEMA_ID='42', KAFKA_TOPIC='some-topic', VALUE_FORMAT='avro');"
+ "WITH (KAFKA_TOPIC='some-topic', SCHEMA_ID='42', VALUE_FORMAT='avro');"
));
}

@Test
public void shouldBuildNewCtStatementTextFromId() {
// Given:
when(ct.getProperties()).thenReturn(supportedPropsWith("AVRO_SCHEMA_ID", "42"));
when(ct.getProperties()).thenReturn(supportedPropsWith("SCHEMA_ID", "42"));

when(schemaSupplier.getValueSchema(KAFKA_TOPIC, Optional.of(42)))
.thenReturn(SchemaResult.success(schemaAndId(SUPPORTED_SCHEMA, SCHEMA_ID)));
Expand All @@ -365,7 +365,7 @@ public void shouldBuildNewCtStatementTextFromId() {
+ "MAPFIELD MAP<STRING, BIGINT>, "
+ "STRUCTFIELD STRUCT<S0 BIGINT>, "
+ "DECIMALFIELD DECIMAL(4, 2)) "
+ "WITH (AVRO_SCHEMA_ID='42', KAFKA_TOPIC='some-topic', VALUE_FORMAT='avro');"
+ "WITH (KAFKA_TOPIC='some-topic', SCHEMA_ID='42', VALUE_FORMAT='avro');"
));
}

Expand All @@ -379,23 +379,23 @@ public void shouldAddSchemaIdIfNotPresentAlready() {
final ConfiguredStatement<CreateStream> result = injector.inject(csStatement);

// Then:
assertThat(result.getStatement().getProperties().getAvroSchemaId(), is(Optional.of(SCHEMA_ID)));
assertThat(result.getStatement().getProperties().getSchemaId(), is(Optional.of(SCHEMA_ID)));

assertThat(result.getStatementText(), containsString("AVRO_SCHEMA_ID=5"));
assertThat(result.getStatementText(), containsString("SCHEMA_ID=5"));
}

@Test
public void shouldNotOverwriteExistingSchemaId() {
// Given:
when(cs.getProperties()).thenReturn(supportedPropsWith("AVRO_SCHEMA_ID", "42"));
when(cs.getProperties()).thenReturn(supportedPropsWith("SCHEMA_ID", "42"));

// When:
final ConfiguredStatement<CreateStream> result = injector.inject(csStatement);

// Then:
assertThat(result.getStatement().getProperties().getAvroSchemaId(), is(Optional.of(42)));
assertThat(result.getStatement().getProperties().getSchemaId(), is(Optional.of(42)));

assertThat(result.getStatementText(), containsString("AVRO_SCHEMA_ID='42'"));
assertThat(result.getStatementText(), containsString("SCHEMA_ID='42'"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void shouldDeleteChangeLogTopicSchema() throws Exception {
));

// When:
SchemaRegistryUtil.cleanUpInternalTopicAvroSchemas(APP_ID, schemaRegistryClient);
SchemaRegistryUtil.cleanupInternalTopicSchemas(APP_ID, schemaRegistryClient);

// Then not exception:
verify(schemaRegistryClient).deleteSubject(APP_ID + "SOME-changelog-value");
Expand All @@ -58,7 +58,7 @@ public void shouldDeleteRepartitionTopicSchema() throws Exception {
));

// When:
SchemaRegistryUtil.cleanUpInternalTopicAvroSchemas(APP_ID, schemaRegistryClient);
SchemaRegistryUtil.cleanupInternalTopicSchemas(APP_ID, schemaRegistryClient);

// Then not exception:
verify(schemaRegistryClient).deleteSubject(APP_ID + "SOME-repartition-value");
Expand All @@ -72,7 +72,7 @@ public void shouldNotDeleteOtherSchemasForThisApplicationId() throws Exception {
));

// When:
SchemaRegistryUtil.cleanUpInternalTopicAvroSchemas(APP_ID, schemaRegistryClient);
SchemaRegistryUtil.cleanupInternalTopicSchemas(APP_ID, schemaRegistryClient);

// Then not exception:
verify(schemaRegistryClient, never()).deleteSubject(any());
Expand All @@ -86,7 +86,7 @@ public void shouldNotDeleteOtherSchemas() throws Exception {
));

// When:
SchemaRegistryUtil.cleanUpInternalTopicAvroSchemas(APP_ID, schemaRegistryClient);
SchemaRegistryUtil.cleanupInternalTopicSchemas(APP_ID, schemaRegistryClient);

// Then not exception:
verify(schemaRegistryClient, never()).deleteSubject(any());
Expand All @@ -98,7 +98,7 @@ public void shouldNotThrowIfAllSubjectsThrows() throws Exception {
when(schemaRegistryClient.getAllSubjects()).thenThrow(new RuntimeException("Boom!"));

// When:
SchemaRegistryUtil.cleanUpInternalTopicAvroSchemas(APP_ID, schemaRegistryClient);
SchemaRegistryUtil.cleanupInternalTopicSchemas(APP_ID, schemaRegistryClient);

// Then not exception:
verify(schemaRegistryClient).getAllSubjects();
Expand All @@ -114,7 +114,7 @@ public void shouldNotThrowIfDeleteSubjectThrows() throws Exception {
when(schemaRegistryClient.deleteSubject(any())).thenThrow(new RuntimeException("Boom!"));

// When:
SchemaRegistryUtil.cleanUpInternalTopicAvroSchemas(APP_ID, schemaRegistryClient);
SchemaRegistryUtil.cleanupInternalTopicSchemas(APP_ID, schemaRegistryClient);

// Then not exception:
verify(schemaRegistryClient, times(5)).deleteSubject(any());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public void shouldDeleteTopic() {
}

@Test
public void shouldDeleteSchemaInSR() throws IOException, RestClientException {
public void shouldDeleteAvroSchemaInSR() throws IOException, RestClientException {
// Given:
when(topic.getValueFormat()).thenReturn(ValueFormat.of(FormatInfo.of(FormatFactory.AVRO.name())));

Expand All @@ -152,7 +152,22 @@ public void shouldDeleteSchemaInSR() throws IOException, RestClientException {
}

@Test
public void shouldNotDeleteSchemaInSRIfNotAvro() throws IOException, RestClientException {
public void shouldDeleteProtoSchemaInSR() throws IOException, RestClientException {
// Given:
when(topic.getValueFormat()).thenReturn(ValueFormat.of(FormatInfo.of(FormatFactory.PROTOBUF.name())));

// When:
deleteInjector.inject(DROP_WITH_DELETE_TOPIC);

// Then:
verify(registryClient).deleteSubject("something" + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX);
}

@Test
public void shouldNotDeleteSchemaInSRIfNotSRSupported() throws IOException, RestClientException {
// Given:
when(topic.getValueFormat()).thenReturn(ValueFormat.of(FormatInfo.of(FormatFactory.DELIMITED.name())));

// When:
deleteInjector.inject(DROP_WITH_DELETE_TOPIC);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ static Optional<SchemaRegistryClient> getSrClient(
final Format valueFormat,
final KsqlConfig ksqlConfig
) {
// the ksql datagen at the moment only supports AVRO, not JSON/PROTOBUF
if (keyFormat != FormatFactory.AVRO && valueFormat != FormatFactory.AVRO) {
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.parser.DurationParser;
import io.confluent.ksql.query.QueryId;
Expand Down Expand Up @@ -165,7 +165,7 @@ public Serializer<Object> getKeySerializer() {
final Serializer<?> serializer = keySerdeSupplier.getSerializer(srClient);

serializer.configure(ImmutableMap.of(
KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "something"
AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "something"
), true);

return (Serializer) serializer;
Expand All @@ -179,7 +179,7 @@ public Serializer<Object> getValueSerializer() {
final Serializer<?> serializer = valueSerdeSupplier.getSerializer(srClient);

serializer.configure(ImmutableMap.of(
KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "something"
AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "something"
), false);

return (Serializer) serializer;
Expand Down
Loading

0 comments on commit 43db241

Please sign in to comment.