Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Documentation #47

Merged
merged 10 commits into from
Feb 5, 2024
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@

:summaryTableId: quarkus-solace-extension-common
Common configuration for Solace Quarkus Extension Incoming and Outgoing channels
Common configuration for Quarkus Solace Messaging Connector Incoming and Outgoing channels
[.configuration-reference.searchable, cols="80,.^10,.^10"]
|===

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@

:summaryTableId: quarkus-solace-extension-incoming
Incoming configuration for Solace Quarkus Extension
Incoming configuration for Quarkus Solace Messaging Connector
[.configuration-reference.searchable, cols="80,.^10,.^10"]
|===

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@

:summaryTableId: quarkus-solace-extension-outgoing
Outgoing configuration for Solace Quarkus Extension
Outgoing configuration for Quarkus Solace Messaging Connector
[.configuration-reference.searchable, cols="80,.^10,.^10"]
|===

Expand Down Expand Up @@ -111,7 +111,7 @@ Supported strategies `reject`, `elastic`, `wait`. Refer to `https://docs.solace.
// Environment variable: `+++QUARKUS_SOLACE_DEVSERVICES_SERVICE_NAME+++`
// endif::add-copy-button-to-env-var[]
--|string
|`wait`
|`elastic`


a| [[quarkus-solace_quarkus.producer.back-pressure.buffer-capacity]]`link:#quarkus-solace_quarkus.producer.back-pressure.buffer-capacity[producer.back-pressure.buffer-capacity]`
Expand Down
336 changes: 333 additions & 3 deletions docs/modules/ROOT/pages/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,19 @@

include::./includes/attributes.adoc[]

TIP: Solace Quarkus Extension for integrating with Solace PubSub+ message brokers. The extension provides the ability to publish or consume events from event mesh.
== Introduction

The https://solace.com/products/platform/[Solace PubSub+ Platform]'s https://solace.com/products/event-broker/software/[software event broker] efficiently streams event-driven information between applications, IoT devices and user interfaces running in the cloud, on-premises, and hybrid environments using open APIs and protocols like AMQP, JMS, MQTT, REST and WebSocket. It can be installed into a variety of public and private clouds, PaaS, and on-premises environments, and brokers in multiple locations can be linked together in an https://solace.com/what-is-an-event-mesh/[event mesh] to dynamically share events across the distributed enterprise.

== Installation
== Quarkus Extension for Solace

Solace Quarkus Extension for integrating with Solace PubSub+ message brokers. The extension provides the ability to publish or consume events from event mesh.

Users have the choice to use the extension in two ways

{empty}1. `com.solace.quarkus:quarkus-solace-client`

This extension provides only Solace Java Messaging API and users need to have their own implementation and configuration to interact with Solace PubSub+ broker.

If you want to use this extension, you need to add the `com.solace.quarkus:quarkus-solace-client` extension first to your build file.

Expand All @@ -20,6 +29,23 @@ For instance, with Maven, add the following dependency to your POM file:
</dependency>
----

{empty}2. `com.solace.quarkus:quarkus-solace-messaging-connector`

This extension is based on reactive messaging framework and provides pre-defined configurations for incoming and outgoing channels.

If you want to use this extension, you need to add the `com.solace.quarkus:quarkus-solace-messaging-connector` extension first to your build file.

For instance, with Maven, add the following dependency to your POM file:

[source,xml,subs=attributes+]
----
<dependency>
<groupId>com.solace.quarkus</groupId>
<artifactId>quarkus-solace-messaging-connector</artifactId>
<version>{project-version}</version>
</dependency>
----

[[extension-configuration-reference]]
== Extension Configuration Reference

Expand All @@ -38,4 +64,308 @@ include::includes/quarkus-solace-extension-outgoing.adoc[leveloffset=+1, opts=op
[[extension-common-configuration-reference]]
== Common Configuration Reference

include::includes/quarkus-solace-extension-common.adoc[leveloffset=+1, opts=optional]
include::includes/quarkus-solace-extension-common.adoc[leveloffset=+1, opts=optional]

[[configuring-quarkus-solace-messaging-connector]]
== Configuring Quarkus Solace Messaging Connector

Reactive Messaging framework supports different messaging backends it employs a generic vocabulary:

* Applications send and receive messages. A message wraps a payload and can be extended with some metadata. With the Solace connector, a message corresponds to Inbound or Outbound Message.

* Messages transit on channels. Application components connect to channels to publish and consume messages. The Solace connector maps channels to Solace queues and topics.

* Channels are connected to message backends using connectors. Connectors are configured to map incoming messages to a specific channel (consumed by the application) and collect outgoing messages sent to a specific channel. Each connector is dedicated to a specific messaging technology. For example, the connector dealing with Solace is named `quarkus-solace`.

A minimal configuration for the Solace connector with an incoming channel looks like the following:

The following lines of configuration assumes that a exclusive queue is already provisioned on the broker
[source,properties]
----
quarkus.solace.host=tcp://localhost:55555
quarkus.solace.vpn=default
quarkus.solace.authentication.basic.username=basic
quarkus.solace.authentication.basic.password=basic

mp.messaging.incoming.temperatures.connector=quarkus-solace
mp.messaging.incoming.temperatures.consumer.queue.name=temperatures
----

The extension also supports provisioning queues and subscriptions on broker given that the user has role access to create queues with subscriptions. Configuration is as follows

[source,properties]
----
quarkus.solace.host=tcp://localhost:55555
quarkus.solace.vpn=default
quarkus.solace.authentication.basic.username=basic
quarkus.solace.authentication.basic.password=basic

mp.messaging.incoming.temperatures.connector=quarkus-solace
mp.messaging.incoming.temperatures.consumer.queue.missing-resource-creation-strategy=create-on-start
mp.messaging.incoming.temperatures.consumer.queue.add-additional-subscriptions=true
mp.messaging.incoming.temperatures.consumer.queue.subscriptions=hello/foobar
----

1. When running in dev mode or tests dev services will automatically start a Solace PubSub+ broker and if broker configuration details are not provided the extension automatically picks up the details of broker started by dev services.

2. If `consumer.queue.name` property is not specified, channel name will be used as queue name.

[[receiving-messages-from-solace]]
== Receiving messages from Solace

Using the previous configuration, Quarkus application can receive message in several possible ways.

__Direct Payload__
[source,java]
----
import org.eclipse.microprofile.reactive.messaging.Incoming;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class TemperaturesConsumer {
@Incoming("temperatures")
SravanThotakura05 marked this conversation as resolved.
Show resolved Hide resolved
public void consume(Double temperature) {
// process.
}
}
----

__Message__
[source,java]
----
@ApplicationScoped
public class TemperaturesConsumer {
@Incoming("temperatures")
public CompletionStage<Void> consume(Message<Double> msg) {
// access record metadata
SolaceInboundMetadata metadata = msg.getMetadata(SolaceInboundMetadata.class).orElseThrow();
// process the message payload.
Double temperature = msg.getPayload();
// Acknowledge the incoming message
return msg.ack();
}
}
----

SravanThotakura05 marked this conversation as resolved.
Show resolved Hide resolved
__SolaceInboundMessage__ This is a wrapper to incoming Inbound Message from Solace Messaging API
[source,java]
----
@ApplicationScoped
public class TemperaturesConsumer {
@Incoming("temperatures")
public void consume(SolaceInboundMessage<Double> solaceInboundMessage) {
// get actual inbound message
InboundMessage inboundMessage = solaceInboundMessage.getMessage();
// process the message payload.
Double temperature = solaceInboundMessage.getPayload();
// access record metadata
SolaceInboundMetadata metadata = solaceInboundMessage.getMetadata();
// ...
solaceInboundMessage.ack();
}
}
----

[[acknowledgement-handling]]
== Acknowledgment Handling

By default, acknowledgement strategy is set to client acknowledgement. This gives greater control over acknowledgement and ensures that messages are acknowledged only after successful processing.

[source,java]
----
@ApplicationScoped
public class TemperaturesConsumer {
@Incoming("temperatures")
SravanThotakura05 marked this conversation as resolved.
Show resolved Hide resolved
public CompletionStage<Void> consume(SolaceInboundMessage<Double> msg) {
// access record metadata
SolaceInboundMetadata metadata = msg.getMetadata(SolaceInboundMetadata.class).orElseThrow();
// process the message payload.
Double temperature = msg.getPayload();
// Acknowledge the incoming message
return msg.ack();
}
}
----

[[failure-strategies]]
== Failure Strategies

If a message is nacked, a failure strategy is applied. Refer to <<extension-incoming-configuration-reference>><<quarkus-solace_quarkus.consumer.queue.failure-strategy>>. The default strategy is set to `ignore` and move on to next message. Following are the strategies supported by Quarkus Solace Messaging Connector extension.

`ignore` - Mark the message as IGNORED, will continue processing with next message. It TTL and DMQ are configured on the queue message will be moved to DMQ once TTL is reached. If no DMQ is configured but TTL is set message will be lost.

`fail` - Mark the message as FAILED, broker will redeliver the message. Nacks are supported on event brokers 10.2.1 and later, so enable this strategy based on broker version.

`discard` - Mark the message as REJECTED, broker will discard the message. The message will be moved to DMQ if DMQ is configured for queue and DMQ Eligible is set on message otherwise message will be lost. Nacks are supported on event brokers 10.2.1 and later, so enable this strategy based on broker version.

`error_topic` - Will publish the message to configured error topic, on success the message will be acknowledged in the queue.

[[sending-messages-to-solace]]
== Sending messages to Solace

Outgoing channel configuration to publish messages to Solace.

[source,properties]
----
quarkus.solace.host=tcp://localhost:55555
quarkus.solace.vpn=default
quarkus.solace.authentication.basic.username=basic
quarkus.solace.authentication.basic.password=basic

mp.messaging.incoming.temperatures-out.connector=quarkus-solace
mp.messaging.incoming.temperatures-out.producer.topic=temperatures
----

1. When running in dev mode or tests dev services will automatically start a Solace PubSub+ broker and if broker configuration details are not provided the extension automatically picks up the details of broker started by dev services.

2. If `producer.topic` property is not specified, channel name will be used as topic name.

Using the previous configuration Quarkus application can publish messages as follows

[source,java]
----
import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import jakarta.enterprise.context.ApplicationScoped;
import java.time.Duration;
import java.util.Random;

@ApplicationScoped
public class TemperaturesProducer {

private final Random random = new Random();

@Outgoing("temperatures-out")
public Multi<Double> generate() {
// Emit 1000 records
return Multi.createFrom().range(0, 1000)
.map(x -> random.nextDouble());
}

}
----

You can also generate a `org.eclipse.microprofile.reactive.messaging.Message` with required metadata and publish to Solace.

[source,java]
----
@ApplicationScoped
public class TemperaturesProducer {
private final Random random = new Random();

@Outgoing("temperatures-out")
Multi<Message<Double>> publishTemperatures() {
return Multi.createFrom().range(0, 1000)
.map(i -> {
SolaceOutboundMetadata outboundMetadata = SolaceOutboundMetadata.builder()
.setApplicationMessageId(Integer.toString(i)).createPubSubOutboundMetadata();
return Message.of(random.nextDouble(), Metadata.of(outboundMetadata));
});
}
}
----

*SolaceOutboundMetadata* allows to configure metadata for the message. It supports all the headers supported by Solace and custom user properties. In addition to this it also supports configuring dynamic topic which overrides the default topic in application configuration file.

Generating `org.eclipse.microprofile.reactive.messaging.Message` with dynamic topic and publish to Solace.

[source,java]
----
@ApplicationScoped
public class TemperaturesProducer {
private final Random random = new Random();

@Outgoing("temperatures-out")
Multi<Message<Double>> publishTemperatures() {
return Multi.createFrom().range(0, 1000)
.map(i -> {
SolaceOutboundMetadata outboundMetadata = SolaceOutboundMetadata.builder()
.setApplicationMessageId(Integer.toString(i))
.setDynamicDestination("device/" + Integer.toString(i) + "/temperature").createPubSubOutboundMetadata();
return Message.of(random.nextDouble(), Metadata.of(outboundMetadata));
});
}
}
----

Sending messages with __@Emitter__

[source,java]
----
@Path("/temperatures")
public class PublisherResource {

@Channel("temperatures-out")
MutinyEmitter<Double> temperatureEmitter;

@POST
@Path("/publish")
public Uni<Void> publish(Double temperature) {
return temperatureEmitter.send(person);
}
}
----

== Producer Back-Pressure strategies

Quarkus Solace Messaging connector provides three different strategies to handle back-pressure when publishing messages

{empty}1.Reject - Publisher will start rejecting messages once specified limit is reached

{empty}2.Wait - Publisher is throttled when a specified limit is reached

{empty}3.Elastic - Use an unlimited internal buffer (default)

ozangunalp marked this conversation as resolved.
Show resolved Hide resolved
CAUTION: In the current version we don't recommend to use back-pressure strategy `Reject` as it is in evolving phase.

Refer to <<extension-outgoing-configuration-reference>><<quarkus-solace_quarkus.producer.back-pressure.strategy>> and <<extension-outgoing-configuration-reference>><<quarkus-solace_quarkus.producer.back-pressure.buffer-capacity>> on how to configure back-pressure for producer.

[[processing-messages]]
== Processing Messages

Applications streaming data often need to consume some events from a topic, process them and publish the result to a different topic. A processor method can be simply implemented using both the *@Incoming* and *@Outgoing* annotations:

[source,java]
----
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class TemperaturesProcessor {

@Incoming("temperatures-in")
@Outgoing("temperatures-out")
public double process(double temperature) {
return (temperature - 32) * 5 / 9;
}

}
----

[[health-checks]]
== Health Checks

Quarkus provides several health checks for Solace. These checks are used in combination with the *quarkus-smallrye-health* extension.

=== Reactive Messaging Health Checks

When using Reactive Messaging and the Quarkus Solace Messaging Connector, each configured channel (incoming or outgoing) provides startup, liveness and readiness checks.

The startup check verifies that the communication with Solace Broker is established.

The liveness check captures any unrecoverable failure happening during the communication with Solace.

The readiness check verifies that the Quarkus Solace Messaging Connector is ready to consume/produce messages to the configured Solace queues/topics.

[[dev-services]]
Dev Services

Solace Dev Services for Quarkus will spin up latest version of Solace PubSub standard with label `solace` when running tests or in dev mode. Solace Dev Services are enabled by default and will check for any existing containers with same label to reuse. If none is present a new container is started.

[[metrics]]
== Metrics

Quarkus Solace Messaging Connector exposes different metrics provided by Solace Java Messaging API. The metrics are enabled by default and can be accessed at `http://localhost:8080/q/dev-ui/io.quarkus.quarkus-micrometer/prometheus`

Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
@ConnectorAttribute(name = "producer.waitForPublishReceipt", type = "boolean", direction = OUTGOING, description = "Whether the client waits to receive the publish receipt from Solace broker before acknowledging the message", defaultValue = "true")
@ConnectorAttribute(name = "producer.delivery.ack.timeout", type = "int", direction = OUTGOING, description = "Timeout to receive the publish receipt from broker.")
@ConnectorAttribute(name = "producer.delivery.ack.window.size", type = "int", direction = OUTGOING, description = "Publish Window will determine the maximum number of messages the application can send before the Solace API must receive an acknowledgment from the Solace.")
@ConnectorAttribute(name = "producer.back-pressure.strategy", type = "string", direction = OUTGOING, description = "It is possible for the client application to publish messages more quickly than the API can send them to the broker due to network congestion or connectivity issues. This delay can cause the internal buffer to accumulate messages until it reaches its capacity, preventing the API from storing any more messages.", defaultValue = "wait")
@ConnectorAttribute(name = "producer.back-pressure.strategy", type = "string", direction = OUTGOING, description = "It is possible for the client application to publish messages more quickly than the API can send them to the broker due to network congestion or connectivity issues. This delay can cause the internal buffer to accumulate messages until it reaches its capacity, preventing the API from storing any more messages.", defaultValue = "elastic")
@ConnectorAttribute(name = "producer.back-pressure.buffer-capacity", type = "int", direction = OUTGOING, description = "Outgoing messages backpressure buffer capacity", defaultValue = "1024")
public class SolaceConnector implements InboundConnector, OutboundConnector, HealthReporter {

Expand Down
Loading
Loading