Skip to content

Commit

Permalink
fix: fix test cases with InsertValuesExecutor
Browse files Browse the repository at this point in the history
  • Loading branch information
spena committed Aug 8, 2019
1 parent 4201dff commit 0e851a5
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ private byte[] serializeValue(
topicName + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX
));
default:
throw new KsqlException("Could not serialize row: " + row, e);
break;
}
}
}
Expand Down
27 changes: 16 additions & 11 deletions ksql-engine/src/main/java/io/confluent/ksql/util/AvroUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,23 @@ private static boolean isValidAvroSchemaForTopic(
"Could not check Schema compatibility: %s", e.getMessage()
));
} catch (final RestClientException e) {
switch (e.getStatus()) {
case HttpStatus.SC_NOT_FOUND:
case HttpStatus.SC_UNAUTHORIZED:
case HttpStatus.SC_FORBIDDEN:
// Assume the subject is unknown.
// See https://github.com/confluentinc/schema-registry/issues/951
return true;
default:
throw new KsqlException(String.format(
"Could not connect to Schema Registry service: %s", e.getMessage()
));
if (e.getStatus() == HttpStatus.SC_NOT_FOUND) {
// Assume the subject is unknown.
// See https://github.com/confluentinc/schema-registry/issues/951
return true;
}

String errorMessage = e.getMessage();
if (e.getStatus() == HttpStatus.SC_UNAUTHORIZED || e.getStatus() == HttpStatus.SC_FORBIDDEN) {
errorMessage = String.format(
"Not authorized to access Schema Registry subject: [%s]",
topicName + KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX
);
}

throw new KsqlException(String.format(
"Could not connect to Schema Registry service: %s", errorMessage
));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

package io.confluent.ksql.engine;

import static org.hamcrest.Matchers.containsString;
import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -485,7 +487,7 @@ public void shouldThrowOnSerializingKeyError() {

// Expect:
expectedException.expect(KsqlException.class);
expectedException.expectMessage("Could not serialize key");
expectedException.expectCause(hasMessage(containsString("Could not serialize key")));

// When:
executor.execute(statement, engine, serviceContext);
Expand All @@ -507,7 +509,7 @@ public void shouldThrowOnSerializingValueError() {

// Expect:
expectedException.expect(KsqlException.class);
expectedException.expectMessage("Could not serialize row");
expectedException.expectCause(hasMessage(containsString("Could not serialize row")));

// When:
executor.execute(statement, engine, serviceContext);
Expand All @@ -525,7 +527,7 @@ public void shouldThrowIfRowKeyAndKeyDoNotMatch() {

// Expect:
expectedException.expect(KsqlException.class);
expectedException.expectMessage("Expected ROWKEY and COL0 to match");
expectedException.expectCause(hasMessage(containsString("Expected ROWKEY and COL0 to match")));

// When:
executor.execute(statement, engine, serviceContext);
Expand All @@ -542,7 +544,7 @@ public void shouldThrowIfNotEnoughValuesSuppliedWithNoSchema() {

// Expect:
expectedException.expect(KsqlException.class);
expectedException.expectMessage("Expected a value for each column");
expectedException.expectCause(hasMessage(containsString("Expected a value for each column")));

// When:
executor.execute(statement, engine, serviceContext);
Expand All @@ -562,7 +564,7 @@ public void shouldFailOnDowncast() {

// Expect:
expectedException.expect(KsqlException.class);
expectedException.expectMessage("Expected type INTEGER for field");
expectedException.expectCause(hasMessage(containsString("Expected type INTEGER for field")));

// When:
executor.execute(statement, engine, serviceContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.confluent.ksql.util;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -203,11 +204,30 @@ public void shouldReturnValidEvolutionIfSubjectNotRegistered() throws Exception
}

@Test
public void shouldThrowOnAnyOtherEvolutionSrException() throws Exception {
public void shouldThrowOnSrAuthorizationErrors() throws Exception {
// Given:
when(srClient.testCompatibility(any(), any()))
.thenThrow(new RestClientException("Unknown subject", 403, 40401));

// Expect:
expectedException.expect(KsqlException.class);
expectedException.expectMessage("Could not connect to Schema Registry service");
expectedException.expectMessage(containsString(String.format(
"Not authorized to access Schema Registry subject: [%s]",
persistentQuery.getResultTopic().getKafkaTopicName()
+ KsqlConstants.SCHEMA_REGISTRY_VALUE_SUFFIX
)));

// When:
AvroUtil.isValidSchemaEvolution(persistentQuery, srClient);
}

@Test
public void shouldThrowOnAnyOtherEvolutionSrException() throws Exception {
// Given:
when(srClient.testCompatibility(any(), any()))
.thenThrow(new RestClientException("Unknown subject", 500, 40401));

// Expect:
expectedException.expect(KsqlException.class);
expectedException.expectMessage("Could not connect to Schema Registry service");
Expand Down

0 comments on commit 0e851a5

Please sign in to comment.