Reintroduce ContextDecorator
ozangunalp committed Dec 6, 2022
1 parent 846824b commit f3e0fa4
Showing 19 changed files with 383 additions and 40 deletions.
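
For context: this commit drops the per-connector .plug(ContextOperator::apply) calls in KafkaSource and instead registers a ContextDecorator bean, so context propagation is applied once, centrally, for every channel. The underlying technique is to re-emit each item on a captured Vert.x context so that downstream stages observe items on the same event loop. Below is a minimal, self-contained sketch of that idea using only public Mutiny and Vert.x APIs; the helper name withContext is illustrative, not the SmallRye implementation.

import io.smallrye.mutiny.Multi;
import io.vertx.core.Context;
import io.vertx.core.Vertx;

public class ContextPropagationSketch {

    // Illustrative helper (not the SmallRye API): re-emit every item on the
    // given Vert.x context, so downstream stages run on its event loop.
    static <T> Multi<T> withContext(Multi<T> upstream, Context context) {
        // Context.runOnContext schedules the runnable on the owning event loop.
        return upstream.emitOn(runnable -> context.runOnContext(v -> runnable.run()));
    }

    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();
        Context context = vertx.getOrCreateContext();
        withContext(Multi.createFrom().items(1, 2, 3), context)
                .subscribe().with(item ->
                        // Vertx.currentContext() now matches the captured context.
                        System.out.println(item + " on " + Vertx.currentContext()));
    }
}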
@@ -31,7 +31,6 @@
import io.smallrye.reactive.messaging.kafka.commit.*;
import io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler;
import io.smallrye.reactive.messaging.kafka.health.KafkaSourceHealth;
-import io.smallrye.reactive.messaging.providers.locals.ContextOperator;
import io.vertx.core.impl.EventLoopContext;
import io.vertx.core.impl.VertxInternal;
import io.vertx.mutiny.core.Vertx;
@@ -157,8 +156,7 @@ public KafkaSource(Vertx vertx,
incomingMulti = incomingMulti.onItem().invoke(record -> incomingTrace(record, false));
}
this.stream = incomingMulti
-.onFailure().invoke(t -> reportFailure(t, false))
-.plug(ContextOperator::apply);
+.onFailure().invoke(t -> reportFailure(t, false));
this.batchStream = null;
} else {
Multi<ConsumerRecords<K, V>> multi;
@@ -188,8 +186,7 @@ public KafkaSource(Vertx vertx,
incomingMulti = incomingMulti.onItem().invoke(this::incomingTrace);
}
this.batchStream = incomingMulti
-.onFailure().invoke(t -> reportFailure(t, false))
-.plug(ContextOperator::apply);
+.onFailure().invoke(t -> reportFailure(t, false));
this.stream = null;
}

@@ -40,6 +40,7 @@
import io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory;
import io.smallrye.reactive.messaging.providers.impl.ConnectorFactories;
import io.smallrye.reactive.messaging.providers.impl.InternalChannelRegistry;
+import io.smallrye.reactive.messaging.providers.locals.ContextDecorator;
import io.smallrye.reactive.messaging.providers.metrics.MetricDecorator;
import io.smallrye.reactive.messaging.providers.metrics.MicrometerDecorator;
import io.smallrye.reactive.messaging.providers.wiring.Wiring;
@@ -100,6 +101,7 @@ public void initWeld() {
weld.addBeanClass(KafkaClientServiceImpl.class);
weld.addBeanClass(MetricDecorator.class);
weld.addBeanClass(MicrometerDecorator.class);
+weld.addBeanClass(ContextDecorator.class);
weld.disableDiscovery();
}

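A note on the test-harness changes in this commit: the embedded Weld containers call disableDiscovery(), so the reintroduced decorator is not picked up automatically, and each test base has to register it explicitly, as the hunk above (and the MQTT one further down) does. A minimal Weld SE sketch of the same setup, assuming ContextDecorator is on the classpath:

import org.jboss.weld.environment.se.Weld;
import org.jboss.weld.environment.se.WeldContainer;

import io.smallrye.reactive.messaging.providers.locals.ContextDecorator;

public class WeldSetupSketch {
    public static void main(String[] args) {
        Weld weld = new Weld();
        // With discovery disabled, every bean must be registered by hand,
        // including ContextDecorator; otherwise channels run without it.
        weld.addBeanClass(ContextDecorator.class);
        weld.disableDiscovery();
        try (WeldContainer container = weld.initialize()) {
            // resolve beans and run assertions here
        }
    }
}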
@@ -0,0 +1,150 @@
package io.smallrye.reactive.messaging.kafka.commit;

import static org.awaitility.Awaitility.await;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.junit.jupiter.api.Test;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.reactive.messaging.MutinyEmitter;
import io.smallrye.reactive.messaging.annotations.Broadcast;
import io.smallrye.reactive.messaging.kafka.KafkaRecord;
import io.smallrye.reactive.messaging.kafka.base.KafkaCompanionTestBase;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;
import io.vertx.core.Context;
import io.vertx.core.Vertx;

public class StreamAckStrategyTest extends KafkaCompanionTestBase {

@Test
public void testWithPartitions() {
Infrastructure.setOperatorLogger(new Infrastructure.OperatorLogger() {
@Override
public void log(String identifier, String event, Object value, Throwable failure) {
String message = "[--> " + identifier + " | " + event;
if (failure == null) {
if (value != null) {
message = message + "(" + value + ")";
} else {
message = message + "()";
}
} else {
message = message + "(" + failure.getClass().getName() + "(\"" + failure.getMessage() + "\"))";
}
Context context = Vertx.currentContext();
if (context != null) {
message = message + " " + context;
}
System.out.println(message + " " + Thread.currentThread().getName());
}
});
companion.topics().createAndWait(topic, 3);
String sinkTopic = topic + "-sink";
companion.topics().createAndWait(sinkTopic, 3);
String groupId = UUID.randomUUID().toString();

MapBasedConfig config = kafkaConfig("mp.messaging.incoming.kafka")
.with("group.id", groupId)
.with("topic", topic)
.with("partitions", 3)
.with("auto.offset.reset", "earliest")
.with("commit-strategy", "throttled")
.with("value.deserializer", IntegerDeserializer.class.getName())
.withPrefix("mp.messaging.outgoing.sink")
.with("connector", "smallrye-kafka")
.with("topic", sinkTopic)
.with("value.serializer", IntegerSerializer.class.getName())
.withPrefix("mp.messaging.outgoing.out")
.with("connector", "smallrye-kafka")
.with("topic", topic + "-out")
.with("value.serializer", StringSerializer.class.getName());

DoubleAckBean application = runApplication(config, DoubleAckBean.class);

int expected = 3000;
Random random = new Random();
companion.produceIntegers().usingGenerator(i -> {
int p = random.nextInt(3);
return new ProducerRecord<>(topic, p, Integer.toString(p), i);
}, expected).awaitCompletion(Duration.ofMinutes(1));

await().atMost(1, TimeUnit.MINUTES)
.until(() -> application.count() >= expected);

companion.consumeIntegers().fromTopics(sinkTopic, expected)
.awaitCompletion(Duration.ofMinutes(1));
}

@ApplicationScoped
public static class DoubleAckBean {
private final AtomicLong count = new AtomicLong();
private final Map<String, List<Integer>> received = new ConcurrentHashMap<>();

private final AtomicLong parallel = new AtomicLong();

@Inject
@Channel("out")
MutinyEmitter<String> emitter;

@Incoming("kafka")
@Outgoing("sink")
@Broadcast
public Multi<Message<Integer>> consume(Multi<KafkaRecord<String, Integer>> records) {
return records
.log("upstream")
.call(m -> emitter.send(m.getPayload() + ""))
// .call(m -> Uni.createFrom().voidItem().emitOn(Infrastructure.getDefaultExecutor()))
.invoke(msg -> {
String k = Thread.currentThread().getName();
System.out.println(parallel.incrementAndGet() + " " + k + " " + Vertx.currentContext() + " / "
+ msg.getContextMetadata().get().context());
List<Integer> list = received.computeIfAbsent(k, s -> new CopyOnWriteArrayList<>());
list.add(msg.getPayload());
count.incrementAndGet();
})
.group().by(msg -> msg.getPartition())
.flatMap(group -> {
System.out.println("key " + group.key());
return group.onItem().scan((l, r) -> {
return l;
});
})
.log("downstream")
// .call(m -> Uni.createFrom().voidItem().emitOn(Infrastructure.getDefaultExecutor()))
.call(m -> Uni.createFrom().completionStage(m::ack))
.invoke(() -> parallel.decrementAndGet())
.map(msg -> msg.withPayload(msg.getPayload() + 1));
}

public Map<String, List<Integer>> getReceived() {
return received;
}

public long count() {
return count.get();
}
}

}
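
A side note on the debugging hooks in the test above: Mutiny's Multi.log(identifier) routes subscription events through the operator logger installed with Infrastructure.setOperatorLogger, which is how the test prints every event together with the current Vert.x context and thread. A stripped-down sketch of the same hook, using only public Mutiny APIs:

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.infrastructure.Infrastructure;

public class OperatorLoggerSketch {
    public static void main(String[] args) {
        // Route all Multi.log(...) events to stdout, tagged with the thread name.
        Infrastructure.setOperatorLogger((identifier, event, value, failure) ->
                System.out.println(identifier + " | " + event
                        + (value != null ? "(" + value + ")" : "()")
                        + " on " + Thread.currentThread().getName()));

        Multi.createFrom().items(1, 2, 3)
                .log("demo") // emits onSubscribe, onItem, onCompletion events
                .subscribe().with(item -> { });
    }
}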
@@ -21,13 +21,17 @@
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.annotations.Blocking;
import io.smallrye.reactive.messaging.kafka.KafkaRecord;
+import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;
import io.smallrye.reactive.messaging.kafka.base.KafkaCompanionTestBase;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;
+import io.vertx.core.Vertx;
+import reactor.core.publisher.Flux;

public class ThrottledCommitStrategyTest extends KafkaCompanionTestBase {

@@ -66,6 +70,78 @@ public void testWithPartitions() {
.awaitCompletion(Duration.ofMinutes(1));
}

@Test
@Disabled
public void testWithPartitionsStreamProcessor() {
companion.topics().createAndWait(topic, 3);
String sinkTopic = topic + "-sink";
companion.topics().createAndWait(sinkTopic, 3);
String groupId = UUID.randomUUID().toString();

MapBasedConfig config = kafkaConfig("mp.messaging.incoming.kafka")
.with("group.id", groupId)
.with("topic", topic)
.with("partitions", 3)
.with("auto.offset.reset", "earliest")
.with("commit-strategy", "throttled")
.with("value.deserializer", IntegerDeserializer.class.getName())
.withPrefix("mp.messaging.outgoing.sink")
.with("connector", "smallrye-kafka")
.with("topic", sinkTopic)
.with("value.serializer", IntegerSerializer.class.getName());

StreamProcessingBean application = runApplication(config, StreamProcessingBean.class);

int expected = 3000;
Random random = new Random();
companion.produceIntegers().usingGenerator(i -> {
int p = random.nextInt(3);
return new ProducerRecord<>(topic, p, Integer.toString(p), i);
}, expected).awaitCompletion(Duration.ofMinutes(1));

await().atMost(1, TimeUnit.MINUTES)
.until(() -> application.count() >= expected);

companion.consumeIntegers().fromTopics(sinkTopic, expected)
.awaitCompletion(Duration.ofMinutes(1));
}

@Test
@Disabled
public void testWithPartitionsStreamProcessorFlux() {
companion.topics().createAndWait(topic, 3);
String sinkTopic = topic + "-sink";
companion.topics().createAndWait(sinkTopic, 3);
String groupId = UUID.randomUUID().toString();

MapBasedConfig config = kafkaConfig("mp.messaging.incoming.kafka")
.with("group.id", groupId)
.with("topic", topic)
.with("partitions", 3)
.with("auto.offset.reset", "earliest")
.with("commit-strategy", "throttled")
.with("value.deserializer", IntegerDeserializer.class.getName())
.withPrefix("mp.messaging.outgoing.sink")
.with("connector", "smallrye-kafka")
.with("topic", sinkTopic)
.with("value.serializer", IntegerSerializer.class.getName());

PublisherStreamProcessingBean application = runApplication(config, PublisherStreamProcessingBean.class);

int expected = 2000;
Random random = new Random();
companion.produceIntegers().usingGenerator(i -> {
int p = random.nextInt(3);
return new ProducerRecord<>(topic, p, Integer.toString(p), i);
}, expected).awaitCompletion(Duration.ofMinutes(1));

await().atMost(1, TimeUnit.MINUTES)
.until(() -> application.count() >= expected);

companion.consumeIntegers().fromTopics(sinkTopic, expected)
.awaitCompletion(Duration.ofMinutes(1));
}

@Test
public void testWithPartitionsBlockingUnordered() {
companion.topics().createAndWait(topic, 3);
@@ -117,6 +193,62 @@ public long count() {
}
}

@ApplicationScoped
public static class StreamProcessingBean {
private final AtomicLong count = new AtomicLong();
private final Map<String, List<Integer>> received = new ConcurrentHashMap<>();

@Incoming("kafka")
@Outgoing("sink")
public Multi<Message<Integer>> process(Multi<KafkaRecord<String, Integer>> multi) {
return multi.map(msg -> {
String k = Thread.currentThread().getName();
List<Integer> list = received.computeIfAbsent(k, s -> new CopyOnWriteArrayList<>());
list.add(msg.getPayload());
count.incrementAndGet();
assert msg.getContextMetadata().get().context() == Vertx.currentContext();
return msg.withPayload(msg.getPayload() + 1)
.addMetadata(OutgoingKafkaRecordMetadata.builder().withPartition(msg.getPartition()).build());
});
}

public Map<String, List<Integer>> getReceived() {
return received;
}

public long count() {
return count.get();
}
}

@ApplicationScoped
public static class PublisherStreamProcessingBean {
private final AtomicLong count = new AtomicLong();
private final Map<String, List<Integer>> received = new ConcurrentHashMap<>();

@Incoming("kafka")
@Outgoing("sink")
public Flux<Message<Integer>> process(Flux<KafkaRecord<String, Integer>> pub) throws InterruptedException {
return pub.map(msg -> {
String k = Thread.currentThread().getName();
List<Integer> list = received.computeIfAbsent(k, s -> new CopyOnWriteArrayList<>());
list.add(msg.getPayload());
count.incrementAndGet();
assert msg.getContextMetadata().get().context() == Vertx.currentContext();
return msg.withPayload(msg.getPayload() + 1)
.addMetadata(OutgoingKafkaRecordMetadata.builder().withPartition(msg.getPartition()).build());
});
}

public Map<String, List<Integer>> getReceived() {
return received;
}

public long count() {
return count.get();
}
}

@ApplicationScoped
public static class BlockingBean {
private final AtomicLong count = new AtomicLong();
@@ -27,6 +27,7 @@
import io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory;
import io.smallrye.reactive.messaging.providers.impl.ConnectorFactories;
import io.smallrye.reactive.messaging.providers.impl.InternalChannelRegistry;
+import io.smallrye.reactive.messaging.providers.locals.ContextDecorator;
import io.smallrye.reactive.messaging.providers.wiring.Wiring;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;
import io.vertx.mqtt.MqttClientOptions;
@@ -127,6 +128,7 @@ static Weld baseWeld(MapBasedConfig config) {
weld.addPackages(EmitterImpl.class.getPackage());
weld.addExtension(new ReactiveMessagingExtension());
weld.addBeanClass(MqttConnector.class);
+weld.addBeanClass(ContextDecorator.class);
weld.addBeanClass(EmitterFactoryImpl.class);
weld.addBeanClass(MutinyEmitterFactoryImpl.class);
weld.addBeanClass(LegacyEmitterFactoryImpl.class);
@@ -230,7 +230,7 @@ private void processMethodReturningAPublisherBuilderOfPayloadsAndConsumingPayloads
Multi<? extends Message<?>> multi = MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration);
return multi.onItem().transformToMultiAndConcatenate(message -> {
PublisherBuilder<?> pb = invoke(message.getPayload());
-return Multi.createFrom().publisher(pb.buildRs())
+return MultiUtils.publisher(pb.buildRs())
.onItem().transform(payload -> Message.of(payload, message.getMetadata()));
// TODO We can handle post-acknowledgement here. -> onCompletion
});
Expand All @@ -242,7 +242,7 @@ private void processMethodReturningAPublisherOfPayloadsAndConsumingPayloads() {
Multi<? extends Message<?>> multi = MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration);
return multi.onItem().transformToMultiAndConcatenate(message -> {
Publisher<?> pub = invoke(message.getPayload());
-return Multi.createFrom().publisher(pub)
+return MultiUtils.publisher(pub)
.onItem().transform(payload -> Message.of(payload, message.getMetadata()));
// TODO We can handle post-acknowledgement here. -> onCompletion
});
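
The two hunks above replace Multi.createFrom().publisher(...) with MultiUtils.publisher(...) when wrapping the publishers returned by user methods. The helper's body is not part of this excerpt; a plausible reading, stated here as an assumption, is that it wraps the publisher so that emissions stay on the message's captured context, mirroring what the removed KafkaSource plumbing did. A hypothetical sketch (the real MultiUtils in SmallRye Reactive Messaging may differ):

import org.reactivestreams.Publisher;

import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.providers.locals.ContextOperator;

public final class MultiUtilsSketch {

    // Hypothetical: wrap an arbitrary Reactive Streams publisher and keep
    // emissions on the captured context via the same plug seen in KafkaSource.
    static <T> Multi<T> publisher(Publisher<T> publisher) {
        return Multi.createFrom().publisher(publisher)
                .plug(ContextOperator::apply); // assumption, not confirmed by this diff
    }
}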