diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 325728a7750..32ce64c59ef 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -28,7 +28,7 @@ files="(AvroData|ConfigResource|DownloadSchemaRegistryMojo|KafkaSchemaRegistry|KafkaStore|KafkaStoreReaderThread|MessageDefinition|Schema|SchemaValue|SchemaDiff|MessageSchemaDiff|AbstractKafkaSchemaSerDe|AbstractKafkaAvroSerializer|AbstractKafkaAvroDeserializer|AbstractKafkaJsonSchemaDeserializer|AbstractKafkaProtobufDeserializer|ProtobufData|ProtobufSchemaUtils|JsonSchemaData|SchemaMessageFormatter|SchemaMessageReader|ContextFilter|QualifiedSubject|SubjectVersionsResource|Rule|WildcardMatcher|JsonSchemaComparator|LocalSchemaRegistryClient|DataEncryptionKeyId|FieldEncryptionExecutor).java"/> + files="(AbstractKafkaAvroSerializer|AbstractKafkaJsonSchemaSerializer|AbstractKafkaJsonSchemaDeserializer|AbstractKafkaProtobufSerializer|AbstractKafkaProtobufDeserializer|AbstractKafkaSchemaSerDe|AvroData|AvroSchema|AvroSchemaUtils|ProtobufData|SchemaDiff|NumberSchemaDiff|JsonSchema|JsonSchemaData|KafkaSchemaRegistry|KafkaStoreReaderThread|ProtobufSchema|ProtobufSchemaUtils|JsonSchemaComparator|SchemaMessageFormatter|SchemaMessageReader).java"/> diff --git a/schema-rules/src/main/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutor.java b/schema-rules/src/main/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutor.java index 7fe9ff4749b..c3e1771689a 100644 --- a/schema-rules/src/main/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutor.java +++ b/schema-rules/src/main/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutor.java @@ -134,24 +134,16 @@ public Object transform(RuleContext ctx, Object message) throws RuleException { input = message; } Object result = execute(ctx, input, ImmutableMap.of("message", input)); - if (ctx.rule().getKind() == RuleKind.CONDITION) { - if (Boolean.TRUE.equals(result)) { - return message; - } else { - throw new RuleException("Expr failed: '" + ctx.rule().getExpr() + "'"); - } - } else { - if (result instanceof Map) { - // Convert maps to the target object type - try { - JsonNode jsonNode = mapper.valueToTree(result); - result = ctx.target().fromJson(jsonNode); - } catch (IOException e) { - throw new RuleException(e); - } + if (result instanceof Map) { + // Convert maps to the target object type + try { + JsonNode jsonNode = mapper.valueToTree(result); + result = ctx.target().fromJson(jsonNode); + } catch (IOException e) { + throw new RuleException(e); } - return result; } + return result; } protected Object execute( diff --git a/schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/jsonata/JsonataExecutorTest.java b/schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/jsonata/JsonataExecutorTest.java index b289b78387b..58ffe874a22 100644 --- a/schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/jsonata/JsonataExecutorTest.java +++ b/schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/jsonata/JsonataExecutorTest.java @@ -98,7 +98,7 @@ public JsonataExecutorTest() { defaultConfig.put(KafkaAvroSerializerConfig.LATEST_COMPATIBILITY_STRICT, "false"); defaultConfig.put(KafkaAvroSerializerConfig.RULE_EXECUTORS, "jsonata"); defaultConfig.put(KafkaAvroSerializerConfig.RULE_EXECUTORS + ".jsonata.class", - JsonataExecutor.class.getName()); + JsonataExecutor.class); avroSerializer = new KafkaAvroSerializer(schemaRegistry, defaultConfig); Map defaultConfig2 = new HashMap<>(defaultConfig); defaultConfig2.put(KafkaAvroSerializerConfig.USE_LATEST_WITH_METADATA, diff --git a/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDe.java b/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDe.java index a79fe91ddaa..df27ab27086 100644 --- a/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDe.java +++ b/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDe.java @@ -229,9 +229,10 @@ private Stream initRuleObject( if (propertyValue == null) { return Stream.empty(); } - String className = propertyValue.toString(); try { - RuleBase ruleObject = Utils.newInstance(className, RuleBase.class); + RuleBase ruleObject = propertyValue instanceof Class + ? Utils.newInstance((Class) propertyValue, RuleBase.class) + : Utils.newInstance(propertyValue.toString(), RuleBase.class); configureRuleObject(ruleObject, name, config, configName); return Stream.of(ruleObject); } catch (ClassNotFoundException e) { @@ -675,7 +676,23 @@ protected Object executeRules( RuleExecutor ruleExecutor = getRuleExecutor(ctx); if (ruleExecutor != null) { try { - message = ruleExecutor.transform(ctx, message); + Object result = ruleExecutor.transform(ctx, message); + switch (rule.getKind()) { + case CONDITION: + if (Boolean.FALSE.equals(result)) { + String expr = rule.getExpr(); + String errMsg = expr != null + ? "Expr failed: '" + expr + "'" + : "Condition failed: '" + rule.getName() + "'"; + throw new RuleException(errMsg); + } + break; + case TRANSFORM: + message = result; + break; + default: + throw new IllegalStateException("Unsupported rule kind " + rule.getKind()); + } runAction(ctx, ruleMode, rule, message != null ? rule.getOnSuccess() : rule.getOnFailure(), message, null, message != null ? null : ErrorAction.TYPE @@ -719,6 +736,10 @@ private void runAction(RuleContext ctx, RuleMode ruleMode, Rule rule, String act } if (actionName != null) { RuleAction ruleAction = getRuleAction(ctx, actionName); + if (ruleAction == null) { + log.error("Could not find rule action of type {}", actionName); + throw new ConfigException("Could not find rule action of type " + actionName); + } try { ruleAction.run(ctx, message, ex); } catch (RuleException e) {