diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/schema/ksql/inference/SchemaRegisterInjector.java b/ksqldb-engine/src/main/java/io/confluent/ksql/schema/ksql/inference/SchemaRegisterInjector.java index 7633198e65f8..7e585c3a9689 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/schema/ksql/inference/SchemaRegisterInjector.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/schema/ksql/inference/SchemaRegisterInjector.java @@ -110,7 +110,7 @@ private void registerSchema( final FormatInfo formatInfo, final KsqlConfig config, final String statementText, - final boolean overwriteExisting + final boolean registerIfSchemaExists ) { final Format format = FormatFactory.of(formatInfo); if (!format.supportsSchemaInference()) { @@ -125,16 +125,7 @@ private void registerSchema( final ParsedSchema parsedSchema = format.toParsedSchema(schema.withoutPseudoAndKeyColsInValue().value(), formatInfo); - if (!overwriteExisting && srClient.getAllSubjects().contains(subject)) { - if (!srClient.testCompatibility(subject, parsedSchema)) { - throw new KsqlStatementException( - String.format( - "Could not register schema for subject " - + "'%s' because it is incompatible with existing schema.", - subject), - statementText); - } - } else { + if (registerIfSchemaExists || !srClient.getAllSubjects().contains(subject)) { srClient.register(subject, parsedSchema); } } catch (IOException | RestClientException e) { diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/schema/ksql/inference/SchemaRegisterInjectorTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/schema/ksql/inference/SchemaRegisterInjectorTest.java index f1d5b26a6934..c30c1d1f12b4 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/schema/ksql/inference/SchemaRegisterInjectorTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/schema/ksql/inference/SchemaRegisterInjectorTest.java @@ -23,6 +23,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; @@ -63,6 +64,7 @@ import java.io.IOException; import java.util.HashMap; import java.util.Optional; +import org.apache.avro.Schema; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -167,7 +169,7 @@ public void shouldNotRegisterSchemaForSchemaRegistryDisabledFormatCreateSource() } @Test - public void shouldRegisterSchemaForSchemaRegistryEnabledFormatCreateSource() + public void shouldRegisterSchemaForSchemaRegistryEnabledFormatCreateSourceIfSubjectDoesntExist() throws IOException, RestClientException { // Given: givenStatement("CREATE STREAM sink (f1 VARCHAR) WITH (kafka_topic='expectedName', value_format='AVRO', partitions=1);"); @@ -179,6 +181,7 @@ public void shouldRegisterSchemaForSchemaRegistryEnabledFormatCreateSource() verify(schemaRegistryClient).register("expectedName-value", AVRO_SCHEMA); } + @SuppressWarnings("deprecation") // make sure deprecated method is not called @Test public void shouldNotReplaceExistingSchemaForSchemaRegistryEnabledFormatCreateSource() throws IOException, RestClientException { @@ -191,24 +194,8 @@ public void shouldNotReplaceExistingSchemaForSchemaRegistryEnabledFormatCreateSo injector.inject(statement); // Then: - verify(schemaRegistryClient).getAllSubjects(); - verify(schemaRegistryClient).testCompatibility(eq("expectedName-value"), any(ParsedSchema.class)); - verifyNoMoreInteractions(schemaRegistryClient); - } - - @Test - public void shouldThrowOnExistingSchemaForSchemaRegistryEnabledFormatCreateSourceIncompatible() - throws IOException, RestClientException { - // Given: - givenStatement("CREATE STREAM sink (f1 VARCHAR) WITH (kafka_topic='expectedName', value_format='AVRO', partitions=1);"); - when(schemaRegistryClient.getAllSubjects()).thenReturn(ImmutableSet.of("expectedName-value")); - when(schemaRegistryClient.testCompatibility(eq("expectedName-value"), any(ParsedSchema.class))).thenReturn(false); - - // When: - final KsqlStatementException e = assertThrows(KsqlStatementException.class, () -> injector.inject(statement)); - - // Then: - assertThat(e.getMessage(), containsString("Could not register schema for subject 'expectedName-value' because it is incompatible with existing schema.")); + verify(schemaRegistryClient, never()).register(any(), any(ParsedSchema.class)); + verify(schemaRegistryClient, never()).register(any(), any(Schema.class)); } @Test