Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add multi request to KafkaRequestReply #2761

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions documentation/src/main/docs/kafka/request-reply.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,21 @@ Like the core Emitter's `send` methods, `request` method also can receive a `Mes
The ingested reply type of the `KafkaRequestReply` is discovered at runtime,
in order to configure a `MessageConveter` to be applied on the incoming message before returning the `Uni` result.

## Requesting multiple replies

You can use the `requestMulti` method to expect any number of replies represented by the `Multi` return type.

For example this can be used to aggregate multiple replies to a single request.

``` java
{{ insert('kafka/outbound/KafkaRequestReplyMultiEmitter.java') }}
```
Like the other `request` you can also request `Message` types.

!!! note
The channel attribute `reply.timeout` will be applied between each message, if reached the returned `Multi` will
fail.

## Scaling Request/Reply

If multiple requestor instances are configured on the same outgoing topic, and the same reply topic,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.acme;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Channel;

import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.kafka.reply.KafkaRequestReply;

@ApplicationScoped
public class KafkaRequestReplyMultiEmitter {

@Inject
@Channel("my-request")
KafkaRequestReply<String, Integer> quoteRequest;

public Multi<Integer> requestQuote(String request) {
return quoteRequest.requestMulti(request).select().first(5);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.common.annotation.Experimental;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.EmitterType;
import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
Expand Down Expand Up @@ -127,6 +128,22 @@ public interface KafkaRequestReply<Req, Rep> extends EmitterType {
*/
Uni<Message<Rep>> request(Message<Req> request);

/**
* Sends a request and receives responses.
*
* @param request the request object to be sent
* @return a Multi object representing the results of the send and receive operation
*/
Multi<Rep> requestMulti(Req request);

/**
* Sends a request and receives responses.
*
* @param request the request object to be sent
* @return a Multi object representing the results of the send and receive operation
*/
Multi<Message<Rep>> requestMulti(Message<Req> request);

/**
* Blocks until the consumer has been assigned all partitions for consumption.
* If a {@code reply.partition} is provided, waits only for the assignment of that particular partition.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.mutiny.subscription.MultiEmitter;
import io.smallrye.mutiny.subscription.MultiSubscriber;
import io.smallrye.mutiny.subscription.UniEmitter;
import io.smallrye.reactive.messaging.ClientCustomizer;
import io.smallrye.reactive.messaging.EmitterConfiguration;
import io.smallrye.reactive.messaging.OutgoingMessageMetadata;
Expand Down Expand Up @@ -181,6 +181,12 @@ public void complete() {
pendingReplies.size(), pendingReplies.keySet());
}
}
for (CorrelationId correlationId : pendingReplies.keySet()) {
PendingReplyImpl<Rep> reply = pendingReplies.remove(correlationId);
if (reply != null) {
reply.complete();
}
}
replySource.closeQuietly();
}

Expand All @@ -194,12 +200,22 @@ private void grace(Duration duration) {

@Override
public Uni<Rep> request(Req request) {
return request(ContextAwareMessage.of(request))
.map(Message::getPayload);
return requestMulti(request).toUni();
}

@Override
public Uni<Message<Rep>> request(Message<Req> request) {
return requestMulti(request).toUni();
}

@Override
public Multi<Rep> requestMulti(Req request) {
return requestMulti(ContextAwareMessage.of(request))
.map(Message::getPayload);
}

@Override
public Multi<Message<Rep>> requestMulti(Message<Req> request) {
var builder = request.getMetadata(OutgoingKafkaRecordMetadata.class)
.map(metadata -> OutgoingKafkaRecordMetadata.from(metadata))
.orElseGet(OutgoingKafkaRecordMetadata::builder);
Expand All @@ -213,16 +229,26 @@ public Uni<Message<Rep>> request(Message<Req> request) {
OutgoingMessageMetadata<RecordMetadata> outMetadata = new OutgoingMessageMetadata<>();
return sendMessage(request.addMetadata(builder.build()).addMetadata(outMetadata))
.invoke(() -> subscription.get().request(1))
.chain(unused -> Uni.createFrom().<Message<Rep>> emitter(emitter -> pendingReplies.put(correlationId,
new PendingReplyImpl<>(outMetadata.getResult(), replyTopic, replyPartition,
(UniEmitter<Message<Rep>>) emitter)))
.ifNoItem().after(replyTimeout).fail())
.onItemOrFailure().invoke(() -> pendingReplies.remove(correlationId))
.plug(uni -> replyFailureHandler != null ? uni.onItem().transformToUni(f -> {
Throwable failure = replyFailureHandler.handleReply((KafkaRecord<?, ?>) f);
return failure != null ? Uni.createFrom().failure(failure) : Uni.createFrom().item(f);
}) : uni)
.plug(uni -> replyConverter != null ? uni.map(f -> replyConverter.apply(f)) : uni);
.onItem()
.transformToMulti(unused -> Multi.createFrom().<Message<Rep>> emitter(emitter -> {
pendingReplies.put(correlationId,
new PendingReplyImpl<>(outMetadata.getResult(),
replyTopic,
replyPartition,
(MultiEmitter<Message<Rep>>) emitter));
}))
.ifNoItem().after(replyTimeout).failWith(() -> new KafkaRequestReplyTimeoutException(correlationId))
.onItem().transformToUniAndConcatenate(m -> {
if (replyFailureHandler != null) {
Throwable failure = replyFailureHandler.handleReply((KafkaRecord<?, ?>) m);
if (failure != null) {
return Uni.createFrom().failure(failure);
}
}
return Uni.createFrom().item(m);
})
.onTermination().invoke(() -> pendingReplies.remove(correlationId))
.plug(multi -> replyConverter != null ? multi.map(f -> replyConverter.apply(f)) : multi);
}

@Override
Expand Down Expand Up @@ -271,10 +297,9 @@ public void onItem(KafkaRecord<?, Rep> record) {
// If reply topic header is NOT null, it is considered a request not a reply
if (header != null && record.getHeaders().lastHeader(replyTopicHeader) == null) {
CorrelationId correlationId = correlationIdHandler.parse(header.value());
PendingReplyImpl<Rep> reply = pendingReplies.remove(correlationId);
PendingReplyImpl<Rep> reply = pendingReplies.get(correlationId);
if (reply != null) {
reply.getEmitter().complete(record);
return;
reply.getEmitter().emit(record);
} else {
log.requestReplyRecordIgnored(channel, record.getTopic(), correlationId.toString());
}
Expand All @@ -298,10 +323,10 @@ public static class PendingReplyImpl<Rep> implements PendingReply {
private final RecordMetadata metadata;
private final String replyTopic;
private final int replyPartition;
private final UniEmitter<Message<Rep>> emitter;
private final MultiEmitter<Message<Rep>> emitter;

public PendingReplyImpl(RecordMetadata metadata, String replyTopic, int replyPartition,
UniEmitter<Message<Rep>> emitter) {
MultiEmitter<Message<Rep>> emitter) {
this.replyTopic = replyTopic;
this.replyPartition = replyPartition;
this.metadata = metadata;
Expand All @@ -323,7 +348,17 @@ public RecordMetadata recordMetadata() {
return metadata;
}

public UniEmitter<Message<Rep>> getEmitter() {
@Override
public void complete() {
emitter.complete();
}

@Override
public boolean isCancelled() {
return emitter.isCancelled();
}

public MultiEmitter<Message<Rep>> getEmitter() {
return emitter;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package io.smallrye.reactive.messaging.kafka.reply;

/**
* Exception thrown when a reply is not received within the configured timeout.
*/
public class KafkaRequestReplyTimeoutException extends RuntimeException {

public KafkaRequestReplyTimeoutException(CorrelationId correlationId) {
super("Timeout waiting for a reply for request with correlation ID: " + correlationId);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,14 @@ public interface PendingReply {
* @return the recordMetadata of the request
*/
RecordMetadata recordMetadata();

/**
* Complete the pending reply.
*/
void complete();

/**
* @return whether the pending reply was terminated (with a completion or failure).
*/
boolean isCancelled();
}
Loading
Loading