Skip to content

Commit

Permalink
Reintroduce ContextDecorator
Browse files Browse the repository at this point in the history
  • Loading branch information
ozangunalp committed Nov 29, 2022
1 parent e76d5cf commit 0d704ad
Show file tree
Hide file tree
Showing 11 changed files with 225 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import io.smallrye.reactive.messaging.kafka.commit.*;
import io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler;
import io.smallrye.reactive.messaging.kafka.health.KafkaSourceHealth;
import io.smallrye.reactive.messaging.providers.locals.ContextOperator;
import io.vertx.core.impl.EventLoopContext;
import io.vertx.core.impl.VertxInternal;
import io.vertx.mutiny.core.Vertx;
Expand Down Expand Up @@ -157,8 +156,7 @@ public KafkaSource(Vertx vertx,
incomingMulti = incomingMulti.onItem().invoke(record -> incomingTrace(record, false));
}
this.stream = incomingMulti
.onFailure().invoke(t -> reportFailure(t, false))
.plug(ContextOperator::apply);
.onFailure().invoke(t -> reportFailure(t, false));
this.batchStream = null;
} else {
Multi<ConsumerRecords<K, V>> multi;
Expand Down Expand Up @@ -188,8 +186,7 @@ public KafkaSource(Vertx vertx,
incomingMulti = incomingMulti.onItem().invoke(this::incomingTrace);
}
this.batchStream = incomingMulti
.onFailure().invoke(t -> reportFailure(t, false))
.plug(ContextOperator::apply);
.onFailure().invoke(t -> reportFailure(t, false));
this.stream = null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory;
import io.smallrye.reactive.messaging.providers.impl.ConnectorFactories;
import io.smallrye.reactive.messaging.providers.impl.InternalChannelRegistry;
import io.smallrye.reactive.messaging.providers.locals.ContextDecorator;
import io.smallrye.reactive.messaging.providers.metrics.MetricDecorator;
import io.smallrye.reactive.messaging.providers.metrics.MicrometerDecorator;
import io.smallrye.reactive.messaging.providers.wiring.Wiring;
Expand Down Expand Up @@ -100,6 +101,7 @@ public void initWeld() {
weld.addBeanClass(KafkaClientServiceImpl.class);
weld.addBeanClass(MetricDecorator.class);
weld.addBeanClass(MicrometerDecorator.class);
weld.addBeanClass(ContextDecorator.class);
weld.disableDiscovery();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package io.smallrye.reactive.messaging.kafka.commit;

import static org.awaitility.Awaitility.await;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.junit.jupiter.api.Test;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.reactive.messaging.MutinyEmitter;
import io.smallrye.reactive.messaging.annotations.Broadcast;
import io.smallrye.reactive.messaging.kafka.KafkaRecord;
import io.smallrye.reactive.messaging.kafka.base.KafkaCompanionTestBase;
import io.smallrye.reactive.messaging.providers.locals.ContextOperator;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;
import io.vertx.core.Context;
import io.vertx.core.Vertx;

public class StreamAckStrategyTest extends KafkaCompanionTestBase {

@Test
public void testWithPartitions() {
Infrastructure.setOperatorLogger(new Infrastructure.OperatorLogger() {
@Override
public void log(String identifier, String event, Object value, Throwable failure) {
String message = "[--> " + identifier + " | " + event;
if (failure == null) {
if (value != null) {
message = message + "(" + value + ")";
} else {
message = message + "()";
}
} else {
message = message + "(" + failure.getClass().getName() + "(\"" + failure.getMessage() + "\"))";
}
Context context = Vertx.currentContext();
if (context != null) {
message = message + " " + context;
}
System.out.println(message + " " + Thread.currentThread().getName());
}
});
companion.topics().createAndWait(topic, 3);
String sinkTopic = topic + "-sink";
companion.topics().createAndWait(sinkTopic, 3);
String groupId = UUID.randomUUID().toString();

MapBasedConfig config = kafkaConfig("mp.messaging.incoming.kafka")
.with("group.id", groupId)
.with("topic", topic)
.with("partitions", 3)
.with("auto.offset.reset", "earliest")
.with("commit-strategy", "throttled")
.with("value.deserializer", IntegerDeserializer.class.getName())
.withPrefix("mp.messaging.outgoing.sink")
.with("connector", "smallrye-kafka")
.with("topic", sinkTopic)
.with("value.serializer", IntegerSerializer.class.getName())
.withPrefix("mp.messaging.outgoing.out")
.with("connector", "smallrye-kafka")
.with("topic", topic + "-out")
.with("value.serializer", StringSerializer.class.getName())
;

DoubleAckBean application = runApplication(config, DoubleAckBean.class);

int expected = 3000;
Random random = new Random();
companion.produceIntegers().usingGenerator(i -> {
int p = random.nextInt(3);
return new ProducerRecord<>(topic, p, Integer.toString(p), i);
}, expected).awaitCompletion(Duration.ofMinutes(1));

await().atMost(1, TimeUnit.MINUTES)
.until(() -> application.count() >= expected);

companion.consumeIntegers().fromTopics(sinkTopic, expected)
.awaitCompletion(Duration.ofMinutes(1));
}

@ApplicationScoped
public static class DoubleAckBean {
private final AtomicLong count = new AtomicLong();
private final Map<String, List<Integer>> received = new ConcurrentHashMap<>();

private final AtomicLong parallel = new AtomicLong();

@Inject
@Channel("out")
MutinyEmitter<String> emitter;

@Incoming("kafka")
@Outgoing("sink")
@Broadcast
public Multi<Message<Integer>> consume(Multi<KafkaRecord<String, Integer>> records) {
return records.plug(ContextOperator::apply)
.log("upstream")
.call(m -> emitter.send(m.getPayload() + ""))
// .call(m -> Uni.createFrom().voidItem().emitOn(Infrastructure.getDefaultExecutor()))
.invoke(msg -> {
String k = Thread.currentThread().getName();
System.out.println(parallel.incrementAndGet() + " " + k + " " + Vertx.currentContext() + " / " + msg.getContextMetadata().get().context());
List<Integer> list = received.computeIfAbsent(k, s -> new CopyOnWriteArrayList<>());
list.add(msg.getPayload());
count.incrementAndGet();
})
.group().by(msg -> msg.getPartition())
.flatMap(group -> {
System.out.println("key " + group.key());
return group.onItem().scan((l, r) -> {
return l;
});
})
.log("downstream")
// .call(m -> Uni.createFrom().voidItem().emitOn(Infrastructure.getDefaultExecutor()))
.call(m -> Uni.createFrom().completionStage(m::ack))
.invoke(() -> parallel.decrementAndGet())
.map(msg -> msg.withPayload(msg.getPayload() + 1));
}

public Map<String, List<Integer>> getReceived() {
return received;
}

public long count() {
return count.get();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,43 @@
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.reactive.messaging.annotations.Blocking;
import io.smallrye.reactive.messaging.kafka.KafkaRecord;
import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;
import io.smallrye.reactive.messaging.kafka.base.KafkaCompanionTestBase;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;
import io.vertx.core.Context;
import io.vertx.core.Vertx;

public class ThrottledCommitStrategyTest extends KafkaCompanionTestBase {
@BeforeEach
void setUp() {
Infrastructure.setOperatorLogger(new Infrastructure.OperatorLogger() {
@Override
public void log(String identifier, String event, Object value, Throwable failure) {
String message = "[--> " + identifier + " | " + event;
if (failure == null) {
if (value != null) {
message = message + "(" + value + ")";
} else {
message = message + "()";
}
} else {
message = message + "(" + failure.getClass().getName() + "(\"" + failure.getMessage() + "\"))";
}
Context context = Vertx.currentContext();
if (context != null) {
message = message + " " + context;
}
System.out.println(message + " " + Thread.currentThread().getName());
}
});
}

@Test
public void testWithPartitions() {
Expand Down Expand Up @@ -67,6 +95,7 @@ public void testWithPartitions() {
}

@Test
@Disabled
public void testWithPartitionsBlockingUnordered() {
companion.topics().createAndWait(topic, 3);
String groupId = UUID.randomUUID().toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory;
import io.smallrye.reactive.messaging.providers.impl.ConnectorFactories;
import io.smallrye.reactive.messaging.providers.impl.InternalChannelRegistry;
import io.smallrye.reactive.messaging.providers.locals.ContextDecorator;
import io.smallrye.reactive.messaging.providers.wiring.Wiring;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;
import io.vertx.mqtt.MqttClientOptions;
Expand Down Expand Up @@ -127,6 +128,7 @@ static Weld baseWeld(MapBasedConfig config) {
weld.addPackages(EmitterImpl.class.getPackage());
weld.addExtension(new ReactiveMessagingExtension());
weld.addBeanClass(MqttConnector.class);
weld.addBeanClass(ContextDecorator.class);
weld.addBeanClass(EmitterFactoryImpl.class);
weld.addBeanClass(MutinyEmitterFactoryImpl.class);
weld.addBeanClass(LegacyEmitterFactoryImpl.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.smallrye.reactive.messaging.providers.locals;

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.PublisherDecorator;

/**
* Decorator to dispatch messages on the Vert.x context attached to the message via {@link LocalContextMetadata}.
* Low priority to be called before other decorators.
*/
@ApplicationScoped
public class ContextDecorator implements PublisherDecorator {

@Override
public int getPriority() {
return 0;
}

@Override
public Multi<? extends Message<?>> decorate(Multi<? extends Message<?>> publisher, String channelName,
boolean isConnector) {
return ContextOperator.apply(publisher);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import io.smallrye.mutiny.subscription.MultiSubscriber;
import io.smallrye.reactive.messaging.providers.helpers.VertxContext;
import io.vertx.core.Context;
import io.vertx.core.Vertx;

/**
* Decorator to dispatch messages on the Vert.x context attached to the message via {@link LocalContextMetadata}.
Expand Down Expand Up @@ -78,16 +77,11 @@ public void onItem(T item) {

@Override
public void request(long numberOfItems) {
Context context = Vertx.currentContext();
if (context != null) {
super.request(numberOfItems);
Context root = ROOT_CONTEXT_UPDATER.get(this);
if (root != null) {
root.runOnContext(x -> super.request(numberOfItems));
} else {
Context root = ROOT_CONTEXT_UPDATER.get(this);
if (root != null) {
root.runOnContext(x -> super.request(numberOfItems));
} else {
super.request(numberOfItems);
}
super.request(numberOfItems);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory;
import io.smallrye.reactive.messaging.providers.impl.ConnectorFactories;
import io.smallrye.reactive.messaging.providers.impl.InternalChannelRegistry;
import io.smallrye.reactive.messaging.providers.locals.ContextDecorator;
import io.smallrye.reactive.messaging.providers.metrics.MetricDecorator;
import io.smallrye.reactive.messaging.providers.metrics.MicrometerDecorator;
import io.smallrye.reactive.messaging.providers.wiring.Wiring;
Expand Down Expand Up @@ -110,6 +111,7 @@ public void setUp() {
MicrometerDecorator.class,
MetricDecorator.class,
HealthCenter.class,
ContextDecorator.class,
// Messaging provider
MyDummyConnector.class,
// Emitter factories
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import io.smallrye.reactive.messaging.connector.InboundConnector;
import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder;
import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage;
import io.smallrye.reactive.messaging.providers.locals.ContextOperator;
import io.smallrye.reactive.messaging.providers.locals.LocalContextMetadata;
import io.vertx.core.impl.ConcurrentHashSet;
import io.vertx.mutiny.core.Context;
Expand Down Expand Up @@ -175,8 +174,7 @@ public Publisher<? extends Message<?>> getPublisher(Config config) {
.onItem()
.transformToUniAndConcatenate(i -> Uni.createFrom().emitter(e -> context.runOnContext(() -> e.complete(i))))
.map(Message::of)
.map(ContextAwareMessage::withContextMetadata)
.plug(ContextOperator::apply);
.map(ContextAwareMessage::withContextMetadata);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory;
import io.smallrye.reactive.messaging.providers.impl.ConnectorFactories;
import io.smallrye.reactive.messaging.providers.impl.InternalChannelRegistry;
import io.smallrye.reactive.messaging.providers.locals.ContextDecorator;
import io.smallrye.reactive.messaging.providers.wiring.Wiring;

/**
Expand Down Expand Up @@ -59,6 +60,7 @@ void testAlternativeAnalysis() {
ConfiguredChannelFactory.class,
ConnectorFactories.class,
HealthCenter.class,
ContextDecorator.class,
// Messaging provider
MyDummyConnector.class,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory;
import io.smallrye.reactive.messaging.providers.impl.ConnectorFactories;
import io.smallrye.reactive.messaging.providers.impl.InternalChannelRegistry;
import io.smallrye.reactive.messaging.providers.locals.ContextDecorator;
import io.smallrye.reactive.messaging.providers.metrics.MetricDecorator;
import io.smallrye.reactive.messaging.providers.metrics.MicrometerDecorator;
import io.smallrye.reactive.messaging.providers.wiring.Wiring;
Expand Down Expand Up @@ -67,6 +68,7 @@ public void initWeld() {
weld.addBeanClass(RabbitMQConnector.class);
weld.addBeanClass(MetricDecorator.class);
weld.addBeanClass(MicrometerDecorator.class);
weld.addBeanClass(ContextDecorator.class);
weld.disableDiscovery();
}

Expand Down

0 comments on commit 0d704ad

Please sign in to comment.