diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 325728a7750..32ce64c59ef 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -28,7 +28,7 @@
files="(AvroData|ConfigResource|DownloadSchemaRegistryMojo|KafkaSchemaRegistry|KafkaStore|KafkaStoreReaderThread|MessageDefinition|Schema|SchemaValue|SchemaDiff|MessageSchemaDiff|AbstractKafkaSchemaSerDe|AbstractKafkaAvroSerializer|AbstractKafkaAvroDeserializer|AbstractKafkaJsonSchemaDeserializer|AbstractKafkaProtobufDeserializer|ProtobufData|ProtobufSchemaUtils|JsonSchemaData|SchemaMessageFormatter|SchemaMessageReader|ContextFilter|QualifiedSubject|SubjectVersionsResource|Rule|WildcardMatcher|JsonSchemaComparator|LocalSchemaRegistryClient|DataEncryptionKeyId|FieldEncryptionExecutor).java"/>
+ files="(AbstractKafkaAvroSerializer|AbstractKafkaJsonSchemaSerializer|AbstractKafkaJsonSchemaDeserializer|AbstractKafkaProtobufSerializer|AbstractKafkaProtobufDeserializer|AbstractKafkaSchemaSerDe|AvroData|AvroSchema|AvroSchemaUtils|ProtobufData|SchemaDiff|NumberSchemaDiff|JsonSchema|JsonSchemaData|KafkaSchemaRegistry|KafkaStoreReaderThread|ProtobufSchema|ProtobufSchemaUtils|JsonSchemaComparator|SchemaMessageFormatter|SchemaMessageReader).java"/>
diff --git a/schema-rules/src/main/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutor.java b/schema-rules/src/main/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutor.java
index 7fe9ff4749b..c3e1771689a 100644
--- a/schema-rules/src/main/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutor.java
+++ b/schema-rules/src/main/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutor.java
@@ -134,24 +134,16 @@ public Object transform(RuleContext ctx, Object message) throws RuleException {
input = message;
}
Object result = execute(ctx, input, ImmutableMap.of("message", input));
- if (ctx.rule().getKind() == RuleKind.CONDITION) {
- if (Boolean.TRUE.equals(result)) {
- return message;
- } else {
- throw new RuleException("Expr failed: '" + ctx.rule().getExpr() + "'");
- }
- } else {
- if (result instanceof Map) {
- // Convert maps to the target object type
- try {
- JsonNode jsonNode = mapper.valueToTree(result);
- result = ctx.target().fromJson(jsonNode);
- } catch (IOException e) {
- throw new RuleException(e);
- }
+ if (result instanceof Map) {
+ // Convert maps to the target object type
+ try {
+ JsonNode jsonNode = mapper.valueToTree(result);
+ result = ctx.target().fromJson(jsonNode);
+ } catch (IOException e) {
+ throw new RuleException(e);
}
- return result;
}
+ return result;
}
protected Object execute(
diff --git a/schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/jsonata/JsonataExecutorTest.java b/schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/jsonata/JsonataExecutorTest.java
index b289b78387b..58ffe874a22 100644
--- a/schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/jsonata/JsonataExecutorTest.java
+++ b/schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/jsonata/JsonataExecutorTest.java
@@ -98,7 +98,7 @@ public JsonataExecutorTest() {
defaultConfig.put(KafkaAvroSerializerConfig.LATEST_COMPATIBILITY_STRICT, "false");
defaultConfig.put(KafkaAvroSerializerConfig.RULE_EXECUTORS, "jsonata");
defaultConfig.put(KafkaAvroSerializerConfig.RULE_EXECUTORS + ".jsonata.class",
- JsonataExecutor.class.getName());
+ JsonataExecutor.class);
avroSerializer = new KafkaAvroSerializer(schemaRegistry, defaultConfig);
Map defaultConfig2 = new HashMap<>(defaultConfig);
defaultConfig2.put(KafkaAvroSerializerConfig.USE_LATEST_WITH_METADATA,
diff --git a/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDe.java b/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDe.java
index a79fe91ddaa..df27ab27086 100644
--- a/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDe.java
+++ b/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDe.java
@@ -229,9 +229,10 @@ private Stream initRuleObject(
if (propertyValue == null) {
return Stream.empty();
}
- String className = propertyValue.toString();
try {
- RuleBase ruleObject = Utils.newInstance(className, RuleBase.class);
+ RuleBase ruleObject = propertyValue instanceof Class
+ ? Utils.newInstance((Class>) propertyValue, RuleBase.class)
+ : Utils.newInstance(propertyValue.toString(), RuleBase.class);
configureRuleObject(ruleObject, name, config, configName);
return Stream.of(ruleObject);
} catch (ClassNotFoundException e) {
@@ -675,7 +676,23 @@ protected Object executeRules(
RuleExecutor ruleExecutor = getRuleExecutor(ctx);
if (ruleExecutor != null) {
try {
- message = ruleExecutor.transform(ctx, message);
+ Object result = ruleExecutor.transform(ctx, message);
+ switch (rule.getKind()) {
+ case CONDITION:
+ if (Boolean.FALSE.equals(result)) {
+ String expr = rule.getExpr();
+ String errMsg = expr != null
+ ? "Expr failed: '" + expr + "'"
+ : "Condition failed: '" + rule.getName() + "'";
+ throw new RuleException(errMsg);
+ }
+ break;
+ case TRANSFORM:
+ message = result;
+ break;
+ default:
+ throw new IllegalStateException("Unsupported rule kind " + rule.getKind());
+ }
runAction(ctx, ruleMode, rule,
message != null ? rule.getOnSuccess() : rule.getOnFailure(),
message, null, message != null ? null : ErrorAction.TYPE
@@ -719,6 +736,10 @@ private void runAction(RuleContext ctx, RuleMode ruleMode, Rule rule, String act
}
if (actionName != null) {
RuleAction ruleAction = getRuleAction(ctx, actionName);
+ if (ruleAction == null) {
+ log.error("Could not find rule action of type {}", actionName);
+ throw new ConfigException("Could not find rule action of type " + actionName);
+ }
try {
ruleAction.run(ctx, message, ex);
} catch (RuleException e) {