Cannot handle multiple group-ids with same topic #85

Closed
sn-intrasoft opened this issue Aug 24, 2022 · 3 comments
@sn-intrasoft

Describe the bug
When multiple consumer group IDs consume the same topic with different payloads, ClassLevelKafkaListenerScanner throws an exception while scanning because the topic is used as a map key and appears more than once.

Dependencies and versions used
springwolf-kafka version 0.7.0.

Stack trace and error logs
i.g.s.s.asyncapi.DefaultChannelsService : An error was encountered during channel scanning with io.github.stavshamir.springwolf.asyncapi.scanners.channels.ClassLevelKafkaListenerScanner@1dd76982: Duplicate key CUSTOMER_PRODUCT_ADPT (attempted merging values ChannelItem(description=null, subscribe=null, publish=Operation(operationId=null, summary=null, description=null, tags=null, externalDocs=null, bindings={kafka=KafkaOperationBinding(groupId=GroupA, clientId=null, bindingVersion=null)}, traits=null, message={oneOf=[Message(name=java.lang.Object, title=Object, payload=io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.PayloadReference@1), Message(name=com.example.PayloadA, title=PayloadA, payload=io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.PayloadReference@1)]}), parameters=null, bindings={kafka=KafkaChannelBinding()}) and ChannelItem(description=null, subscribe=null, publish=Operation(operationId=null, summary=null, description=null, tags=null, externalDocs=null, bindings={kafka=KafkaOperationBinding(groupId=GroupB, clientId=null, bindingVersion=null)}, traits=null, message={oneOf=[Message(name=java.lang.Object, title=Object, payload=io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.PayloadReference@1), Message(name=com.example.PayloadB, title=PayloadB, payload=io.github.stavshamir.springwolf.asyncapi.types.channel.operation.message.PayloadReference@1)]}), parameters=null, bindings={kafka=KafkaChannelBinding()}))
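
The "Duplicate key ... (attempted merging values ... and ...)" wording in the log is the message that `Collectors.toMap` produces when two stream elements map to the same key and no merge function is supplied. Below is a minimal sketch of that failure mode, assuming (this is not confirmed from the springwolf source) that the scanner collects channels into a map keyed by topic in this way. `DuplicateTopicDemo` and `Listener` are hypothetical stand-ins, not springwolf types.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class DuplicateTopicDemo {
    // Hypothetical stand-in for a scanned listener: topic, group id, payload type name.
    record Listener(String topic, String groupId, String payload) {}

    // Collects listeners into a map keyed by topic, with no merge function.
    // Two listeners on the same topic make toMap throw IllegalStateException.
    static Map<String, String> channelsByTopic(List<Listener> listeners) {
        return listeners.stream()
                .collect(Collectors.toMap(Listener::topic, Listener::payload));
    }

    public static void main(String[] args) {
        List<Listener> listeners = List.of(
                new Listener("CUSTOMER_PRODUCT_ADPT", "GroupA", "PayloadA"),
                new Listener("CUSTOMER_PRODUCT_ADPT", "GroupB", "PayloadB"));
        try {
            channelsByTopic(listeners);
        } catch (IllegalStateException e) {
            // Prints the duplicate-key message for the shared topic.
            System.out.println(e.getMessage());
        }
    }
}
```

The group ID plays no part in the key here, which is why two consumer groups on one topic collide.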

@sn-intrasoft sn-intrasoft added the bug Something isn't working label Aug 24, 2022
@stavshamir
Member

Thanks for reporting this issue! I am swamped lately but will try to get to researching this in the coming weeks.

@sn-intrasoft
Author

Thanks. More specifically, please cover this case:

@Test
public void scan_componentWithMultipleKafkaListenersAndHandlers() {
    // Given multiple @KafkaListener annotated classes with method(s) annotated with @KafkaHandler
    setClassToScan(KafkaListenerClassWithOneKafkaHandler.class, KafkaListenerClassWithMultipleKafkaHandler.class);

    // When scan is called
    Map<String, ChannelItem> actualChannels = methodLevelKafkaListenerScanner.scan();

    // Then the returned collection contains the channel with message set to oneOf
    Message fooMessage = Message.builder()
            .name(SimpleFoo.class.getName())
            .title(SimpleFoo.class.getSimpleName())
            .payload(PayloadReference.fromModelName(SimpleFoo.class.getSimpleName()))
            .build();

    Message barMessage = Message.builder()
            .name(SimpleBar.class.getName())
            .title(SimpleBar.class.getSimpleName())
            .payload(PayloadReference.fromModelName(SimpleBar.class.getSimpleName()))
            .build();

    Operation operation = Operation.builder()
            .bindings(ImmutableMap.of("kafka", new KafkaOperationBinding()))
            .message(ImmutableMap.of(ONE_OF, ImmutableSet.of(fooMessage, barMessage)))
            .build();

    ChannelItem expectedChannel = ChannelItem.builder()
            .bindings(ImmutableMap.of("kafka", new KafkaChannelBinding()))
            .publish(operation)
            .build();

    assertThat(actualChannels)
            .containsExactly(Maps.immutableEntry(TOPIC, expectedChannel));
}

@stavshamir stavshamir self-assigned this Oct 23, 2022
stavshamir added a commit that referenced this issue Oct 23, 2022
- Added support for multiple class level kafka listeners for the same topic

- `operationId` for class level kafka listeners is now assigned the value of the class name instead of the first method name in that class
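
The commit message does not say how listeners sharing a topic are combined. One possible approach, shown here only as a sketch under that uncertainty, is to group by topic and accumulate the payload types so they can be reported together (for example under `oneOf`). `MergeByTopicDemo` and `Listener` are hypothetical names, not springwolf types.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class MergeByTopicDemo {
    // Hypothetical stand-in for a scanned listener: topic, group id, payload type name.
    record Listener(String topic, String groupId, String payload) {}

    // Groups payload names by topic; a repeated topic extends the existing
    // entry instead of triggering a duplicate-key error.
    static Map<String, Set<String>> payloadsByTopic(List<Listener> listeners) {
        Map<String, Set<String>> result = new LinkedHashMap<>();
        for (Listener listener : listeners) {
            result.computeIfAbsent(listener.topic(), topic -> new LinkedHashSet<>())
                    .add(listener.payload());
        }
        return result;
    }

    public static void main(String[] args) {
        List<Listener> listeners = List.of(
                new Listener("CUSTOMER_PRODUCT_ADPT", "GroupA", "PayloadA"),
                new Listener("CUSTOMER_PRODUCT_ADPT", "GroupB", "PayloadB"));
        // One entry for the shared topic, holding both payload names.
        System.out.println(payloadsByTopic(listeners));
    }
}
```

This sketch drops the group ID entirely; a real merge would also have to decide how to represent differing `groupId` bindings on the combined channel.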
@sn-intrasoft
Author

Thanks @stavshamir 🙏
