Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update SmallRye Reactive Messaging to version 2.1.0 #10047

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bom/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
<smallrye-context-propagation.version>1.0.13</smallrye-context-propagation.version>
<smallrye-reactive-streams-operators.version>1.0.13</smallrye-reactive-streams-operators.version>
<smallrye-converter-api.version>1.0.13</smallrye-converter-api.version>
<smallrye-reactive-messaging.version>2.0.4</smallrye-reactive-messaging.version>
<smallrye-reactive-messaging.version>2.1.0</smallrye-reactive-messaging.version>
<swagger-ui.version>3.26.1</swagger-ui.version>
<jakarta.activation.version>1.2.1</jakarta.activation.version>
<jakarta.annotation-api.version>1.3.5</jakarta.annotation-api.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

import javax.security.auth.spi.LoginModule;

import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.clients.consumer.StickyAssignor;
Expand Down Expand Up @@ -42,7 +44,6 @@

import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
import io.quarkus.deployment.Capabilities;
import io.quarkus.deployment.Capability;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.builditem.CombinedIndexBuildItem;
Expand Down Expand Up @@ -90,18 +91,20 @@ public void build(CombinedIndexBuildItem indexBuildItem, BuildProducer<Reflectiv
collectImplementors(toRegister, indexBuildItem, Serializer.class);
collectImplementors(toRegister, indexBuildItem, Deserializer.class);
collectImplementors(toRegister, indexBuildItem, Partitioner.class);
// PartitionAssignor is now deprecated, replaced by ConsumerPartitionAssignor
collectImplementors(toRegister, indexBuildItem, PartitionAssignor.class);
collectImplementors(toRegister, indexBuildItem, ConsumerPartitionAssignor.class);

for (Class i : BUILT_INS) {
for (Class<?> i : BUILT_INS) {
reflectiveClass.produce(new ReflectiveClassBuildItem(false, false, i.getName()));
collectSubclasses(toRegister, indexBuildItem, i);
}
if (capabilities.isPresent(Capability.JSONB)) {
if (capabilities.isCapabilityPresent(Capabilities.JSONB)) {
reflectiveClass.produce(new ReflectiveClassBuildItem(false, false, JsonbSerializer.class, JsonbDeserializer.class));
collectSubclasses(toRegister, indexBuildItem, JsonbSerializer.class);
collectSubclasses(toRegister, indexBuildItem, JsonbDeserializer.class);
}
if (capabilities.isPresent(Capability.JACKSON)) {
if (capabilities.isCapabilityPresent(Capabilities.JACKSON)) {
reflectiveClass.produce(
new ReflectiveClassBuildItem(false, false, ObjectMapperSerializer.class, ObjectMapperDeserializer.class));
collectSubclasses(toRegister, indexBuildItem, ObjectMapperSerializer.class);
Expand Down Expand Up @@ -157,14 +160,17 @@ private static void collectSubclasses(Set<DotName> set, CombinedIndexBuildItem i
}

private static void collectClassNames(Set<DotName> set, Collection<ClassInfo> classInfos) {
classInfos.forEach(c -> {
set.add(c.name());
classInfos.forEach(new Consumer<ClassInfo>() {
@Override
public void accept(ClassInfo c) {
set.add(c.name());
}
});
}

@BuildStep
HealthBuildItem addHealthCheck(KafkaBuildTimeConfig buildTimeConfig) {
return new HealthBuildItem("io.quarkus.kafka.client.health.KafkaHealthCheck",
buildTimeConfig.healthEnabled, "kafka");
buildTimeConfig.healthEnabled);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.quarkus.smallrye.reactivemessaging.deployment;

import java.lang.annotation.Annotation;

import io.smallrye.reactive.messaging.annotations.Broadcast;

public class BroadcastLiteral implements Broadcast {
private final int subscribers;

public BroadcastLiteral(int subscribers) {
this.subscribers = subscribers;
}

@Override
public int value() {
return subscribers;
}

@Override
public Class<? extends Annotation> annotationType() {
return Broadcast.class;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.quarkus.smallrye.reactivemessaging.deployment;

import io.quarkus.builder.item.MultiBuildItem;
import io.smallrye.reactive.messaging.extension.EmitterConfiguration;

public final class EmitterBuildItem extends MultiBuildItem {

Expand All @@ -12,18 +13,8 @@ public final class EmitterBuildItem extends MultiBuildItem {
* @param bufferSize the buffer size, if overflow is set to {@code BUFFER}
* @return the new {@link EmitterBuildItem}
*/
static EmitterBuildItem of(String name, String overflow, int bufferSize) {
return new EmitterBuildItem(name, overflow, bufferSize);
}

/**
* Creates a new instance of {@link EmitterBuildItem} using the default overflow strategy.
*
* @param name the name of the stream
* @return the new {@link EmitterBuildItem}
*/
static EmitterBuildItem of(String name) {
return new EmitterBuildItem(name, null, -1);
static EmitterBuildItem of(String name, String overflow, int bufferSize, boolean hasBroadcast, int awaitSubscribers) {
return new EmitterBuildItem(name, overflow, bufferSize, hasBroadcast, awaitSubscribers);
}

/**
Expand All @@ -43,22 +34,28 @@ static EmitterBuildItem of(String name) {
*/
private final int bufferSize;

public EmitterBuildItem(String name, String overflow, int bufferSize) {
/**
* Whether the emitter uses the {@link io.smallrye.reactive.messaging.annotations.Broadcast} annotation.
*/
private final boolean hasBroadcast;

/**
* If the emitter uses the {@link io.smallrye.reactive.messaging.annotations.Broadcast} annotation, indicates the
* number of subscribers to be expected before subscribing upstream.
*/
private final int awaitSubscribers;

public EmitterBuildItem(String name, String overflow, int bufferSize, boolean hasBroadcast, int awaitSubscribers) {
this.name = name;
this.overflow = overflow;
this.bufferSize = bufferSize;
this.hasBroadcast = hasBroadcast;
this.awaitSubscribers = hasBroadcast ? awaitSubscribers : -1;
}

public String getName() {
return name;
}

public String getOverflow() {
return overflow;
}

public int getBufferSize() {
return bufferSize;
public EmitterConfiguration getEmitterConfig() {
return new EmitterConfiguration(name, OnOverflowLiteral.create(overflow, bufferSize),
hasBroadcast ? new BroadcastLiteral(awaitSubscribers) : null);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.quarkus.smallrye.reactivemessaging.deployment;

import java.lang.annotation.Annotation;

import org.eclipse.microprofile.reactive.messaging.OnOverflow;

public class OnOverflowLiteral implements OnOverflow {

private final Strategy strategy;
private final long buffer;

OnOverflowLiteral(String strategy, long buffer) {
this.strategy = strategy == null ? Strategy.BUFFER : Strategy.valueOf(strategy.toUpperCase());
this.buffer = buffer;
}

public static OnOverflow create(String strategy, long buffer) {
return new OnOverflowLiteral(strategy, buffer);
}

@Override
public Strategy value() {
return strategy;
}

@Override
public long bufferSize() {
return buffer;
}

@Override
public Class<? extends Annotation> annotationType() {
return OnOverflow.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

import javax.enterprise.context.Dependent;
import javax.enterprise.inject.Vetoed;
Expand Down Expand Up @@ -68,7 +69,7 @@
import io.smallrye.reactive.messaging.annotations.Blocking;

/**
*
*
*/
public class SmallRyeReactiveMessagingProcessor {

Expand All @@ -86,7 +87,8 @@ FeatureBuildItem feature() {
AdditionalBeanBuildItem beans() {
// We add the connector and channel qualifiers to make them part of the index.
return new AdditionalBeanBuildItem(SmallRyeReactiveMessagingLifecycle.class, Connector.class,
Channel.class, io.smallrye.reactive.messaging.annotations.Channel.class, QuarkusWorkerPoolRegistry.class);
Channel.class, io.smallrye.reactive.messaging.annotations.Channel.class,
QuarkusWorkerPoolRegistry.class);
}

@BuildStep
Expand Down Expand Up @@ -163,6 +165,19 @@ void validateBeanDeployment(

for (InjectionPointInfo injectionPoint : validationPhase.getContext()
.get(BuildExtension.Key.INJECTION_POINTS)) {

Optional<AnnotationInstance> broadcast = annotationStore.getAnnotations(injectionPoint.getTarget())
.stream()
.filter(ai -> ReactiveMessagingDotNames.BROADCAST.equals(ai.name()))
.filter(ai -> {
if (ai.target().kind() == AnnotationTarget.Kind.METHOD_PARAMETER && injectionPoint
.isParam()) {
return ai.target().asMethodParameter().position() == injectionPoint.getPosition();
}
return true;
})
.findAny();

// New emitter from the spec.
if (injectionPoint.getRequiredType().name().equals(
ReactiveMessagingDotNames.EMITTER)) {
Expand All @@ -187,7 +202,7 @@ void validateBeanDeployment(
return true;
})
.findAny();
createEmitter(emitters, injectionPoint, channelName, overflow);
createEmitter(emitters, injectionPoint, channelName, overflow, broadcast);
}
}

Expand Down Expand Up @@ -215,7 +230,8 @@ void validateBeanDeployment(
return true;
})
.findAny();
createEmitter(emitters, injectionPoint, channelName, overflow);

createEmitter(emitters, injectionPoint, channelName, overflow, broadcast);
}
}
}
Expand All @@ -224,18 +240,30 @@ void validateBeanDeployment(
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
private void createEmitter(BuildProducer<EmitterBuildItem> emitters, InjectionPointInfo injectionPoint,
String channelName,
Optional<AnnotationInstance> overflow) {
Optional<AnnotationInstance> overflow,
Optional<AnnotationInstance> broadcast) {
LOGGER.debugf("Emitter injection point '%s' detected, channel name: '%s'",
injectionPoint.getTargetInfo(), channelName);

boolean hasBroadcast = false;
int awaitSubscribers = -1;
int bufferSize = -1;
String strategy = null;
if (broadcast.isPresent()) {
hasBroadcast = true;
AnnotationValue value = broadcast.get().value();
awaitSubscribers = value == null ? 0 : value.asInt();
}

if (overflow.isPresent()) {
AnnotationInstance annotation = overflow.get();
AnnotationValue maybeBufferSize = annotation.value("bufferSize");
int bufferSize = maybeBufferSize != null ? maybeBufferSize.asInt() : 0;
emitters.produce(
EmitterBuildItem.of(channelName, annotation.value().asString(), bufferSize));
} else {
emitters.produce(EmitterBuildItem.of(channelName));
bufferSize = maybeBufferSize == null ? 0 : maybeBufferSize.asInt();
strategy = annotation.value().asString();
}

emitters.produce(
EmitterBuildItem.of(channelName, strategy, bufferSize, hasBroadcast, awaitSubscribers));
}

@BuildStep
Expand Down Expand Up @@ -327,16 +355,15 @@ public void build(SmallRyeReactiveMessagingRecorder recorder, RecorderContext re
for (EmitterBuildItem it : emitterFields) {
Config config = ConfigProvider.getConfig();
int defaultBufferSize = config.getOptionalValue("mp.messaging.emitter.default-buffer-size", Integer.class)
.orElseGet(() -> config
.getOptionalValue("smallrye.messaging.emitter.default-buffer-size", Integer.class)
.orElse(127));
if (it.getOverflow() != null) {
recorder.configureEmitter(beanContainer.getValue(), it.getName(), it.getOverflow(),
it.getBufferSize(),
defaultBufferSize);
} else {
recorder.configureEmitter(beanContainer.getValue(), it.getName(), null, 0, defaultBufferSize);
}
.orElseGet(new Supplier<Integer>() {
@Override
public Integer get() {
return config
.getOptionalValue("smallrye.messaging.emitter.default-buffer-size", Integer.class)
.orElse(127);
}
});
recorder.configureEmitter(beanContainer.getValue(), it.getEmitterConfig(), defaultBufferSize);
}
}

Expand Down
Loading