Reactive client for Apache Pulsar which is compatible with the Reactive Streams specification. This uses Project Reactor as the Reactive Streams implementation.
This library requires Java 8 or + to run.
With Gradle:
dependencies {
implementation "org.apache.pulsar:pulsar-client-reactive-adapter:0.5.10"
}
With Maven:
<dependencies>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-reactive-adapter</artifactId>
<version>0.5.10</version>
</dependency>
</dependencies>
While the above dependency is sufficient to obtain the Pulsar Reactive Java client, it is recommended to also use the provided BOM to ensure that all Pulsar dependencies are at the same expected version. In order to use the BOM, the previous directions are modified slightly as follows:
With Gradle:
def pulsarReactiveVersion = '0.5.10'
dependencies {
implementation enforcedPlatform("org.apache.pulsar:pulsar-client-reactive-bom:${pulsarVersion}")
implementation 'org.apache.pulsar:pulsar-client-reactive-adapter'
}
With Maven:
<!-- in your <properties> block -->
<pulsar-reactive.version>0.5.10</pulsar-reactive.version>
<!-- in your <dependencyManagement>/<dependencies> block -->
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-reactive-bom</artifactId>
<version>${pulsar-reactive.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!-- in your <dependencies> block -->
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-reactive-adapter</artifactId>
</dependency>
When using the Spring Boot Pulsar Reactive starter there is no need to directly specify the client dependency as described above because it will be automatically added to dependencies as a transitive dependency of the spring-boot-starter-pulsar-reactive
dependency.
The Spring Boot Dependency Version properties define pulsar-reactive.version
for controlling the Pulsar Java Reactive client version.
You can find more information about using Pulsar with Spring Boot in the Spring Boot documentation.
With Gradle:
// Alternatively, you can set the `pulsar-reactive.version` property in the `gradle.properties` file.
ext['pulsar-reactive.version'] = '0.5.10'
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-pulsar-reactive'
}
With Maven:
<!-- in your <properties> block -->
<pulsar-reactive.version>0.5.10</pulsar-reactive.version>
<!-- in your <dependencies> block -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-pulsar-reactive</artifactId>
</dependency>
Note
|
In both Maven and Gradle cases above, the pulsar-reactive.version only needs to be specified when you want to override the version of the client that Spring Boot recommends.
|
ReactiveMessageSender<String> messageSender = reactivePulsarClient
.messageSender(Schema.STRING)
.topic(topicName)
.maxInflight(100)
.build();
Mono<MessageId> messageId = messageSender
.sendOne(MessageSpec.of("Hello world!"));
// for demonstration
messageId.subscribe(System.out::println);
By default, a ConcurrentHashMap based cache is used. It’s recommended to use a more advanced cache based on Caffeine. The cache will get used as the default implementation when it is on the classpath.
Adding Caffeine based producer cache with Gradle:
dependencies {
implementation "org.apache.pulsar:pulsar-client-reactive-adapter:0.5.10"
implementation "org.apache.pulsar:pulsar-client-reactive-producer-cache-caffeine:0.5.10"
}
Adding Caffeine based producer cache with Maven:
<dependencies>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-reactive-adapter</artifactId>
<version>0.5.10</version>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-reactive-producer-cache-caffeine</artifactId>
<version>0.5.10</version>
</dependency>
</dependencies>
When using the BOM the above dependency version numbers can be omitted
Usage example of cache
ReactiveMessageSender<String> messageSender = reactivePulsarClient
.messageSender(Schema.STRING)
.cache(AdaptedReactivePulsarClientFactory.createCache())
.topic(topicName)
.maxInflight(100)
.build();
Mono<MessageId> messageId = messageSender
.sendOne(MessageSpec.of("Hello world!"));
// for demonstration
messageId.subscribe(System.out::println);
It is recommended to use a cached producer in most cases. The cache enables reusing the Pulsar Producer instance and related resources across multiple message sending calls. This improves performance since a producer won’t have to be created and closed before and after sending a message.
The adapter library implementation together with the cache implementation will also enable reactive backpressure for sending messages.
The maxInflight
setting will limit the number of messages that are pending from the client to the broker.
The solution will limit reactive streams subscription requests to keep the number of pending messages under the defined limit.
This limit is per-topic and impacts the local JVM only.
A version of the provider is available that shades it usage of Caffeine. This is useful in scenarios where there is another version of Caffeine required in your application or if you do not want Caffeine on the classpath.
Adding shaded Caffeine based producer cache with Gradle:
dependencies {
implementation "org.apache.pulsar:pulsar-client-reactive-adapter:0.5.10"
implementation "org.apache.pulsar:pulsar-client-reactive-producer-cache-caffeine-shaded:0.5.10"
}
Adding shaded Caffeine based producer cache with Maven:
<dependencies>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-reactive-adapter</artifactId>
<version>0.5.10</version>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-reactive-producer-cache-caffeine-shaded</artifactId>
<version>0.5.10</version>
</dependency>
</dependencies>
When using the BOM the above dependency version numbers can be omitted
Reading all messages for a topic:
ReactiveMessageReader<String> messageReader =
reactivePulsarClient.messageReader(Schema.STRING)
.topic(topicName)
.build();
messageReader.readMany()
.map(Message::getValue)
// for demonstration
.subscribe(System.out::println);
By default, the stream will complete when the tail of the topic is reached.
With .endOfStreamAction(EndOfStreamAction.POLL)
the Reader will poll for new messages when the reader reaches the end of the topic.
ReactiveMessageReader<String> messageReader =
reactivePulsarClient.messageReader(Schema.STRING)
.topic(topicName)
.startAtSpec(StartAtSpec.ofLatest())
.endOfStreamAction(EndOfStreamAction.POLL)
.build();
messageReader.readMany()
.take(Duration.ofSeconds(5))
.take(5)
// for demonstration
.subscribe(System.out::println);
ReactiveMessageConsumer<String> messageConsumer=
reactivePulsarClient.messageConsumer(Schema.STRING)
.topic(topicName)
.subscriptionName("sub")
.build();
messageConsumer.consumeMany(messageFlux ->
messageFlux.map(message ->
MessageResult.acknowledge(message.getMessageId(), message.getValue())))
.take(Duration.ofSeconds(2))
// for demonstration
.subscribe(System.out::println);
ReactiveMessagePipeline reactiveMessagePipeline =
reactivePulsarClient
.messageConsumer(Schema.STRING)
.subscriptionName("sub")
.topic(topicName)
.build()
.messagePipeline()
.messageHandler(message -> Mono.fromRunnable(()->{
System.out.println(message.getValue());
}))
.build()
.start();
// for demonstration
// the reactive message handler is running in the background, delay for 10 seconds
Thread.sleep(10000L);
// now stop the message handler component
reactiveMessagePipeline.stop();
Reactive client for Apache Pulsar is Open Source Software released under the Apache Software License 2.0.
The library is Apache 2.0 licensed.
Contributions are welcome. Please discuss larger changes on the Apache Pulsar dev mailing list. There’s a contributing guide with more details.
If you detect a bug or have a feature request or a good idea for Reactive client for Apache Pulsar, please open a GitHub issue.
Please use [reactive-pulsar] tag on Stackoverflow. Ask a question now.