diff --git a/ballerina/tests/consumer_constraint_tests.bal b/ballerina/tests/consumer_constraint_tests.bal index 78b9b34a..da448fff 100644 --- a/ballerina/tests/consumer_constraint_tests.bal +++ b/ballerina/tests/consumer_constraint_tests.bal @@ -234,7 +234,7 @@ function intMaxValueConstraintListenerConsumerRecordTest() returns error? { Service intConstraintService = service object { remote function onConsumerRecord(IntConstraintConsumerRecord[] records) returns error? { - foreach int i in 0 ... records.length() { + foreach int i in 0 ... records.length() - 1 { log:printInfo("Received record: " + records[i].toString()); } } @@ -270,7 +270,7 @@ function intMinValueConstraintListenerConsumerRecordTest() returns error? { Service intConstraintService = service object { remote function onConsumerRecord(IntConstraintConsumerRecord[] records) returns error? { - foreach int i in 0 ... records.length() { + foreach int i in 0 ... records.length() - 1 { log:printInfo("Received record: " + records[i].toString()); } } @@ -306,7 +306,7 @@ function numberMaxValueConstraintListenerPayloadTest() returns error? { Service intConstraintService = service object { remote function onConsumerRecord(Weight[] records) returns error? { - foreach int i in 0 ... records.length() { + foreach int i in 0 ... records.length() - 1 { log:printInfo("Received record: " + records[i].toString()); } } @@ -342,7 +342,7 @@ function numberMinValueConstraintListenerPayloadTest() returns error? { Service intConstraintService = service object { remote function onConsumerRecord(Weight[] records) returns error? { - foreach int i in 0 ... records.length() { + foreach int i in 0 ... records.length() - 1 { log:printInfo("Received record: " + records[i].toString()); } } @@ -381,7 +381,7 @@ function validRecordConstraintPayloadTest() returns error? { Service validRecordService = service object { remote function onConsumerRecord(Child[] records) returns error? { - foreach int i in 0 ... records.length() { + foreach int i in 0 ... records.length() - 1 { log:printInfo("Received record: " + records[i].toString()); receivedValidRecordCount += 1; } @@ -491,7 +491,7 @@ function invalidRecordConstraintWithSeekPayloadTest() returns error? { Service invalidRecordService = service object { remote function onConsumerRecord(Child[] records) returns error? { - foreach int i in 0 ... records.length() { + foreach int i in 0 ... records.length() - 1 { log:printInfo("Received record: " + records[i].toString()); receivedSeekedValidRecordCount += 1; } diff --git a/ballerina/tests/listener_client_tests.bal b/ballerina/tests/listener_client_tests.bal index 421ae4bc..9bdeab17 100644 --- a/ballerina/tests/listener_client_tests.bal +++ b/ballerina/tests/listener_client_tests.bal @@ -965,7 +965,7 @@ function listenerWithPollTimeoutConfigTest() returns error? { Service configService = service object { remote function onConsumerRecord(string[] records) returns error? { - foreach int i in 0 ... records.length() { + foreach int i in 0 ... records.length() - 1 { receivedTimeoutConfigValue = records[i]; } } diff --git a/ballerina/tests/listener_data_binding_tests.bal b/ballerina/tests/listener_data_binding_tests.bal index 2ac51e17..4085e762 100644 --- a/ballerina/tests/listener_data_binding_tests.bal +++ b/ballerina/tests/listener_data_binding_tests.bal @@ -224,7 +224,7 @@ function intConsumerRecordBindingListenerTest() returns error? { Service intBindingService = service object { remote function onConsumerRecord(readonly & IntConsumerRecord[] records, Caller caller) returns error? { - foreach int i in 0 ... records.length() { + foreach int i in 0 ... records.length() - 1 { receivedIntValue = records[i].value; log:printInfo("Received record: " + records[i].toString()); } @@ -259,7 +259,7 @@ function floatConsumerRecordBindingListenerTest() returns error? { Service floatBindingService = service object { remote function onConsumerRecord(FloatConsumerRecord[] & readonly records, Caller caller) returns error? { - foreach int i in 0 ... records.length() { + foreach int i in 0 ... records.length() - 1 { receivedFloatValue = records[i].value; log:printInfo("Received record: " + records[i].toString()); } @@ -294,7 +294,7 @@ function decimalConsumerRecordBindingListenerTest() returns error? { Service decimalBindingService = service object { remote function onConsumerRecord(DecimalConsumerRecord[] records, Caller caller) returns error? { - foreach int i in 0 ... records.length() { + foreach int i in 0 ... records.length() - 1 { receivedDecimalValue = records[i].value; log:printInfo("Received record: " + records[i].toString()); } @@ -329,7 +329,7 @@ function booleanConsumerRecordBindingListenerTest() returns error? { Service booleanBindingService = service object { remote function onConsumerRecord(BooleanConsumerRecord[] records, Caller caller) returns error? { - foreach int i in 0 ... records.length() { + foreach int i in 0 ... records.length() - 1 { receivedBooleanValue = records[i].value; log:printInfo("Received record: " + records[i].toString()); } @@ -366,7 +366,7 @@ function stringConsumerRecordListenerTest() returns error? { Service stringBindingService = service object { remote function onConsumerRecord(StringConsumerRecord[] records) returns error? { - foreach int i in 0 ... records.length() { + foreach int i in 0 ... records.length() - 1 { receivedStringValue = records[i].value; log:printInfo("Received record: " + records[i].toString()); } @@ -404,7 +404,7 @@ function xmlConsumerRecordListenerTest() returns error? { Service xmlBindingService = service object { remote function onConsumerRecord(Caller caller, XmlConsumerRecord[] records) returns error? { - foreach int i in 0 ... records.length() { + foreach int i in 0 ... records.length() - 1 { receivedXmlValue = records[i].value; log:printInfo("Received record: " + records[i].toString()); } @@ -441,7 +441,7 @@ function recordConsumerRecordListenerTest() returns error? { Service recordBindingService = service object { remote function onConsumerRecord(PersonConsumerRecord[] records, Caller caller) returns error? { - foreach int i in 0 ... records.length() { + foreach int i in 0 ... records.length() - 1 { receivedPersonValue = records[i].value; log:printInfo("Received record: " + records[i].toString()); } @@ -478,7 +478,7 @@ function mapConsumerRecordListenerTest() returns error? { Service mapBindingService = service object { remote function onConsumerRecord(MapConsumerRecord[] records, Caller caller) returns error? { - foreach int i in 0 ... records.length() { + foreach int i in 0 ... records.length() - 1 { receivedMapValue = records[i].value; log:printInfo("Received record: " + records[i].toString()); } @@ -518,7 +518,7 @@ function tableConsumerRecordListenerTest() returns error? { Service tableBindingService = service object { remote function onConsumerRecord(TableConsumerRecord[] records) returns error? { - foreach int i in 0 ... records.length() { + foreach int i in 0 ... records.length() - 1 { receivedTableValue = records[i].value; log:printInfo("Received record: " + records[i].toString()); } @@ -555,7 +555,7 @@ function jsonConsumerRecordListenerTest() returns error? { Service jsonBindingService = service object { remote function onConsumerRecord(Caller caller, JsonConsumerRecord[] records) returns error? { - foreach int i in 0 ... records.length() { + foreach int i in 0 ... records.length() - 1 { receivedJsonValue = records[i].value; log:printInfo("Received record: " + records[i].toString()); } @@ -630,7 +630,7 @@ function intPayloadBindingListenerTest() returns error? { Service intBindingService = service object { remote function onConsumerRecord(int[] payload) returns error? { - foreach int i in 0 ... payload.length() { + foreach int i in 0 ... payload.length() - 1 { receivedIntPayload = payload[i]; log:printInfo("Received record: " + payload[i].toString()); } @@ -665,7 +665,7 @@ function floatPayloadBindingListenerTest() returns error? { Service floatBindingService = service object { remote function onConsumerRecord(float[] payload) returns error? { - foreach int i in 0 ... payload.length() { + foreach int i in 0 ... payload.length() - 1 { receivedFloatPayload = payload[i]; log:printInfo("Received record: " + payload[i].toString()); } @@ -700,7 +700,7 @@ function decimalPayloadBindingListenerTest() returns error? { Service decimalBindingService = service object { remote function onConsumerRecord(decimal[] payload) returns error? { - foreach int i in 0 ... payload.length() { + foreach int i in 0 ... payload.length() - 1 { receivedDecimalPayload = payload[i]; log:printInfo("Received record: " + payload[i].toString()); } @@ -735,7 +735,7 @@ function booleanPayloadBindingListenerTest() returns error? { Service booleanBindingService = service object { remote function onConsumerRecord(boolean[] payload, Caller caller) returns error? { - foreach int i in 0 ... payload.length() { + foreach int i in 0 ... payload.length() - 1 { receivedBooleanPayload = payload[i]; log:printInfo("Received record: " + payload[i].toString()); } @@ -772,7 +772,7 @@ function stringPayloadListenerTest() returns error? { Service stringBindingService = service object { remote function onConsumerRecord(StringConsumerRecord[] records, string[] payload) returns error? { - foreach int i in 0 ... payload.length() { + foreach int i in 0 ... payload.length() - 1 { receivedStringPayload = payload[i]; log:printInfo("Received record: " + payload[i].toString()); } @@ -810,7 +810,7 @@ function xmlPayloadListenerTest() returns error? { Service xmlBindingService = service object { remote function onConsumerRecord(Caller caller, xml[] payload) returns error? { - foreach int i in 0 ... payload.length() { + foreach int i in 0 ... payload.length() - 1 { receivedXmlPayload = payload[i]; log:printInfo("Received record: " + payload[i].toString()); } @@ -847,7 +847,7 @@ function recordPayloadListenerTest() returns error? { Service recordBindingService = service object { remote function onConsumerRecord(Person[] payload, Caller caller, PersonConsumerRecord[] records) returns error? { - foreach int i in 0 ... payload.length() { + foreach int i in 0 ... payload.length() - 1 { receivedPersonPayload = payload[i]; log:printInfo("Received record: " + payload[i].toString()); } @@ -884,7 +884,7 @@ function mapPayloadListenerTest() returns error? { Service mapBindingService = service object { remote function onConsumerRecord(map[] payload) returns error? { - foreach int i in 0 ... payload.length() { + foreach int i in 0 ... payload.length() - 1 { receivedMapPayload = payload[i]; log:printInfo("Received record: " + payload[i].toString()); } @@ -924,7 +924,7 @@ function tablePayloadListenerTest() returns error? { Service tableBindingService = service object { remote function onConsumerRecord(table[] payload, TableConsumerRecord[] records) returns error? { - foreach int i in 0 ... payload.length() { + foreach int i in 0 ... payload.length() - 1 { receivedTablePayload = payload[i]; log:printInfo("Received record: " + payload[i].toString()); } @@ -961,7 +961,7 @@ function jsonPayloadListenerTest() returns error? { Service jsonBindingService = service object { remote function onConsumerRecord(json[] payload) returns error? { - foreach int i in 0 ... payload.length() { + foreach int i in 0 ... payload.length() - 1 { receivedJsonPayload = payload[i]; log:printInfo("Received record: " + payload[i].toString()); } @@ -998,7 +998,7 @@ function payloadConsumerRecordListenerTest() returns error? { Service payloadRecordBindingService = service object { remote function onConsumerRecord(@Payload PayloadConsumerRecord[] payloadRecords, JsonConsumerRecord[] consumerRecords) returns error? { - foreach int i in 0 ... payloadRecords.length() { + foreach int i in 0 ... payloadRecords.length() - 1 { receivedPayloadConsumerRecordValue = payloadRecords[i]; log:printInfo("Received record: " + payloadRecords[i].toString()); } @@ -1166,7 +1166,7 @@ function invalidRecordPayloadWithSeekListenerTest() returns error? { Service invalidRecordService = service object { remote function onConsumerRecord(PersonConsumerRecord[] records) returns error? { - foreach int i in 0 ... records.length() { + foreach int i in 0 ... records.length() - 1 { log:printInfo("Received record: " + records[i].toString()); receivedSeekedValidRecordListenerCount += 1; } diff --git a/build.gradle b/build.gradle index e63a52ca..147a74c6 100644 --- a/build.gradle +++ b/build.gradle @@ -16,7 +16,7 @@ */ plugins { - id "com.github.spotbugs" version "4.0.5" + id "com.github.spotbugs" version "4.2.3" id "com.github.johnrengelman.shadow" version "5.2.0" id "de.undercouch.download" version "4.0.4" id "net.researchgate.release" version "2.8.0" diff --git a/changelog.md b/changelog.md index a1759ca9..1f0ebc53 100644 --- a/changelog.md +++ b/changelog.md @@ -4,14 +4,25 @@ This file contains all the notable changes done to the Ballerina Kafka package t The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +### Changed +- [Exit the service when a panic occurs inside the service method](https://github.com/ballerina-platform/ballerina-standard-library/issues/4241) + +## [3.6.0] - 2023-02-20 ### Added - [Added a feature to autoseek when a payload binding fails or constraint validation fails](https://github.com/ballerina-platform/ballerina-standard-library/issues/3827) +### Changed +- [Stopped Java client logs from appearing in the console](https://github.com/ballerina-platform/ballerina-standard-library/issues/3698) + ### Fixed - [Fixed consumer panic when a type casting error occurs](https://github.com/ballerina-platform/ballerina-standard-library/issues/3696) - [Fixed not logging errors returned from the kafka `onConsumerRecord` method](https://github.com/ballerina-platform/ballerina-standard-library/issues/3884) +## [3.5.1] - 2023-01-27 +### Changed +- [Changed the send operation of `kafka:Producer` to be non-blocking](https://github.com/ballerina-platform/ballerina-standard-library/issues/3991) + ## [3.5.0] - 2022-11-29 ### Changed diff --git a/native/src/main/java/io/ballerina/stdlib/kafka/impl/KafkaListenerImpl.java b/native/src/main/java/io/ballerina/stdlib/kafka/impl/KafkaListenerImpl.java index ea0276e8..bd9c3974 100644 --- a/native/src/main/java/io/ballerina/stdlib/kafka/impl/KafkaListenerImpl.java +++ b/native/src/main/java/io/ballerina/stdlib/kafka/impl/KafkaListenerImpl.java @@ -20,6 +20,7 @@ import io.ballerina.runtime.api.PredefinedTypes; import io.ballerina.runtime.api.Runtime; +import io.ballerina.runtime.api.async.Callback; import io.ballerina.runtime.api.async.StrandMetadata; import io.ballerina.runtime.api.creators.ValueCreator; import io.ballerina.runtime.api.types.ArrayType; @@ -157,10 +158,10 @@ private void executeOnError(MethodType onErrorMethod, Throwable throwable) { ObjectType serviceType = (ObjectType) TypeUtils.getReferredType(service.getType()); if (serviceType.isIsolated() && serviceType.isIsolated(KAFKA_RESOURCE_ON_ERROR)) { bRuntime.invokeMethodAsyncConcurrently(service, KAFKA_RESOURCE_ON_ERROR, null, metadata, - null, properties, PredefinedTypes.TYPE_NULL, arguments); + new KafkaOnErrorCallback(), properties, PredefinedTypes.TYPE_NULL, arguments); } else { bRuntime.invokeMethodAsyncSequentially(service, KAFKA_RESOURCE_ON_ERROR, null, metadata, - null, properties, PredefinedTypes.TYPE_NULL, arguments); + new KafkaOnErrorCallback(), properties, PredefinedTypes.TYPE_NULL, arguments); } } @@ -289,4 +290,19 @@ private StrandMetadata getStrandMetadata(String parentFunctionName) { return new StrandMetadata(ModuleUtils.getModule().getOrg(), ModuleUtils.getModule().getName(), ModuleUtils.getModule().getMajorVersion(), parentFunctionName); } + + static class KafkaOnErrorCallback implements Callback { + @Override + public void notifySuccess(Object result) { + if (result instanceof BError) { + ((BError) result).printStackTrace(); + } + } + + @Override + public void notifyFailure(BError bError) { + bError.printStackTrace(); + System.exit(1); + } + } } diff --git a/native/src/main/java/io/ballerina/stdlib/kafka/impl/KafkaPollCycleFutureListener.java b/native/src/main/java/io/ballerina/stdlib/kafka/impl/KafkaPollCycleFutureListener.java index 1bb1f4f4..340da3da 100644 --- a/native/src/main/java/io/ballerina/stdlib/kafka/impl/KafkaPollCycleFutureListener.java +++ b/native/src/main/java/io/ballerina/stdlib/kafka/impl/KafkaPollCycleFutureListener.java @@ -73,8 +73,12 @@ public void notifySuccess(Object obj) { @Override public void notifyFailure(BError error) { sem.release(); - logger.error("Ballerina engine has completed resource invocation with exception for service " + serviceId + - ". Semaphore is released to continue next polling cycle.", error.toString()); + if (logger.isDebugEnabled()) { + logger.error("Ballerina engine has completed resource invocation with exception for service " + serviceId + + ". Semaphore is released to continue next polling cycle.", error.toString()); + } + error.printStackTrace(); + System.exit(1); } } diff --git a/spotbugs-exclude.xml b/spotbugs-exclude.xml index 60288aab..2772e6ba 100644 --- a/spotbugs-exclude.xml +++ b/spotbugs-exclude.xml @@ -34,4 +34,12 @@ + + + + + + + +