Move KafkaConnector to Inbound/Outbound Connector API
ozangunalp committed Dec 6, 2022
1 parent edac6e5 commit 0a875ba
Showing 13 changed files with 114 additions and 77 deletions.
15 changes: 13 additions & 2 deletions smallrye-reactive-messaging-kafka/revapi.json
@@ -24,7 +24,18 @@
"criticality" : "highlight",
"minSeverity" : "POTENTIALLY_BREAKING",
"minCriticality" : "documented",
"differences" : [ ]
"differences" : [
{
"code": "java.method.removed",
"old": "method org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder<? extends org.eclipse.microprofile.reactive.messaging.Message<?>> io.smallrye.reactive.messaging.kafka.KafkaConnector::getPublisherBuilder(org.eclipse.microprofile.config.Config)",
"justification": "KafkaConnector moved to InboundConnector API"
},
{
"code": "java.method.removed",
"old": "method org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder<? extends org.eclipse.microprofile.reactive.messaging.Message<?>, java.lang.Void> io.smallrye.reactive.messaging.kafka.KafkaConnector::getSubscriberBuilder(org.eclipse.microprofile.config.Config)",
"justification": "KafkaConnector moved to OutboundConnector API"
}
]
}
}, {
"extension" : "revapi.reporter.json",
@@ -43,4 +54,4 @@
"minCriticality" : "documented",
"output" : "out"
}
} ]
} ]
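The two revapi exclusions above record the removal of the MicroProfile SPI factory methods. As a point of reference, here is a sketch of the replacement shapes implied by the @Override signatures in the KafkaConnector changes below — the actual InboundConnector/OutboundConnector interfaces are not part of this diff, so the names and exact declarations here are illustrative only.

```java
// Sketch only: replacement SPI shapes inferred from the @Override signatures in the
// KafkaConnector diff below — not copied from the real interface sources.
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

interface InboundConnectorSketch {
    // replaces IncomingConnectorFactory#getPublisherBuilder(Config)
    Publisher<? extends Message<?>> getPublisher(Config config);
}

interface OutboundConnectorSketch {
    // replaces OutgoingConnectorFactory#getSubscriberBuilder(Config)
    Subscriber<? extends Message<?>> getSubscriber(Config config);
}
```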
KafkaConnector.java
@@ -20,18 +20,16 @@
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;
import org.eclipse.microprofile.reactive.messaging.spi.IncomingConnectorFactory;
import org.eclipse.microprofile.reactive.messaging.spi.OutgoingConnectorFactory;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction;
import io.smallrye.reactive.messaging.connector.InboundConnector;
import io.smallrye.reactive.messaging.connector.OutboundConnector;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.health.HealthReporter;
import io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler;
@@ -116,7 +114,7 @@
@ConnectorAttribute(name = "propagate-headers", direction = Direction.OUTGOING, description = "A comma-separating list of incoming record headers to be propagated to the outgoing record", type = "string", defaultValue = "")
@ConnectorAttribute(name = "key-serialization-failure-handler", type = "string", direction = Direction.OUTGOING, description = "The name set in `@Identifier` of a bean that implements `io.smallrye.reactive.messaging.kafka.SerializationFailureHandler`. If set, serialization failure happening when serializing keys are delegated to this handler which may provide a fallback value.")
@ConnectorAttribute(name = "value-serialization-failure-handler", type = "string", direction = Direction.OUTGOING, description = "The name set in `@Identifier` of a bean that implements `io.smallrye.reactive.messaging.kafka.SerializationFailureHandler`. If set, serialization failure happening when serializing values are delegated to this handler which may provide a fallback value.")
public class KafkaConnector implements IncomingConnectorFactory, OutgoingConnectorFactory, HealthReporter {
public class KafkaConnector implements InboundConnector, OutboundConnector, HealthReporter {

public static final String CONNECTOR_NAME = "smallrye-kafka";

@@ -171,7 +169,7 @@ void init() {
}

@Override
public PublisherBuilder<? extends Message<?>> getPublisherBuilder(Config config) {
public Publisher<? extends Message<?>> getPublisher(Config config) {
Config channelConfiguration = ConfigHelper.retrieveChannelConfiguration(configurations, config);

KafkaConnectorIncomingConfiguration ic = new KafkaConnectorIncomingConfiguration(channelConfiguration);
@@ -207,9 +205,9 @@ public PublisherBuilder<? extends Message<?>> getPublisherBuilder(Config config)
stream = source.getBatchStream();
}
if (broadcast) {
return ReactiveStreams.fromPublisher(stream.broadcast().toAllSubscribers());
return stream.broadcast().toAllSubscribers();
} else {
return ReactiveStreams.fromPublisher(stream);
return stream;
}
}

@@ -235,14 +233,14 @@ public PublisherBuilder<? extends Message<?>> getPublisherBuilder(Config config)
.streams(streams.toArray(new Publisher[0]));
boolean broadcast = ic.getBroadcast();
if (broadcast) {
return ReactiveStreams.fromPublisher(multi.broadcast().toAllSubscribers());
return multi.broadcast().toAllSubscribers();
} else {
return ReactiveStreams.fromPublisher(multi);
return multi;
}
}

@Override
public SubscriberBuilder<? extends Message<?>, Void> getSubscriberBuilder(Config config) {
public Subscriber<? extends Message<?>> getSubscriber(Config config) {
Config channelConfiguration = ConfigHelper.retrieveChannelConfiguration(configurations, config);

KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(channelConfiguration);
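With the factory methods gone, KafkaConnector implements the Inbound/Outbound Connector API directly and returns bare Reactive Streams types; since Mutiny's Multi implements Publisher, the streams are returned without the ReactiveStreams.fromPublisher(...) adapter. A minimal, hypothetical connector written against the same two methods might look as follows — a sketch only, with an invented connector name and a deliberately trivial subscriber.

```java
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.connector.InboundConnector;
import io.smallrye.reactive.messaging.connector.OutboundConnector;

// Hypothetical connector, for illustration only — not part of this commit.
@Connector("dummy-connector")
public class DummyConnector implements InboundConnector, OutboundConnector {

    @Override
    public Publisher<? extends Message<?>> getPublisher(Config config) {
        // Multi implements Publisher, so it can be returned directly —
        // no ReactiveStreams.fromPublisher(...) wrapping needed anymore.
        return Multi.createFrom().range(0, 10).map(Message::of);
    }

    @Override
    public Subscriber<? extends Message<?>> getSubscriber(Config config) {
        // Hand-rolled subscriber: request everything, ack and drop each message.
        return new Subscriber<Message<?>>() {
            @Override
            public void onSubscribe(Subscription subscription) {
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Message<?> message) {
                message.ack();
            }

            @Override
            public void onError(Throwable failure) {
                // a real connector would report this, e.g. via HealthReporter
            }

            @Override
            public void onComplete() {
                // nothing to do
            }
        };
    }
}
```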
KafkaSink.java
@@ -28,8 +28,7 @@
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.reactivestreams.Subscriber;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.OutgoingMessageMetadata;
@@ -44,6 +43,7 @@
import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;
import io.smallrye.reactive.messaging.kafka.health.KafkaSinkHealth;
import io.smallrye.reactive.messaging.kafka.impl.ce.KafkaCloudEventHelper;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;

@SuppressWarnings("jol")
public class KafkaSink {
@@ -52,7 +52,7 @@ public class KafkaSink {
private final int partition;
private final String topic;
private final String key;
private final SubscriberBuilder<? extends Message<?>, Void> subscriber;
private final Subscriber<? extends Message<?>> subscriber;

private final long retries;
private final int deliveryTimeoutMs;
@@ -118,13 +118,10 @@ public KafkaSink(KafkaConnectorOutgoingConfiguration config, KafkaCDIEvents kafk
}
this.processor = new KafkaSenderProcessor(requests, waitForWriteCompletion,
writeMessageToKafka());
this.subscriber = ReactiveStreams.<Message<?>> builder()
.via(processor)
.onError(f -> {
log.unableToDispatch(f);
reportFailure(f);
})
.ignore();
this.subscriber = MultiUtils.via(processor, m -> m.onFailure().invoke(f -> {
log.unableToDispatch(f);
reportFailure(f);
}));

}

@@ -310,7 +307,7 @@ private Object getKey(Message<?> message,
return key;
}

public SubscriberBuilder<? extends Message<?>, Void> getSink() {
public Subscriber<? extends Message<?>> getSink() {
return subscriber;
}

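In KafkaSink, the Reactive Streams Operators builder chain (via(processor).onError(...).ignore()) is replaced by a subscriber built with MultiUtils.via, with the error callback expressed as a Mutiny onFailure().invoke(...) stage. A small standalone sketch of that operator's behaviour (plain Mutiny, nothing KafkaSink-specific): the callback observes the failure, and the failure still propagates downstream, which is what the log-and-report logic above relies on.

```java
import io.smallrye.mutiny.Multi;

public class OnFailureInvokeDemo {
    public static void main(String[] args) {
        Multi.createFrom().items(1, 2, 3)
                .onItem().transform(i -> {
                    if (i == 2) {
                        throw new IllegalStateException("boom");
                    }
                    return i;
                })
                // Side effect only: the failure is observed (here printed; in KafkaSink
                // logged and reported) and then still propagated downstream.
                .onFailure().invoke(failure -> System.out.println("unable to dispatch: " + failure))
                .subscribe().with(
                        item -> System.out.println("written: " + item),
                        failure -> System.out.println("stream failed: " + failure.getMessage()));
    }
}
```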
KafkaSink test file
@@ -58,7 +58,7 @@ public void testSinkUsingInteger() {
KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config);
sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance());

Subscriber<? extends Message<?>> subscriber = sink.getSink().build();
Subscriber<? extends Message<?>> subscriber = sink.getSink();
Multi.createFrom().range(0, 10)
.map(Message::of)
.subscribe((Subscriber<? super Message<Integer>>) subscriber);
@@ -78,7 +78,7 @@ public void testSinkUsingIntegerAndChannelName() {
KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config);
sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance());

Subscriber<? extends Message<?>> subscriber = sink.getSink().build();
Subscriber<? extends Message<?>> subscriber = sink.getSink();
Multi.createFrom().range(0, 10)
.map(Message::of)
.subscribe((Subscriber<? super Message<Integer>>) subscriber);
@@ -99,7 +99,7 @@ public void testSinkUsingString() {
KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config);
sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance());

Subscriber<? extends Message<?>> subscriber = sink.getSink().build();
Subscriber<? extends Message<?>> subscriber = sink.getSink();
Multi.createFrom().range(0, 10)
.map(i -> Integer.toString(i))
.map(Message::of)
@@ -238,7 +238,7 @@ public void testInvalidPayloadType() {

List<Object> acked = new CopyOnWriteArrayList<>();
List<Object> nacked = new CopyOnWriteArrayList<>();
Subscriber subscriber = sink.getSink().build();
Subscriber subscriber = sink.getSink();
Multi.createFrom().range(0, 6)
.map(i -> {
if (i == 3 || i == 5) {
@@ -279,7 +279,7 @@ public void testInvalidTypeWithDefaultInflightMessages() {
KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config);
sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance());

Subscriber subscriber = sink.getSink().build();
Subscriber subscriber = sink.getSink();
Multi.createFrom().range(0, 5)
.map(i -> {
if (i == 3 || i == 5) {
Second KafkaSink test file
@@ -63,7 +63,7 @@ public void testSinkUsingInteger() {
KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config);
sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance());

Subscriber<? extends Message<?>> subscriber = sink.getSink().build();
Subscriber<? extends Message<?>> subscriber = sink.getSink();
Multi.createFrom().range(0, 10)
.map(Message::of)
.subscribe((Subscriber<? super Message<Integer>>) subscriber);
@@ -83,7 +83,7 @@ public void testSinkUsingIntegerAndChannelName() {
KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config);
sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance());

Subscriber<? extends Message<?>> subscriber = sink.getSink().build();
Subscriber<? extends Message<?>> subscriber = sink.getSink();
Multi.createFrom().range(0, 10)
.map(Message::of)
.subscribe((Subscriber<? super Message<Integer>>) subscriber);
@@ -104,7 +104,7 @@ public void testSinkUsingString() {
KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config);
sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance());

Subscriber<? extends Message<?>> subscriber = sink.getSink().build();
Subscriber<? extends Message<?>> subscriber = sink.getSink();
Multi.createFrom().range(0, 10)
.map(i -> Integer.toString(i))
.map(Message::of)
@@ -244,7 +244,7 @@ public void testInvalidPayloadType() {

List<Object> acked = new CopyOnWriteArrayList<>();
List<Object> nacked = new CopyOnWriteArrayList<>();
Subscriber subscriber = sink.getSink().build();
Subscriber subscriber = sink.getSink();
Multi.createFrom().range(0, 6)
.map(i -> {
if (i == 3 || i == 5) {
@@ -285,7 +285,7 @@ public void testInvalidTypeWithDefaultInflightMessages() {
KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config);
sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance());

Subscriber subscriber = sink.getSink().build();
Subscriber subscriber = sink.getSink();
Multi.createFrom().range(0, 5)
.map(i -> {
if (i == 3 || i == 5) {
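In both sink test files the change is mechanical: getSink() now returns the Subscriber itself, so the .build() call disappears. For context, a hedged sketch of the subscription pattern these tests use — the helper below is invented for illustration; the double cast is needed because a Subscriber<? extends Message<?>> cannot accept Message<Integer> items until the wildcard is narrowed.

```java
import org.eclipse.microprofile.reactive.messaging.Message;
import org.reactivestreams.Subscriber;

import io.smallrye.mutiny.Multi;

public class SinkSubscriptionSketch {

    // 'sink' stands in for the value returned by KafkaSink#getSink() — hypothetical wiring.
    @SuppressWarnings("unchecked")
    static void drive(Subscriber<? extends Message<?>> sink) {
        Multi.createFrom().range(0, 10)
                .map(Message::of)
                // Unchecked narrowing of the wildcard so that Message<Integer> items
                // can be pushed into the returned subscriber.
                .subscribe((Subscriber<? super Message<Integer>>) sink);
    }
}
```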
Kafka source test file
@@ -27,7 +27,6 @@
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.jboss.weld.exceptions.DeploymentException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Tag;
@@ -149,13 +148,12 @@ public void testBroadcast() {
connector.failureHandlerFactories = new SingletonInstance<>("fail", new KafkaFailStop.Factory());
connector.init();

PublisherBuilder<? extends KafkaRecord> builder = (PublisherBuilder<? extends KafkaRecord>) connector
.getPublisherBuilder(config);
Multi<? extends KafkaRecord> multi = (Multi<? extends KafkaRecord>) connector.getPublisher(config);

List<KafkaRecord> messages1 = new ArrayList<>();
List<KafkaRecord> messages2 = new ArrayList<>();
builder.forEach(messages1::add).run();
builder.forEach(messages2::add).run();
multi.subscribe().with(messages1::add);
multi.subscribe().with(messages2::add);

companion.produceIntegers().usingGenerator(i -> new ProducerRecord<>(topic, i), 10);

@@ -190,13 +188,12 @@ public void testBroadcastWithPartitions() {
connector.failureHandlerFactories = new SingletonInstance<>("fail", new KafkaFailStop.Factory());
connector.init();

PublisherBuilder<? extends KafkaRecord> builder = (PublisherBuilder<? extends KafkaRecord>) connector
.getPublisherBuilder(config);
Multi<? extends KafkaRecord> multi = (Multi<? extends KafkaRecord>) connector.getPublisher(config);

List<KafkaRecord> messages1 = new ArrayList<>();
List<KafkaRecord> messages2 = new ArrayList<>();
builder.forEach(messages1::add).run();
builder.forEach(messages2::add).run();
multi.subscribe().with(messages1::add);
multi.subscribe().with(messages2::add);

companion.produceIntegers().usingGenerator(i -> new ProducerRecord<>(topic, i), 10);

Second Kafka source test file
@@ -27,7 +27,6 @@
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.jboss.weld.exceptions.DeploymentException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Tag;
@@ -154,13 +153,12 @@ public void testBroadcast() {
connector.kafkaCDIEvents = testEvents;
connector.init();

PublisherBuilder<? extends KafkaRecord> builder = (PublisherBuilder<? extends KafkaRecord>) connector
.getPublisherBuilder(config);
Multi<? extends KafkaRecord> multi = (Multi<? extends KafkaRecord>) connector.getPublisher(config);

List<KafkaRecord> messages1 = new ArrayList<>();
List<KafkaRecord> messages2 = new ArrayList<>();
builder.forEach(messages1::add).run();
builder.forEach(messages2::add).run();
multi.subscribe().with(messages1::add);
multi.subscribe().with(messages2::add);

companion.produceIntegers().usingGenerator(i -> new ProducerRecord<>(topic, i), 10);

@@ -195,13 +193,12 @@ public void testBroadcastWithPartitions() {
connector.failureHandlerFactories = new SingletonInstance<>("fail", new KafkaFailStop.Factory());
connector.init();

PublisherBuilder<? extends KafkaRecord> builder = (PublisherBuilder<? extends KafkaRecord>) connector
.getPublisherBuilder(config);
Multi<? extends KafkaRecord> multi = (Multi<? extends KafkaRecord>) connector.getPublisher(config);

List<KafkaRecord> messages1 = new ArrayList<>();
List<KafkaRecord> messages2 = new ArrayList<>();
builder.forEach(messages1::add).run();
builder.forEach(messages2::add).run();
multi.subscribe().with(messages1::add);
multi.subscribe().with(messages2::add);

companion.produceIntegers().usingGenerator(i -> new ProducerRecord<>(topic, i), 10);

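Both source test files follow the same pattern: getPublisher(Config) now returns a bare Publisher (in practice a Mutiny Multi, hence the cast), and consumption moves from PublisherBuilder.forEach(...).run() to Mutiny's subscribe().with(...). A minimal standalone sketch of that consumption style, assuming nothing beyond the public Mutiny API:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import io.smallrye.mutiny.Multi;

public class SubscribeWithSketch {
    public static void main(String[] args) {
        List<Integer> received = new CopyOnWriteArrayList<>();

        // subscribe().with(...) registers a consumer and requests items immediately —
        // the Mutiny counterpart of PublisherBuilder.forEach(...).run().
        Multi.createFrom().range(0, 10)
                .subscribe().with(received::add);

        System.out.println(received); // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```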