Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exit service after a panic happens inside the onConsumerRecord method and onError method #822

Merged
merged 6 commits into from
Mar 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions ballerina/tests/consumer_constraint_tests.bal
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ function intMaxValueConstraintListenerConsumerRecordTest() returns error? {
Service intConstraintService =
service object {
remote function onConsumerRecord(IntConstraintConsumerRecord[] records) returns error? {
foreach int i in 0 ... records.length() {
foreach int i in 0 ... records.length() - 1 {
log:printInfo("Received record: " + records[i].toString());
}
}
Expand Down Expand Up @@ -270,7 +270,7 @@ function intMinValueConstraintListenerConsumerRecordTest() returns error? {
Service intConstraintService =
service object {
remote function onConsumerRecord(IntConstraintConsumerRecord[] records) returns error? {
foreach int i in 0 ... records.length() {
foreach int i in 0 ... records.length() - 1 {
log:printInfo("Received record: " + records[i].toString());
}
}
Expand Down Expand Up @@ -306,7 +306,7 @@ function numberMaxValueConstraintListenerPayloadTest() returns error? {
Service intConstraintService =
service object {
remote function onConsumerRecord(Weight[] records) returns error? {
foreach int i in 0 ... records.length() {
foreach int i in 0 ... records.length() - 1 {
log:printInfo("Received record: " + records[i].toString());
}
}
Expand Down Expand Up @@ -342,7 +342,7 @@ function numberMinValueConstraintListenerPayloadTest() returns error? {
Service intConstraintService =
service object {
remote function onConsumerRecord(Weight[] records) returns error? {
foreach int i in 0 ... records.length() {
foreach int i in 0 ... records.length() - 1 {
log:printInfo("Received record: " + records[i].toString());
}
}
Expand Down Expand Up @@ -381,7 +381,7 @@ function validRecordConstraintPayloadTest() returns error? {
Service validRecordService =
service object {
remote function onConsumerRecord(Child[] records) returns error? {
foreach int i in 0 ... records.length() {
foreach int i in 0 ... records.length() - 1 {
log:printInfo("Received record: " + records[i].toString());
receivedValidRecordCount += 1;
}
Expand Down Expand Up @@ -491,7 +491,7 @@ function invalidRecordConstraintWithSeekPayloadTest() returns error? {
Service invalidRecordService =
service object {
remote function onConsumerRecord(Child[] records) returns error? {
foreach int i in 0 ... records.length() {
foreach int i in 0 ... records.length() - 1 {
log:printInfo("Received record: " + records[i].toString());
receivedSeekedValidRecordCount += 1;
}
Expand Down
2 changes: 1 addition & 1 deletion ballerina/tests/listener_client_tests.bal
Original file line number Diff line number Diff line change
Expand Up @@ -965,7 +965,7 @@ function listenerWithPollTimeoutConfigTest() returns error? {
Service configService =
service object {
remote function onConsumerRecord(string[] records) returns error? {
foreach int i in 0 ... records.length() {
foreach int i in 0 ... records.length() - 1 {
receivedTimeoutConfigValue = records[i];
}
}
Expand Down
44 changes: 22 additions & 22 deletions ballerina/tests/listener_data_binding_tests.bal
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ function intConsumerRecordBindingListenerTest() returns error? {
Service intBindingService =
service object {
remote function onConsumerRecord(readonly & IntConsumerRecord[] records, Caller caller) returns error? {
foreach int i in 0 ... records.length() {
foreach int i in 0 ... records.length() - 1 {
receivedIntValue = records[i].value;
log:printInfo("Received record: " + records[i].toString());
}
Expand Down Expand Up @@ -259,7 +259,7 @@ function floatConsumerRecordBindingListenerTest() returns error? {
Service floatBindingService =
service object {
remote function onConsumerRecord(FloatConsumerRecord[] & readonly records, Caller caller) returns error? {
foreach int i in 0 ... records.length() {
foreach int i in 0 ... records.length() - 1 {
receivedFloatValue = records[i].value;
log:printInfo("Received record: " + records[i].toString());
}
Expand Down Expand Up @@ -294,7 +294,7 @@ function decimalConsumerRecordBindingListenerTest() returns error? {
Service decimalBindingService =
service object {
remote function onConsumerRecord(DecimalConsumerRecord[] records, Caller caller) returns error? {
foreach int i in 0 ... records.length() {
foreach int i in 0 ... records.length() - 1 {
receivedDecimalValue = records[i].value;
log:printInfo("Received record: " + records[i].toString());
}
Expand Down Expand Up @@ -329,7 +329,7 @@ function booleanConsumerRecordBindingListenerTest() returns error? {
Service booleanBindingService =
service object {
remote function onConsumerRecord(BooleanConsumerRecord[] records, Caller caller) returns error? {
foreach int i in 0 ... records.length() {
foreach int i in 0 ... records.length() - 1 {
receivedBooleanValue = records[i].value;
log:printInfo("Received record: " + records[i].toString());
}
Expand Down Expand Up @@ -366,7 +366,7 @@ function stringConsumerRecordListenerTest() returns error? {
Service stringBindingService =
service object {
remote function onConsumerRecord(StringConsumerRecord[] records) returns error? {
foreach int i in 0 ... records.length() {
foreach int i in 0 ... records.length() - 1 {
receivedStringValue = records[i].value;
log:printInfo("Received record: " + records[i].toString());
}
Expand Down Expand Up @@ -404,7 +404,7 @@ function xmlConsumerRecordListenerTest() returns error? {
Service xmlBindingService =
service object {
remote function onConsumerRecord(Caller caller, XmlConsumerRecord[] records) returns error? {
foreach int i in 0 ... records.length() {
foreach int i in 0 ... records.length() - 1 {
receivedXmlValue = records[i].value;
log:printInfo("Received record: " + records[i].toString());
}
Expand Down Expand Up @@ -441,7 +441,7 @@ function recordConsumerRecordListenerTest() returns error? {
Service recordBindingService =
service object {
remote function onConsumerRecord(PersonConsumerRecord[] records, Caller caller) returns error? {
foreach int i in 0 ... records.length() {
foreach int i in 0 ... records.length() - 1 {
receivedPersonValue = records[i].value;
log:printInfo("Received record: " + records[i].toString());
}
Expand Down Expand Up @@ -478,7 +478,7 @@ function mapConsumerRecordListenerTest() returns error? {
Service mapBindingService =
service object {
remote function onConsumerRecord(MapConsumerRecord[] records, Caller caller) returns error? {
foreach int i in 0 ... records.length() {
foreach int i in 0 ... records.length() - 1 {
receivedMapValue = records[i].value;
log:printInfo("Received record: " + records[i].toString());
}
Expand Down Expand Up @@ -518,7 +518,7 @@ function tableConsumerRecordListenerTest() returns error? {
Service tableBindingService =
service object {
remote function onConsumerRecord(TableConsumerRecord[] records) returns error? {
foreach int i in 0 ... records.length() {
foreach int i in 0 ... records.length() - 1 {
receivedTableValue = records[i].value;
log:printInfo("Received record: " + records[i].toString());
}
Expand Down Expand Up @@ -555,7 +555,7 @@ function jsonConsumerRecordListenerTest() returns error? {
Service jsonBindingService =
service object {
remote function onConsumerRecord(Caller caller, JsonConsumerRecord[] records) returns error? {
foreach int i in 0 ... records.length() {
foreach int i in 0 ... records.length() - 1 {
receivedJsonValue = records[i].value;
log:printInfo("Received record: " + records[i].toString());
}
Expand Down Expand Up @@ -630,7 +630,7 @@ function intPayloadBindingListenerTest() returns error? {
Service intBindingService =
service object {
remote function onConsumerRecord(int[] payload) returns error? {
foreach int i in 0 ... payload.length() {
foreach int i in 0 ... payload.length() - 1 {
receivedIntPayload = payload[i];
log:printInfo("Received record: " + payload[i].toString());
}
Expand Down Expand Up @@ -665,7 +665,7 @@ function floatPayloadBindingListenerTest() returns error? {
Service floatBindingService =
service object {
remote function onConsumerRecord(float[] payload) returns error? {
foreach int i in 0 ... payload.length() {
foreach int i in 0 ... payload.length() - 1 {
receivedFloatPayload = payload[i];
log:printInfo("Received record: " + payload[i].toString());
}
Expand Down Expand Up @@ -700,7 +700,7 @@ function decimalPayloadBindingListenerTest() returns error? {
Service decimalBindingService =
service object {
remote function onConsumerRecord(decimal[] payload) returns error? {
foreach int i in 0 ... payload.length() {
foreach int i in 0 ... payload.length() - 1 {
receivedDecimalPayload = payload[i];
log:printInfo("Received record: " + payload[i].toString());
}
Expand Down Expand Up @@ -735,7 +735,7 @@ function booleanPayloadBindingListenerTest() returns error? {
Service booleanBindingService =
service object {
remote function onConsumerRecord(boolean[] payload, Caller caller) returns error? {
foreach int i in 0 ... payload.length() {
foreach int i in 0 ... payload.length() - 1 {
receivedBooleanPayload = payload[i];
log:printInfo("Received record: " + payload[i].toString());
}
Expand Down Expand Up @@ -772,7 +772,7 @@ function stringPayloadListenerTest() returns error? {
Service stringBindingService =
service object {
remote function onConsumerRecord(StringConsumerRecord[] records, string[] payload) returns error? {
foreach int i in 0 ... payload.length() {
foreach int i in 0 ... payload.length() - 1 {
receivedStringPayload = payload[i];
log:printInfo("Received record: " + payload[i].toString());
}
Expand Down Expand Up @@ -810,7 +810,7 @@ function xmlPayloadListenerTest() returns error? {
Service xmlBindingService =
service object {
remote function onConsumerRecord(Caller caller, xml[] payload) returns error? {
foreach int i in 0 ... payload.length() {
foreach int i in 0 ... payload.length() - 1 {
receivedXmlPayload = payload[i];
log:printInfo("Received record: " + payload[i].toString());
}
Expand Down Expand Up @@ -847,7 +847,7 @@ function recordPayloadListenerTest() returns error? {
Service recordBindingService =
service object {
remote function onConsumerRecord(Person[] payload, Caller caller, PersonConsumerRecord[] records) returns error? {
foreach int i in 0 ... payload.length() {
foreach int i in 0 ... payload.length() - 1 {
receivedPersonPayload = payload[i];
log:printInfo("Received record: " + payload[i].toString());
}
Expand Down Expand Up @@ -884,7 +884,7 @@ function mapPayloadListenerTest() returns error? {
Service mapBindingService =
service object {
remote function onConsumerRecord(map<Person>[] payload) returns error? {
foreach int i in 0 ... payload.length() {
foreach int i in 0 ... payload.length() - 1 {
receivedMapPayload = payload[i];
log:printInfo("Received record: " + payload[i].toString());
}
Expand Down Expand Up @@ -924,7 +924,7 @@ function tablePayloadListenerTest() returns error? {
Service tableBindingService =
service object {
remote function onConsumerRecord(table<Person>[] payload, TableConsumerRecord[] records) returns error? {
foreach int i in 0 ... payload.length() {
foreach int i in 0 ... payload.length() - 1 {
receivedTablePayload = payload[i];
log:printInfo("Received record: " + payload[i].toString());
}
Expand Down Expand Up @@ -961,7 +961,7 @@ function jsonPayloadListenerTest() returns error? {
Service jsonBindingService =
service object {
remote function onConsumerRecord(json[] payload) returns error? {
foreach int i in 0 ... payload.length() {
foreach int i in 0 ... payload.length() - 1 {
receivedJsonPayload = payload[i];
log:printInfo("Received record: " + payload[i].toString());
}
Expand Down Expand Up @@ -998,7 +998,7 @@ function payloadConsumerRecordListenerTest() returns error? {
Service payloadRecordBindingService =
service object {
remote function onConsumerRecord(@Payload PayloadConsumerRecord[] payloadRecords, JsonConsumerRecord[] consumerRecords) returns error? {
foreach int i in 0 ... payloadRecords.length() {
foreach int i in 0 ... payloadRecords.length() - 1 {
receivedPayloadConsumerRecordValue = payloadRecords[i];
log:printInfo("Received record: " + payloadRecords[i].toString());
}
Expand Down Expand Up @@ -1166,7 +1166,7 @@ function invalidRecordPayloadWithSeekListenerTest() returns error? {
Service invalidRecordService =
service object {
remote function onConsumerRecord(PersonConsumerRecord[] records) returns error? {
foreach int i in 0 ... records.length() {
foreach int i in 0 ... records.length() - 1 {
log:printInfo("Received record: " + records[i].toString());
receivedSeekedValidRecordListenerCount += 1;
}
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/

plugins {
id "com.github.spotbugs" version "4.0.5"
id "com.github.spotbugs" version "4.2.3"
id "com.github.johnrengelman.shadow" version "5.2.0"
id "de.undercouch.download" version "4.0.4"
id "net.researchgate.release" version "2.8.0"
Expand Down
11 changes: 11 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,25 @@ This file contains all the notable changes done to the Ballerina Kafka package t
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Changed
- [Exit the service when a panic occurs inside the service method](https://github.com/ballerina-platform/ballerina-standard-library/issues/4241)

## [3.6.0] - 2023-02-20

### Added
- [Added a feature to autoseek when a payload binding fails or constraint validation fails](https://github.com/ballerina-platform/ballerina-standard-library/issues/3827)

### Changed
- [Stopped Java client logs from appearing in the console](https://github.com/ballerina-platform/ballerina-standard-library/issues/3698)

### Fixed
- [Fixed consumer panic when a type casting error occurs](https://github.com/ballerina-platform/ballerina-standard-library/issues/3696)
- [Fixed not logging errors returned from the kafka `onConsumerRecord` method](https://github.com/ballerina-platform/ballerina-standard-library/issues/3884)

## [3.5.1] - 2023-01-27
### Changed
- [Changed the send operation of `kafka:Producer` to be non-blocking](https://github.com/ballerina-platform/ballerina-standard-library/issues/3991)

## [3.5.0] - 2022-11-29

### Changed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import io.ballerina.runtime.api.PredefinedTypes;
import io.ballerina.runtime.api.Runtime;
import io.ballerina.runtime.api.async.Callback;
import io.ballerina.runtime.api.async.StrandMetadata;
import io.ballerina.runtime.api.creators.ValueCreator;
import io.ballerina.runtime.api.types.ArrayType;
Expand Down Expand Up @@ -157,10 +158,10 @@ private void executeOnError(MethodType onErrorMethod, Throwable throwable) {
ObjectType serviceType = (ObjectType) TypeUtils.getReferredType(service.getType());
if (serviceType.isIsolated() && serviceType.isIsolated(KAFKA_RESOURCE_ON_ERROR)) {
bRuntime.invokeMethodAsyncConcurrently(service, KAFKA_RESOURCE_ON_ERROR, null, metadata,
null, properties, PredefinedTypes.TYPE_NULL, arguments);
new KafkaOnErrorCallback(), properties, PredefinedTypes.TYPE_NULL, arguments);
} else {
bRuntime.invokeMethodAsyncSequentially(service, KAFKA_RESOURCE_ON_ERROR, null, metadata,
null, properties, PredefinedTypes.TYPE_NULL, arguments);
new KafkaOnErrorCallback(), properties, PredefinedTypes.TYPE_NULL, arguments);
}
}

Expand Down Expand Up @@ -289,4 +290,19 @@ private StrandMetadata getStrandMetadata(String parentFunctionName) {
return new StrandMetadata(ModuleUtils.getModule().getOrg(),
ModuleUtils.getModule().getName(), ModuleUtils.getModule().getMajorVersion(), parentFunctionName);
}

static class KafkaOnErrorCallback implements Callback {
@Override
public void notifySuccess(Object result) {
if (result instanceof BError) {
((BError) result).printStackTrace();
}
}

@Override
public void notifyFailure(BError bError) {
bError.printStackTrace();
System.exit(1);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,12 @@ public void notifySuccess(Object obj) {
@Override
public void notifyFailure(BError error) {
sem.release();
logger.error("Ballerina engine has completed resource invocation with exception for service " + serviceId +
". Semaphore is released to continue next polling cycle.", error.toString());
if (logger.isDebugEnabled()) {
logger.error("Ballerina engine has completed resource invocation with exception for service " + serviceId +
". Semaphore is released to continue next polling cycle.", error.toString());
}
error.printStackTrace();
System.exit(1);
}

}
8 changes: 8 additions & 0 deletions spotbugs-exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,12 @@
<Class name="io.ballerina.stdlib.kafka.utils.KafkaUtils" />
<Bug pattern="BC_UNCONFIRMED_CAST" />
</Match>
<Match>
<Class name="io.ballerina.stdlib.kafka.impl.KafkaListenerImpl$KafkaOnErrorCallback"/>
<Bug pattern="DM_EXIT"/>
</Match>
<Match>
<Class name="io.ballerina.stdlib.kafka.impl.KafkaPollCycleFutureListener"/>
<Bug pattern="DM_EXIT"/>
</Match>
</FindBugsFilter>