From 589f2995e767b256944600a27ebccb01850b5ee8 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Wed, 6 Sep 2023 13:51:44 -0700 Subject: [PATCH 1/7] MINOR allow rule class prop to be passed as class instance --- .../schemaregistry/rules/jsonata/JsonataExecutorTest.java | 2 +- .../kafka/serializers/AbstractKafkaSchemaSerDe.java | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/jsonata/JsonataExecutorTest.java b/schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/jsonata/JsonataExecutorTest.java index b289b78387b..58ffe874a22 100644 --- a/schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/jsonata/JsonataExecutorTest.java +++ b/schema-rules/src/test/java/io/confluent/kafka/schemaregistry/rules/jsonata/JsonataExecutorTest.java @@ -98,7 +98,7 @@ public JsonataExecutorTest() { defaultConfig.put(KafkaAvroSerializerConfig.LATEST_COMPATIBILITY_STRICT, "false"); defaultConfig.put(KafkaAvroSerializerConfig.RULE_EXECUTORS, "jsonata"); defaultConfig.put(KafkaAvroSerializerConfig.RULE_EXECUTORS + ".jsonata.class", - JsonataExecutor.class.getName()); + JsonataExecutor.class); avroSerializer = new KafkaAvroSerializer(schemaRegistry, defaultConfig); Map defaultConfig2 = new HashMap<>(defaultConfig); defaultConfig2.put(KafkaAvroSerializerConfig.USE_LATEST_WITH_METADATA, diff --git a/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDe.java b/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDe.java index a79fe91ddaa..3ecf922944c 100644 --- a/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDe.java +++ b/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDe.java @@ -229,9 +229,10 @@ private Stream initRuleObject( if (propertyValue == null) { return Stream.empty(); } - String className = propertyValue.toString(); try { - RuleBase ruleObject = Utils.newInstance(className, RuleBase.class); + RuleBase ruleObject = propertyValue instanceof Class ? + Utils.newInstance((Class) propertyValue, RuleBase.class) : + Utils.newInstance(propertyValue.toString(), RuleBase.class); configureRuleObject(ruleObject, name, config, configName); return Stream.of(ruleObject); } catch (ClassNotFoundException e) { From 6725070cefe902a7d3da8297ba7552d5b8299686 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Wed, 6 Sep 2023 14:21:40 -0700 Subject: [PATCH 2/7] Minor fix --- .../kafka/serializers/AbstractKafkaSchemaSerDe.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDe.java b/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDe.java index 3ecf922944c..de9d33d5f39 100644 --- a/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDe.java +++ b/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDe.java @@ -230,9 +230,9 @@ private Stream initRuleObject( return Stream.empty(); } try { - RuleBase ruleObject = propertyValue instanceof Class ? - Utils.newInstance((Class) propertyValue, RuleBase.class) : - Utils.newInstance(propertyValue.toString(), RuleBase.class); + RuleBase ruleObject = propertyValue instanceof Class + ? Utils.newInstance((Class) propertyValue, RuleBase.class) + : Utils.newInstance(propertyValue.toString(), RuleBase.class); configureRuleObject(ruleObject, name, config, configName); return Stream.of(ruleObject); } catch (ClassNotFoundException e) { @@ -720,6 +720,9 @@ private void runAction(RuleContext ctx, RuleMode ruleMode, Rule rule, String act } if (actionName != null) { RuleAction ruleAction = getRuleAction(ctx, actionName); + if (ruleAction == null) { + throw new IllegalArgumentException("Could not find rule action of type " + actionName); + } try { ruleAction.run(ctx, message, ex); } catch (RuleException e) { From 2b806b1710a3d9b561ea8793560e534598109f10 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Wed, 6 Sep 2023 14:50:05 -0700 Subject: [PATCH 3/7] Add log --- .../io/confluent/kafka/serializers/AbstractKafkaSchemaSerDe.java | 1 + 1 file changed, 1 insertion(+) diff --git a/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDe.java b/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDe.java index de9d33d5f39..aff1e6b2348 100644 --- a/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDe.java +++ b/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDe.java @@ -721,6 +721,7 @@ private void runAction(RuleContext ctx, RuleMode ruleMode, Rule rule, String act if (actionName != null) { RuleAction ruleAction = getRuleAction(ctx, actionName); if (ruleAction == null) { + log.error("Could not find rule action of type {}", actionName); throw new IllegalArgumentException("Could not find rule action of type " + actionName); } try { From dfd123ada9be637c0486b4c6c89b1a723bb02745 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Wed, 6 Sep 2023 15:13:07 -0700 Subject: [PATCH 4/7] Fix exception type --- .../confluent/kafka/serializers/AbstractKafkaSchemaSerDe.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDe.java b/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDe.java index aff1e6b2348..217d8fb68a5 100644 --- a/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDe.java +++ b/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDe.java @@ -722,7 +722,7 @@ private void runAction(RuleContext ctx, RuleMode ruleMode, Rule rule, String act RuleAction ruleAction = getRuleAction(ctx, actionName); if (ruleAction == null) { log.error("Could not find rule action of type {}", actionName); - throw new IllegalArgumentException("Could not find rule action of type " + actionName); + throw new ConfigException("Could not find rule action of type " + actionName); } try { ruleAction.run(ctx, message, ex); From 7860a6b23798e9396ceda6685b202e89e7d41adf Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Wed, 6 Sep 2023 16:55:14 -0700 Subject: [PATCH 5/7] Minor fix --- checkstyle/suppressions.xml | 2 +- .../schemaregistry/rules/cel/CelExecutor.java | 24 +++++++------------ .../serializers/AbstractKafkaSchemaSerDe.java | 14 ++++++++++- 3 files changed, 22 insertions(+), 18 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 325728a7750..32ce64c59ef 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -28,7 +28,7 @@ files="(AvroData|ConfigResource|DownloadSchemaRegistryMojo|KafkaSchemaRegistry|KafkaStore|KafkaStoreReaderThread|MessageDefinition|Schema|SchemaValue|SchemaDiff|MessageSchemaDiff|AbstractKafkaSchemaSerDe|AbstractKafkaAvroSerializer|AbstractKafkaAvroDeserializer|AbstractKafkaJsonSchemaDeserializer|AbstractKafkaProtobufDeserializer|ProtobufData|ProtobufSchemaUtils|JsonSchemaData|SchemaMessageFormatter|SchemaMessageReader|ContextFilter|QualifiedSubject|SubjectVersionsResource|Rule|WildcardMatcher|JsonSchemaComparator|LocalSchemaRegistryClient|DataEncryptionKeyId|FieldEncryptionExecutor).java"/> + files="(AbstractKafkaAvroSerializer|AbstractKafkaJsonSchemaSerializer|AbstractKafkaJsonSchemaDeserializer|AbstractKafkaProtobufSerializer|AbstractKafkaProtobufDeserializer|AbstractKafkaSchemaSerDe|AvroData|AvroSchema|AvroSchemaUtils|ProtobufData|SchemaDiff|NumberSchemaDiff|JsonSchema|JsonSchemaData|KafkaSchemaRegistry|KafkaStoreReaderThread|ProtobufSchema|ProtobufSchemaUtils|JsonSchemaComparator|SchemaMessageFormatter|SchemaMessageReader).java"/> diff --git a/schema-rules/src/main/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutor.java b/schema-rules/src/main/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutor.java index 7fe9ff4749b..c3e1771689a 100644 --- a/schema-rules/src/main/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutor.java +++ b/schema-rules/src/main/java/io/confluent/kafka/schemaregistry/rules/cel/CelExecutor.java @@ -134,24 +134,16 @@ public Object transform(RuleContext ctx, Object message) throws RuleException { input = message; } Object result = execute(ctx, input, ImmutableMap.of("message", input)); - if (ctx.rule().getKind() == RuleKind.CONDITION) { - if (Boolean.TRUE.equals(result)) { - return message; - } else { - throw new RuleException("Expr failed: '" + ctx.rule().getExpr() + "'"); - } - } else { - if (result instanceof Map) { - // Convert maps to the target object type - try { - JsonNode jsonNode = mapper.valueToTree(result); - result = ctx.target().fromJson(jsonNode); - } catch (IOException e) { - throw new RuleException(e); - } + if (result instanceof Map) { + // Convert maps to the target object type + try { + JsonNode jsonNode = mapper.valueToTree(result); + result = ctx.target().fromJson(jsonNode); + } catch (IOException e) { + throw new RuleException(e); } - return result; } + return result; } protected Object execute( diff --git a/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDe.java b/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDe.java index 217d8fb68a5..894cec88877 100644 --- a/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDe.java +++ b/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDe.java @@ -676,7 +676,19 @@ protected Object executeRules( RuleExecutor ruleExecutor = getRuleExecutor(ctx); if (ruleExecutor != null) { try { - message = ruleExecutor.transform(ctx, message); + Object result = ruleExecutor.transform(ctx, message); + switch (ctx.rule().getKind()) { + case CONDITION: + if (Boolean.FALSE.equals(result)) { + throw new RuleException("Expr failed: '" + ctx.rule().getExpr() + "'"); + } + break; + case TRANSFORM: + message = result; + break; + default: + throw new IllegalStateException("Unsupported rule kind " + ctx.rule().getKind()); + } runAction(ctx, ruleMode, rule, message != null ? rule.getOnSuccess() : rule.getOnFailure(), message, null, message != null ? null : ErrorAction.TYPE From ea7054bbb6125a0e3a765d746347a4f5d054e50c Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Wed, 6 Sep 2023 16:58:45 -0700 Subject: [PATCH 6/7] Minor fix --- .../kafka/serializers/AbstractKafkaSchemaSerDe.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDe.java b/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDe.java index 894cec88877..8ea46fa8105 100644 --- a/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDe.java +++ b/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDe.java @@ -680,7 +680,14 @@ protected Object executeRules( switch (ctx.rule().getKind()) { case CONDITION: if (Boolean.FALSE.equals(result)) { - throw new RuleException("Expr failed: '" + ctx.rule().getExpr() + "'"); + String expr = ctx.rule().getExpr(); + String errMsg; + if (expr != null) { + errMsg = "Expr failed: '" + expr + "'"; + } else { + errMsg = "Condition failed: '" + ctx.rule().getName() + "'"; + } + throw new RuleException(errMsg); } break; case TRANSFORM: From 07027861273370c49f4dc6b9cdb5e577cdf1dd64 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Wed, 6 Sep 2023 17:03:14 -0700 Subject: [PATCH 7/7] Minor cleanup --- .../serializers/AbstractKafkaSchemaSerDe.java | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDe.java b/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDe.java index 8ea46fa8105..df27ab27086 100644 --- a/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDe.java +++ b/schema-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaSchemaSerDe.java @@ -677,16 +677,13 @@ protected Object executeRules( if (ruleExecutor != null) { try { Object result = ruleExecutor.transform(ctx, message); - switch (ctx.rule().getKind()) { + switch (rule.getKind()) { case CONDITION: if (Boolean.FALSE.equals(result)) { - String expr = ctx.rule().getExpr(); - String errMsg; - if (expr != null) { - errMsg = "Expr failed: '" + expr + "'"; - } else { - errMsg = "Condition failed: '" + ctx.rule().getName() + "'"; - } + String expr = rule.getExpr(); + String errMsg = expr != null + ? "Expr failed: '" + expr + "'" + : "Condition failed: '" + rule.getName() + "'"; throw new RuleException(errMsg); } break; @@ -694,7 +691,7 @@ protected Object executeRules( message = result; break; default: - throw new IllegalStateException("Unsupported rule kind " + ctx.rule().getKind()); + throw new IllegalStateException("Unsupported rule kind " + rule.getKind()); } runAction(ctx, ruleMode, rule, message != null ? rule.getOnSuccess() : rule.getOnFailure(),