(InMemoryConnector) Sink does not receive message as per Kafka docs #19059

Closed
chris-asl opened this issue Jul 28, 2021 · 5 comments · Fixed by #20137
Labels
area/kafka kind/bug Something isn't working

@chris-asl
Contributor

chris-asl commented Jul 28, 2021

Describe the bug

I'm testing reactive messaging using a Kafka connector.

I want to test with the InMemoryConnector provided by SmallRye, as outlined in the "Testing without a broker" section of the related guide.

FYI @el10686

Expected behavior

The message should be received by the InMemorySink.

Actual behavior

The message is not received at all.

When running the test with ./mvnw clean test -Dtest=InMemoryConnectorTest, the Awaitility condition check times out with:

[ERROR] org.acme.InMemoryConnectorTest.test  Time elapsed: 10.696 s  <<< ERROR!
org.awaitility.core.ConditionTimeoutException: Lambda expression in org.acme.InMemoryConnectorTest that uses io.smallrye.reactive.messaging.connectors.InMemorySink: expected the predicate to return <true> but it returned <false> for input of <[]> within 10 seconds.
	at org.acme.InMemoryConnectorTest.test(InMemoryConnectorTest.java:36)

How to Reproduce?

Here's a link to a reproducer:
https://github.com/chris-asl/quarkus-reactive-messaging-inmemory

I've included two tests:

  1. InMemoryConnectorTest follows the Kafka documentation code but fails. Run it with ./mvnw clean test -Dtest=InMemoryConnectorTest
  2. InMemoryProducerConsumerTest uses the in-memory connector (and passes) but tests the two components (Consumer, Producer) separately. Run it with ./mvnw clean test -Dtest=InMemoryProducerConsumerTest

Output of uname -a or ver

Linux homer 5.8.0-59-generic #66~20.04.1-Ubuntu SMP Thu Jun 17 11:14:10 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux

Output of java -version

openjdk version "11.0.11" 2021-04-20
OpenJDK Runtime Environment (build 11.0.11+9-Ubuntu-0ubuntu2.20.04)
OpenJDK 64-Bit Server VM (build 11.0.11+9-Ubuntu-0ubuntu2.20.04, mixed mode, sharing)

GraalVM version (if different from Java)

NA

Quarkus version or git rev

2.0.3.Final

Build tool (ie. output of mvnw --version or gradlew --version)

Apache Maven 3.8.1 (05c21c65bdfed0f71a2f2ada8b84da59348c4c5d)

Additional information

No response

@chris-asl chris-asl added the kind/bug Something isn't working label Jul 28, 2021
@quarkus-bot

quarkus-bot bot commented Jul 28, 2021

/cc @cescoffier

@geoand
Contributor

geoand commented Jul 29, 2021

cc @ozangunalp

@ozangunalp
Contributor

@chris-asl thanks for the reproducer, it helped a lot.

I agree that this is very misleading; we need to explain it better in the documentation:

The (very confusingly named) InMemorySink and InMemorySource are independent outgoing and incoming in-memory channels.
The fact that your application.yml contains connector: smallrye-kafka and topic: order config doesn't make them work like Kafka and transmit messages sent to the order topic.

The InMemoryConnectorTest will work if you add a processor component which does just that:

    @Incoming("order")
    @Outgoing("order-sink")
    Order consume(Order order) {
        System.out.println("Order received " + order.getFood());
        return order;
    }

It is for the same reason that the InMemoryProducerConsumerTest works as expected.
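
To illustrate the point about independent channels, here is a minimal plain-Java sketch. It does not use the SmallRye API: ToySource, ToySink, bridge and run are hypothetical names invented for this illustration, standing in for an in-memory source, an in-memory sink, and a processor method that connects them. It only models the idea that nothing reaches the sink unless something forwards items from the incoming channel to the outgoing one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

class IndependentChannelsSketch {

    /** Toy stand-in for an in-memory source feeding an incoming channel (hypothetical). */
    static final class ToySource<T> {
        private final List<Consumer<T>> subscribers = new ArrayList<>();

        void subscribe(Consumer<T> subscriber) {
            subscribers.add(subscriber);
        }

        /** Delivers the item to every subscriber; with no subscribers it is dropped. */
        void send(T item) {
            subscribers.forEach(s -> s.accept(item));
        }
    }

    /** Toy stand-in for an in-memory sink recording an outgoing channel (hypothetical). */
    static final class ToySink<T> {
        private final List<T> received = new ArrayList<>();

        void accept(T item) {
            received.add(item);
        }

        List<T> received() {
            return List.copyOf(received);
        }
    }

    /** Plays the role of the processor: forwards items from the source to the sink. */
    static <T> void bridge(ToySource<T> source, ToySink<T> sink, Function<T, T> processor) {
        source.subscribe(item -> sink.accept(processor.apply(item)));
    }

    /** Sends one item, with or without the bridging processor, and returns what the sink saw. */
    static List<String> run(boolean withProcessor) {
        ToySource<String> order = new ToySource<>();
        ToySink<String> orderSink = new ToySink<>();
        if (withProcessor) {
            bridge(order, orderSink, food -> food);
        }
        order.send("pizza");
        return orderSink.received();
    }

    public static void main(String[] args) {
        System.out.println("without processor: " + run(false)); // prints "without processor: []"
        System.out.println("with processor: " + run(true));     // prints "with processor: [pizza]"
    }
}
```

Without the bridge, the sink stays empty, which matches the empty list reported in the ConditionTimeoutException above. In the real application, the @Incoming("order") / @Outgoing("order-sink") method plays the role of bridge.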

A side note: the Kafka dev service creates an in-container Kafka broker for the tests. It spins up fast and is enabled by default.
If you want, you can disable it with quarkus.kafka.devservices.enabled: false

Hope this helps.

@chris-asl
Contributor Author

I see, this makes it much clearer (the independent part).
I've added a commit to the reproducer which uses the example from the Quarkus Kafka docs with the addition of said processor.

I believe the clarification about the channels being independent, along with the processor component, should also be added to the documentation.
I could prepare a patch along the lines of #18851, if you'd like.

@cescoffier
Member

@chris-asl Yes! That would be awesome!
