diff --git a/smallrye-reactive-messaging-kafka/revapi.json b/smallrye-reactive-messaging-kafka/revapi.json index d1a26bd769..29f5898f83 100644 --- a/smallrye-reactive-messaging-kafka/revapi.json +++ b/smallrye-reactive-messaging-kafka/revapi.json @@ -24,7 +24,18 @@ "criticality" : "highlight", "minSeverity" : "POTENTIALLY_BREAKING", "minCriticality" : "documented", - "differences" : [ ] + "differences" : [ + { + "code": "java.method.removed", + "old": "method org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder> io.smallrye.reactive.messaging.kafka.KafkaConnector::getPublisherBuilder(org.eclipse.microprofile.config.Config)", + "justification": "KafkaConnector moved to InboundConnector API" + }, + { + "code": "java.method.removed", + "old": "method org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder, java.lang.Void> io.smallrye.reactive.messaging.kafka.KafkaConnector::getSubscriberBuilder(org.eclipse.microprofile.config.Config)", + "justification": "KafkaConnector moved to OutboundConnector API" + } + ] } }, { "extension" : "revapi.reporter.json", @@ -43,4 +54,4 @@ "minCriticality" : "documented", "output" : "out" } -} ] \ No newline at end of file +} ] diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaConnector.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaConnector.java index 0155351f9a..262fa64edb 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaConnector.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaConnector.java @@ -20,18 +20,16 @@ import org.eclipse.microprofile.config.Config; import org.eclipse.microprofile.reactive.messaging.Message; import org.eclipse.microprofile.reactive.messaging.spi.Connector; -import org.eclipse.microprofile.reactive.messaging.spi.IncomingConnectorFactory; -import org.eclipse.microprofile.reactive.messaging.spi.OutgoingConnectorFactory; -import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder; -import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams; -import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder; import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.trace.Tracer; import io.smallrye.mutiny.Multi; import io.smallrye.reactive.messaging.annotations.ConnectorAttribute; import io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction; +import io.smallrye.reactive.messaging.connector.InboundConnector; +import io.smallrye.reactive.messaging.connector.OutboundConnector; import io.smallrye.reactive.messaging.health.HealthReport; import io.smallrye.reactive.messaging.health.HealthReporter; import io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler; @@ -116,7 +114,7 @@ @ConnectorAttribute(name = "propagate-headers", direction = Direction.OUTGOING, description = "A comma-separating list of incoming record headers to be propagated to the outgoing record", type = "string", defaultValue = "") @ConnectorAttribute(name = "key-serialization-failure-handler", type = "string", direction = Direction.OUTGOING, description = "The name set in `@Identifier` of a bean that implements `io.smallrye.reactive.messaging.kafka.SerializationFailureHandler`. If set, serialization failure happening when serializing keys are delegated to this handler which may provide a fallback value.") @ConnectorAttribute(name = "value-serialization-failure-handler", type = "string", direction = Direction.OUTGOING, description = "The name set in `@Identifier` of a bean that implements `io.smallrye.reactive.messaging.kafka.SerializationFailureHandler`. If set, serialization failure happening when serializing values are delegated to this handler which may provide a fallback value.") -public class KafkaConnector implements IncomingConnectorFactory, OutgoingConnectorFactory, HealthReporter { +public class KafkaConnector implements InboundConnector, OutboundConnector, HealthReporter { public static final String CONNECTOR_NAME = "smallrye-kafka"; @@ -171,7 +169,7 @@ void init() { } @Override - public PublisherBuilder> getPublisherBuilder(Config config) { + public Publisher> getPublisher(Config config) { Config channelConfiguration = ConfigHelper.retrieveChannelConfiguration(configurations, config); KafkaConnectorIncomingConfiguration ic = new KafkaConnectorIncomingConfiguration(channelConfiguration); @@ -207,9 +205,9 @@ public PublisherBuilder> getPublisherBuilder(Config config) stream = source.getBatchStream(); } if (broadcast) { - return ReactiveStreams.fromPublisher(stream.broadcast().toAllSubscribers()); + return stream.broadcast().toAllSubscribers(); } else { - return ReactiveStreams.fromPublisher(stream); + return stream; } } @@ -235,14 +233,14 @@ public PublisherBuilder> getPublisherBuilder(Config config) .streams(streams.toArray(new Publisher[0])); boolean broadcast = ic.getBroadcast(); if (broadcast) { - return ReactiveStreams.fromPublisher(multi.broadcast().toAllSubscribers()); + return multi.broadcast().toAllSubscribers(); } else { - return ReactiveStreams.fromPublisher(multi); + return multi; } } @Override - public SubscriberBuilder, Void> getSubscriberBuilder(Config config) { + public Subscriber> getSubscriber(Config config) { Config channelConfiguration = ConfigHelper.retrieveChannelConfiguration(configurations, config); KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(channelConfiguration); diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSink.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSink.java index d87b0218d8..79a63efedf 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSink.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSink.java @@ -28,8 +28,7 @@ import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.serialization.StringSerializer; import org.eclipse.microprofile.reactive.messaging.Message; -import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams; -import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder; +import org.reactivestreams.Subscriber; import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.OutgoingMessageMetadata; @@ -44,6 +43,7 @@ import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata; import io.smallrye.reactive.messaging.kafka.health.KafkaSinkHealth; import io.smallrye.reactive.messaging.kafka.impl.ce.KafkaCloudEventHelper; +import io.smallrye.reactive.messaging.providers.helpers.MultiUtils; @SuppressWarnings("jol") public class KafkaSink { @@ -52,7 +52,7 @@ public class KafkaSink { private final int partition; private final String topic; private final String key; - private final SubscriberBuilder, Void> subscriber; + private final Subscriber> subscriber; private final long retries; private final int deliveryTimeoutMs; @@ -118,13 +118,10 @@ public KafkaSink(KafkaConnectorOutgoingConfiguration config, KafkaCDIEvents kafk } this.processor = new KafkaSenderProcessor(requests, waitForWriteCompletion, writeMessageToKafka()); - this.subscriber = ReactiveStreams.> builder() - .via(processor) - .onError(f -> { - log.unableToDispatch(f); - reportFailure(f); - }) - .ignore(); + this.subscriber = MultiUtils.via(processor, m -> m.onFailure().invoke(f -> { + log.unableToDispatch(f); + reportFailure(f); + })); } @@ -310,7 +307,7 @@ private Object getKey(Message message, return key; } - public SubscriberBuilder, Void> getSink() { + public Subscriber> getSink() { return subscriber; } diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSinkTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSinkTest.java index c2d4917e3c..10f1b1b2d6 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSinkTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSinkTest.java @@ -58,7 +58,7 @@ public void testSinkUsingInteger() { KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); - Subscriber> subscriber = sink.getSink().build(); + Subscriber> subscriber = sink.getSink(); Multi.createFrom().range(0, 10) .map(Message::of) .subscribe((Subscriber>) subscriber); @@ -78,7 +78,7 @@ public void testSinkUsingIntegerAndChannelName() { KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); - Subscriber> subscriber = sink.getSink().build(); + Subscriber> subscriber = sink.getSink(); Multi.createFrom().range(0, 10) .map(Message::of) .subscribe((Subscriber>) subscriber); @@ -99,7 +99,7 @@ public void testSinkUsingString() { KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); - Subscriber> subscriber = sink.getSink().build(); + Subscriber> subscriber = sink.getSink(); Multi.createFrom().range(0, 10) .map(i -> Integer.toString(i)) .map(Message::of) @@ -238,7 +238,7 @@ public void testInvalidPayloadType() { List acked = new CopyOnWriteArrayList<>(); List nacked = new CopyOnWriteArrayList<>(); - Subscriber subscriber = sink.getSink().build(); + Subscriber subscriber = sink.getSink(); Multi.createFrom().range(0, 6) .map(i -> { if (i == 3 || i == 5) { @@ -279,7 +279,7 @@ public void testInvalidTypeWithDefaultInflightMessages() { KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); - Subscriber subscriber = sink.getSink().build(); + Subscriber subscriber = sink.getSink(); Multi.createFrom().range(0, 5) .map(i -> { if (i == 3 || i == 5) { diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSinkWithLegacyMetadataTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSinkWithLegacyMetadataTest.java index cdfcde3c8b..d1652018c6 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSinkWithLegacyMetadataTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSinkWithLegacyMetadataTest.java @@ -63,7 +63,7 @@ public void testSinkUsingInteger() { KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); - Subscriber> subscriber = sink.getSink().build(); + Subscriber> subscriber = sink.getSink(); Multi.createFrom().range(0, 10) .map(Message::of) .subscribe((Subscriber>) subscriber); @@ -83,7 +83,7 @@ public void testSinkUsingIntegerAndChannelName() { KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); - Subscriber> subscriber = sink.getSink().build(); + Subscriber> subscriber = sink.getSink(); Multi.createFrom().range(0, 10) .map(Message::of) .subscribe((Subscriber>) subscriber); @@ -104,7 +104,7 @@ public void testSinkUsingString() { KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); - Subscriber> subscriber = sink.getSink().build(); + Subscriber> subscriber = sink.getSink(); Multi.createFrom().range(0, 10) .map(i -> Integer.toString(i)) .map(Message::of) @@ -244,7 +244,7 @@ public void testInvalidPayloadType() { List acked = new CopyOnWriteArrayList<>(); List nacked = new CopyOnWriteArrayList<>(); - Subscriber subscriber = sink.getSink().build(); + Subscriber subscriber = sink.getSink(); Multi.createFrom().range(0, 6) .map(i -> { if (i == 3 || i == 5) { @@ -285,7 +285,7 @@ public void testInvalidTypeWithDefaultInflightMessages() { KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); - Subscriber subscriber = sink.getSink().build(); + Subscriber subscriber = sink.getSink(); Multi.createFrom().range(0, 5) .map(i -> { if (i == 3 || i == 5) { diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceTest.java index 818e8ed48b..79e39751cf 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceTest.java @@ -27,7 +27,6 @@ import org.apache.kafka.common.serialization.IntegerDeserializer; import org.eclipse.microprofile.reactive.messaging.Channel; import org.eclipse.microprofile.reactive.messaging.Message; -import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder; import org.jboss.weld.exceptions.DeploymentException; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Tag; @@ -149,13 +148,12 @@ public void testBroadcast() { connector.failureHandlerFactories = new SingletonInstance<>("fail", new KafkaFailStop.Factory()); connector.init(); - PublisherBuilder builder = (PublisherBuilder) connector - .getPublisherBuilder(config); + Multi multi = (Multi) connector.getPublisher(config); List messages1 = new ArrayList<>(); List messages2 = new ArrayList<>(); - builder.forEach(messages1::add).run(); - builder.forEach(messages2::add).run(); + multi.subscribe().with(messages1::add); + multi.subscribe().with(messages2::add); companion.produceIntegers().usingGenerator(i -> new ProducerRecord<>(topic, i), 10); @@ -190,13 +188,12 @@ public void testBroadcastWithPartitions() { connector.failureHandlerFactories = new SingletonInstance<>("fail", new KafkaFailStop.Factory()); connector.init(); - PublisherBuilder builder = (PublisherBuilder) connector - .getPublisherBuilder(config); + Multi multi = (Multi) connector.getPublisher(config); List messages1 = new ArrayList<>(); List messages2 = new ArrayList<>(); - builder.forEach(messages1::add).run(); - builder.forEach(messages2::add).run(); + multi.subscribe().with(messages1::add); + multi.subscribe().with(messages2::add); companion.produceIntegers().usingGenerator(i -> new ProducerRecord<>(topic, i), 10); diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceWithLegacyMetadataTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceWithLegacyMetadataTest.java index 7ff82a8601..b5040d87da 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceWithLegacyMetadataTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceWithLegacyMetadataTest.java @@ -27,7 +27,6 @@ import org.apache.kafka.common.serialization.IntegerDeserializer; import org.eclipse.microprofile.reactive.messaging.Channel; import org.eclipse.microprofile.reactive.messaging.Message; -import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder; import org.jboss.weld.exceptions.DeploymentException; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Tag; @@ -154,13 +153,12 @@ public void testBroadcast() { connector.kafkaCDIEvents = testEvents; connector.init(); - PublisherBuilder builder = (PublisherBuilder) connector - .getPublisherBuilder(config); + Multi multi = (Multi) connector.getPublisher(config); List messages1 = new ArrayList<>(); List messages2 = new ArrayList<>(); - builder.forEach(messages1::add).run(); - builder.forEach(messages2::add).run(); + multi.subscribe().with(messages1::add); + multi.subscribe().with(messages2::add); companion.produceIntegers().usingGenerator(i -> new ProducerRecord<>(topic, i), 10); @@ -195,13 +193,12 @@ public void testBroadcastWithPartitions() { connector.failureHandlerFactories = new SingletonInstance<>("fail", new KafkaFailStop.Factory()); connector.init(); - PublisherBuilder builder = (PublisherBuilder) connector - .getPublisherBuilder(config); + Multi multi = (Multi) connector.getPublisher(config); List messages1 = new ArrayList<>(); List messages2 = new ArrayList<>(); - builder.forEach(messages1::add).run(); - builder.forEach(messages2::add).run(); + multi.subscribe().with(messages1::add); + multi.subscribe().with(messages2::add); companion.produceIntegers().usingGenerator(i -> new ProducerRecord<>(topic, i), 10); diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ce/KafkaSinkWithCloudEventsTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ce/KafkaSinkWithCloudEventsTest.java index 7431c5dcdf..80c60aeff1 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ce/KafkaSinkWithCloudEventsTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ce/KafkaSinkWithCloudEventsTest.java @@ -68,8 +68,7 @@ public void testSendingStructuredCloudEvents() { .withId("some id") .build()); - Multi.createFrom().> item(message) - .subscribe().withSubscriber((Subscriber) sink.getSink().build()); + Multi.createFrom().> item(message).subscribe().withSubscriber((Subscriber) sink.getSink()); await().until(() -> records.count() == 1); @@ -115,7 +114,7 @@ public void testSendingStructuredCloudEventsWithComplexPayload() { .build()); Multi.createFrom().> item(message) - .subscribe().withSubscriber((Subscriber) sink.getSink().build()); + .subscribe().withSubscriber((Subscriber) sink.getSink()); await().until(() -> records.count() == 1); @@ -158,7 +157,7 @@ public void testSendingStructuredCloudEventsWithTimestampAndSubject() { .build()); Multi.createFrom().> item(message) - .subscribe().withSubscriber((Subscriber) sink.getSink().build()); + .subscribe().withSubscriber((Subscriber) sink.getSink()); await().until(() -> records.count() == 1); @@ -202,7 +201,7 @@ public void testSendingStructuredCloudEventsMissingMandatoryAttribute() { }); Multi.createFrom().> item(message) - .subscribe().withSubscriber((Subscriber) sink.getSink().build()); + .subscribe().withSubscriber((Subscriber) sink.getSink()); await().until(() -> { HealthReport.HealthReportBuilder builder = HealthReport.builder(); @@ -245,7 +244,7 @@ public void testSendingStructuredCloudEventsWithKey() { .build()); Multi.createFrom().> item(message) - .subscribe().withSubscriber((Subscriber) sink.getSink().build()); + .subscribe().withSubscriber((Subscriber) sink.getSink()); await().until(() -> records.count() == 1); @@ -284,7 +283,7 @@ public void testSendingStructuredCloudEventsWithConfiguredTypeAndSource() { .build()); Multi.createFrom().> item(message) - .subscribe().withSubscriber((Subscriber) sink.getSink().build()); + .subscribe().withSubscriber((Subscriber) sink.getSink()); await().until(() -> records.count() == 1); @@ -320,7 +319,7 @@ public void testSendingStructuredCloudEventsWithConfiguredTypeAndSourceAndNoClou Message message = Message.of("hello!"); Multi.createFrom().> item(message) - .subscribe().withSubscriber((Subscriber) sink.getSink().build()); + .subscribe().withSubscriber((Subscriber) sink.getSink()); await().until(() -> records.getRecords().size() == 1); @@ -360,7 +359,7 @@ public void testSendingStructuredCloudEventsWithExtensions() { .build()); Multi.createFrom().> item(message) - .subscribe().withSubscriber((Subscriber) sink.getSink().build()); + .subscribe().withSubscriber((Subscriber) sink.getSink()); await().until(() -> records.getRecords().size() == 1); @@ -399,7 +398,7 @@ public void testSendingBinaryCloudEvents() { .build()); Multi.createFrom().> item(message) - .subscribe().withSubscriber((Subscriber) sink.getSink().build()); + .subscribe().withSubscriber((Subscriber) sink.getSink()); await().until(() -> records.getRecords().size() == 1); @@ -436,7 +435,7 @@ public void testSendingBinaryCloudEventsWithContentType() { .build()); Multi.createFrom().> item(message) - .subscribe().withSubscriber((Subscriber) sink.getSink().build()); + .subscribe().withSubscriber((Subscriber) sink.getSink()); await().until(() -> records.getRecords().size() == 1); @@ -476,7 +475,7 @@ public void testSendingBinaryCloudEventsWithKey() { .build()); Multi.createFrom().> item(message) - .subscribe().withSubscriber((Subscriber) sink.getSink().build()); + .subscribe().withSubscriber((Subscriber) sink.getSink()); await().until(() -> records.getRecords().size() == 1); @@ -514,7 +513,7 @@ public void testSendingBinaryCloudEventsWithConfiguredTypeAndSource() { .build()); Multi.createFrom().> item(message) - .subscribe().withSubscriber((Subscriber) sink.getSink().build()); + .subscribe().withSubscriber((Subscriber) sink.getSink()); await().until(() -> records.getRecords().size() == 1); @@ -549,7 +548,7 @@ public void testSendingBinaryCloudEventsWithConfiguredTypeAndSourceButNoMetadata Message message = Message.of("hello!"); Multi.createFrom().> item(message) - .subscribe().withSubscriber((Subscriber) sink.getSink().build()); + .subscribe().withSubscriber((Subscriber) sink.getSink()); await().until(() -> records.getRecords().size() == 1); @@ -589,7 +588,7 @@ public void testSendingBinaryCloudEventsMissingMandatoryAttribute() { }); Multi.createFrom().> item(message) - .subscribe().withSubscriber((Subscriber) sink.getSink().build()); + .subscribe().withSubscriber((Subscriber) sink.getSink()); await().until(() -> { HealthReport.HealthReportBuilder builder = HealthReport.builder(); @@ -617,7 +616,7 @@ public void testWithCloudEventDisabled() { .build()); Multi.createFrom().> item(message) - .subscribe().withSubscriber((Subscriber) sink.getSink().build()); + .subscribe().withSubscriber((Subscriber) sink.getSink()); await().until(() -> records.getRecords().size() == 1); @@ -651,7 +650,7 @@ public void testSendingBinaryCloudEventsWithExtensions() { .build()); Multi.createFrom().> item(message) - .subscribe().withSubscriber((Subscriber) sink.getSink().build()); + .subscribe().withSubscriber((Subscriber) sink.getSink()); await().until(() -> records.getRecords().size() == 1); diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/LazyInitializedTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/LazyInitializedTest.java index 615389011e..9b7cb4008b 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/LazyInitializedTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/LazyInitializedTest.java @@ -57,7 +57,7 @@ void testLazyInitializedProducer() { assertThat(producer).isNotNull(); assertThat(producer.unwrap()).isNull(); - Subscriber> subscriber = (Subscriber>) sink.getSink().build(); + Subscriber> subscriber = (Subscriber>) sink.getSink(); await().untilAsserted(() -> { assertThat(getHealthReport(sink::isStarted).isOk()).isTrue(); assertThat(getHealthReport(sink::isReady).isOk()).isTrue(); diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ReactiveKafkaProducerTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ReactiveKafkaProducerTest.java index c1532010aa..2d7c0db8c2 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ReactiveKafkaProducerTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ReactiveKafkaProducerTest.java @@ -132,7 +132,7 @@ private void sharedProducerTest(int numberOfThreads, int numberOfMessagesPerThre Thread actualProducer = new Thread(() -> { Multi> merge = Multi.createBy().merging().streams(multis); - Subscriber> subscriber = (Subscriber>) createSink().getSink().build(); + Subscriber> subscriber = (Subscriber>) createSink().getSink(); merge.subscribe().withSubscriber(subscriber); }); threads.add(actualProducer); @@ -225,7 +225,7 @@ public void run() { emitter.complete(); }); - Subscriber> subscriber = (Subscriber>) createSink().getSink().build(); + Subscriber> subscriber = (Subscriber>) createSink().getSink(); stream.subscribe().withSubscriber(subscriber); } } diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/MultiplePartitionsThrottledStrategyTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/MultiplePartitionsThrottledStrategyTest.java index 12d9b3371b..9e09a043b2 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/MultiplePartitionsThrottledStrategyTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/MultiplePartitionsThrottledStrategyTest.java @@ -123,7 +123,6 @@ public void testWithPartitionsBlocking() { } @Test - @Disabled public void testWithPartitionsStreamProcessor() { companion.topics().createAndWait(topic, 3); String sinkTopic = topic + "-sink"; diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/serde/SerializerConfigurationTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/serde/SerializerConfigurationTest.java index 9321c7cf44..fb3cd4a1c5 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/serde/SerializerConfigurationTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/serde/SerializerConfigurationTest.java @@ -48,7 +48,7 @@ public void testThatWhenNotSetKeySerializerIsString() { ConsumerTask consumed = companion.consumeStrings().fromTopics(topic, 4, Duration.ofSeconds(10)); - Subscriber> subscriber = sink.getSink().build(); + Subscriber> subscriber = sink.getSink(); Multi.createFrom().items( Message.of(of("key", "value")), Message.of(of(null, "value")), Message.of(of("key", null)), Message.of(of(null, null))) @@ -77,7 +77,7 @@ public void testKeySerializationFailure() { .with("retries", 0L); sink = new KafkaSink(new KafkaConnectorOutgoingConfiguration(config), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); - Subscriber> subscriber = sink.getSink().build(); + Subscriber> subscriber = sink.getSink(); AtomicBoolean nacked = new AtomicBoolean(); Multi.createFrom().items( Message.of(of(125.25, new JsonObject().put("k", "v"))).withNack(t -> { @@ -96,7 +96,7 @@ public void testValueSerializationFailure() { .with("retries", 0L); sink = new KafkaSink(new KafkaConnectorOutgoingConfiguration(config), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); - Subscriber> subscriber = sink.getSink().build(); + Subscriber> subscriber = sink.getSink(); AtomicBoolean nacked = new AtomicBoolean(); Multi.createFrom().items( Message.of(of(new JsonObject().put("k", "v"), 125.25)).withNack(t -> { diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/helpers/MultiUtils.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/helpers/MultiUtils.java index 4648eedc30..56f7928610 100644 --- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/helpers/MultiUtils.java +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/helpers/MultiUtils.java @@ -3,12 +3,15 @@ import static io.smallrye.mutiny.helpers.ParameterValidation.nonNull; import java.util.concurrent.CompletionStage; +import java.util.function.Function; import java.util.function.Supplier; import org.eclipse.microprofile.reactive.messaging.Acknowledgment; import org.eclipse.microprofile.reactive.messaging.Message; +import org.reactivestreams.Processor; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; @@ -45,4 +48,40 @@ public static Multi> handlePreProcessingAcknowledgement(Mul })); } + @SuppressWarnings({ "unchecked" }) + public static Multi via(Multi multi, Processor processor) { + return multi.plug(stream -> Multi.createFrom().deferred(() -> { + Multi m = (Multi) MultiUtils.publisher(processor); + stream.subscribe(processor); + return m; + })); + } + + public static Subscriber via(Processor processor, Function, Multi

> function) { + return new MultiSubscriber<>() { + @Override + public void onSubscribe(Subscription subscription) { + processor.onSubscribe(subscription); + MultiUtils.publisher(processor).plug(function).subscribe().with(r -> { + // ignore + }); + } + + @Override + public void onItem(T item) { + processor.onNext(item); + } + + @Override + public void onFailure(Throwable throwable) { + processor.onError(throwable); + } + + @Override + public void onCompletion() { + processor.onComplete(); + } + }; + } + }