Skip to content

Commit

Permalink
Merge pull request #10047 from cescoffier/features/update-reactive-me…
Browse files Browse the repository at this point in the history
…ssaging

Update SmallRye Reactive Messaging to version 2.1.0
  • Loading branch information
gsmet authored Jun 17, 2020
2 parents ab6301e + a58ad99 commit fc190b2
Show file tree
Hide file tree
Showing 13 changed files with 506 additions and 54 deletions.
2 changes: 1 addition & 1 deletion bom/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
<smallrye-context-propagation.version>1.0.13</smallrye-context-propagation.version>
<smallrye-reactive-streams-operators.version>1.0.13</smallrye-reactive-streams-operators.version>
<smallrye-converter-api.version>1.0.13</smallrye-converter-api.version>
<smallrye-reactive-messaging.version>2.0.4</smallrye-reactive-messaging.version>
<smallrye-reactive-messaging.version>2.1.0</smallrye-reactive-messaging.version>
<swagger-ui.version>3.26.1</swagger-ui.version>
<jakarta.activation.version>1.2.1</jakarta.activation.version>
<jakarta.annotation-api.version>1.3.5</jakarta.annotation-api.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

import javax.security.auth.spi.LoginModule;

import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.clients.consumer.StickyAssignor;
Expand Down Expand Up @@ -42,7 +44,6 @@

import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
import io.quarkus.deployment.Capabilities;
import io.quarkus.deployment.Capability;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.builditem.CombinedIndexBuildItem;
Expand Down Expand Up @@ -90,18 +91,20 @@ public void build(CombinedIndexBuildItem indexBuildItem, BuildProducer<Reflectiv
collectImplementors(toRegister, indexBuildItem, Serializer.class);
collectImplementors(toRegister, indexBuildItem, Deserializer.class);
collectImplementors(toRegister, indexBuildItem, Partitioner.class);
// PartitionAssignor is now deprecated, replaced by ConsumerPartitionAssignor
collectImplementors(toRegister, indexBuildItem, PartitionAssignor.class);
collectImplementors(toRegister, indexBuildItem, ConsumerPartitionAssignor.class);

for (Class i : BUILT_INS) {
for (Class<?> i : BUILT_INS) {
reflectiveClass.produce(new ReflectiveClassBuildItem(false, false, i.getName()));
collectSubclasses(toRegister, indexBuildItem, i);
}
if (capabilities.isPresent(Capability.JSONB)) {
if (capabilities.isCapabilityPresent(Capabilities.JSONB)) {
reflectiveClass.produce(new ReflectiveClassBuildItem(false, false, JsonbSerializer.class, JsonbDeserializer.class));
collectSubclasses(toRegister, indexBuildItem, JsonbSerializer.class);
collectSubclasses(toRegister, indexBuildItem, JsonbDeserializer.class);
}
if (capabilities.isPresent(Capability.JACKSON)) {
if (capabilities.isCapabilityPresent(Capabilities.JACKSON)) {
reflectiveClass.produce(
new ReflectiveClassBuildItem(false, false, ObjectMapperSerializer.class, ObjectMapperDeserializer.class));
collectSubclasses(toRegister, indexBuildItem, ObjectMapperSerializer.class);
Expand Down Expand Up @@ -157,14 +160,17 @@ private static void collectSubclasses(Set<DotName> set, CombinedIndexBuildItem i
}

private static void collectClassNames(Set<DotName> set, Collection<ClassInfo> classInfos) {
classInfos.forEach(c -> {
set.add(c.name());
classInfos.forEach(new Consumer<ClassInfo>() {
@Override
public void accept(ClassInfo c) {
set.add(c.name());
}
});
}

@BuildStep
HealthBuildItem addHealthCheck(KafkaBuildTimeConfig buildTimeConfig) {
return new HealthBuildItem("io.quarkus.kafka.client.health.KafkaHealthCheck",
buildTimeConfig.healthEnabled, "kafka");
buildTimeConfig.healthEnabled);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.quarkus.smallrye.reactivemessaging.deployment;

import java.lang.annotation.Annotation;

import io.smallrye.reactive.messaging.annotations.Broadcast;

public class BroadcastLiteral implements Broadcast {
private final int subscribers;

public BroadcastLiteral(int subscribers) {
this.subscribers = subscribers;
}

@Override
public int value() {
return subscribers;
}

@Override
public Class<? extends Annotation> annotationType() {
return Broadcast.class;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.quarkus.smallrye.reactivemessaging.deployment;

import io.quarkus.builder.item.MultiBuildItem;
import io.smallrye.reactive.messaging.extension.EmitterConfiguration;

public final class EmitterBuildItem extends MultiBuildItem {

Expand All @@ -12,18 +13,8 @@ public final class EmitterBuildItem extends MultiBuildItem {
* @param bufferSize the buffer size, if overflow is set to {@code BUFFER}
* @return the new {@link EmitterBuildItem}
*/
static EmitterBuildItem of(String name, String overflow, int bufferSize) {
return new EmitterBuildItem(name, overflow, bufferSize);
}

/**
* Creates a new instance of {@link EmitterBuildItem} using the default overflow strategy.
*
* @param name the name of the stream
* @return the new {@link EmitterBuildItem}
*/
static EmitterBuildItem of(String name) {
return new EmitterBuildItem(name, null, -1);
static EmitterBuildItem of(String name, String overflow, int bufferSize, boolean hasBroadcast, int awaitSubscribers) {
return new EmitterBuildItem(name, overflow, bufferSize, hasBroadcast, awaitSubscribers);
}

/**
Expand All @@ -43,22 +34,28 @@ static EmitterBuildItem of(String name) {
*/
private final int bufferSize;

public EmitterBuildItem(String name, String overflow, int bufferSize) {
/**
* Whether the emitter uses the {@link io.smallrye.reactive.messaging.annotations.Broadcast} annotation.
*/
private final boolean hasBroadcast;

/**
* If the emitter uses the {@link io.smallrye.reactive.messaging.annotations.Broadcast} annotation, indicates the
* number of subscribers to be expected before subscribing upstream.
*/
private final int awaitSubscribers;

public EmitterBuildItem(String name, String overflow, int bufferSize, boolean hasBroadcast, int awaitSubscribers) {
this.name = name;
this.overflow = overflow;
this.bufferSize = bufferSize;
this.hasBroadcast = hasBroadcast;
this.awaitSubscribers = hasBroadcast ? awaitSubscribers : -1;
}

public String getName() {
return name;
}

public String getOverflow() {
return overflow;
}

public int getBufferSize() {
return bufferSize;
public EmitterConfiguration getEmitterConfig() {
return new EmitterConfiguration(name, OnOverflowLiteral.create(overflow, bufferSize),
hasBroadcast ? new BroadcastLiteral(awaitSubscribers) : null);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.quarkus.smallrye.reactivemessaging.deployment;

import java.lang.annotation.Annotation;

import org.eclipse.microprofile.reactive.messaging.OnOverflow;

public class OnOverflowLiteral implements OnOverflow {

private final Strategy strategy;
private final long buffer;

OnOverflowLiteral(String strategy, long buffer) {
this.strategy = strategy == null ? Strategy.BUFFER : Strategy.valueOf(strategy.toUpperCase());
this.buffer = buffer;
}

public static OnOverflow create(String strategy, long buffer) {
return new OnOverflowLiteral(strategy, buffer);
}

@Override
public Strategy value() {
return strategy;
}

@Override
public long bufferSize() {
return buffer;
}

@Override
public Class<? extends Annotation> annotationType() {
return OnOverflow.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

import javax.enterprise.context.Dependent;
import javax.enterprise.inject.Vetoed;
Expand Down Expand Up @@ -68,7 +69,7 @@
import io.smallrye.reactive.messaging.annotations.Blocking;

/**
*
*
*/
public class SmallRyeReactiveMessagingProcessor {

Expand All @@ -86,7 +87,8 @@ FeatureBuildItem feature() {
AdditionalBeanBuildItem beans() {
// We add the connector and channel qualifiers to make them part of the index.
return new AdditionalBeanBuildItem(SmallRyeReactiveMessagingLifecycle.class, Connector.class,
Channel.class, io.smallrye.reactive.messaging.annotations.Channel.class, QuarkusWorkerPoolRegistry.class);
Channel.class, io.smallrye.reactive.messaging.annotations.Channel.class,
QuarkusWorkerPoolRegistry.class);
}

@BuildStep
Expand Down Expand Up @@ -163,6 +165,19 @@ void validateBeanDeployment(

for (InjectionPointInfo injectionPoint : validationPhase.getContext()
.get(BuildExtension.Key.INJECTION_POINTS)) {

Optional<AnnotationInstance> broadcast = annotationStore.getAnnotations(injectionPoint.getTarget())
.stream()
.filter(ai -> ReactiveMessagingDotNames.BROADCAST.equals(ai.name()))
.filter(ai -> {
if (ai.target().kind() == AnnotationTarget.Kind.METHOD_PARAMETER && injectionPoint
.isParam()) {
return ai.target().asMethodParameter().position() == injectionPoint.getPosition();
}
return true;
})
.findAny();

// New emitter from the spec.
if (injectionPoint.getRequiredType().name().equals(
ReactiveMessagingDotNames.EMITTER)) {
Expand All @@ -187,7 +202,7 @@ void validateBeanDeployment(
return true;
})
.findAny();
createEmitter(emitters, injectionPoint, channelName, overflow);
createEmitter(emitters, injectionPoint, channelName, overflow, broadcast);
}
}

Expand Down Expand Up @@ -215,7 +230,8 @@ void validateBeanDeployment(
return true;
})
.findAny();
createEmitter(emitters, injectionPoint, channelName, overflow);

createEmitter(emitters, injectionPoint, channelName, overflow, broadcast);
}
}
}
Expand All @@ -224,18 +240,30 @@ void validateBeanDeployment(
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
private void createEmitter(BuildProducer<EmitterBuildItem> emitters, InjectionPointInfo injectionPoint,
String channelName,
Optional<AnnotationInstance> overflow) {
Optional<AnnotationInstance> overflow,
Optional<AnnotationInstance> broadcast) {
LOGGER.debugf("Emitter injection point '%s' detected, channel name: '%s'",
injectionPoint.getTargetInfo(), channelName);

boolean hasBroadcast = false;
int awaitSubscribers = -1;
int bufferSize = -1;
String strategy = null;
if (broadcast.isPresent()) {
hasBroadcast = true;
AnnotationValue value = broadcast.get().value();
awaitSubscribers = value == null ? 0 : value.asInt();
}

if (overflow.isPresent()) {
AnnotationInstance annotation = overflow.get();
AnnotationValue maybeBufferSize = annotation.value("bufferSize");
int bufferSize = maybeBufferSize != null ? maybeBufferSize.asInt() : 0;
emitters.produce(
EmitterBuildItem.of(channelName, annotation.value().asString(), bufferSize));
} else {
emitters.produce(EmitterBuildItem.of(channelName));
bufferSize = maybeBufferSize == null ? 0 : maybeBufferSize.asInt();
strategy = annotation.value().asString();
}

emitters.produce(
EmitterBuildItem.of(channelName, strategy, bufferSize, hasBroadcast, awaitSubscribers));
}

@BuildStep
Expand Down Expand Up @@ -327,16 +355,15 @@ public void build(SmallRyeReactiveMessagingRecorder recorder, RecorderContext re
for (EmitterBuildItem it : emitterFields) {
Config config = ConfigProvider.getConfig();
int defaultBufferSize = config.getOptionalValue("mp.messaging.emitter.default-buffer-size", Integer.class)
.orElseGet(() -> config
.getOptionalValue("smallrye.messaging.emitter.default-buffer-size", Integer.class)
.orElse(127));
if (it.getOverflow() != null) {
recorder.configureEmitter(beanContainer.getValue(), it.getName(), it.getOverflow(),
it.getBufferSize(),
defaultBufferSize);
} else {
recorder.configureEmitter(beanContainer.getValue(), it.getName(), null, 0, defaultBufferSize);
}
.orElseGet(new Supplier<Integer>() {
@Override
public Integer get() {
return config
.getOptionalValue("smallrye.messaging.emitter.default-buffer-size", Integer.class)
.orElse(127);
}
});
recorder.configureEmitter(beanContainer.getValue(), it.getEmitterConfig(), defaultBufferSize);
}
}

Expand Down
Loading

0 comments on commit fc190b2

Please sign in to comment.