Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Client Side Tracing & OAuth implementation #66

Merged
merged 19 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions quarkus-solace-client/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-devservices-deployment</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-oidc-client-deployment</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5-internal</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,14 @@

import com.solace.messaging.MessagingService;
import com.solace.quarkus.MessagingServiceClientCustomizer;
import com.solace.quarkus.runtime.OidcProvider;
import com.solace.quarkus.runtime.SolaceConfig;
import com.solace.quarkus.runtime.SolaceRecorder;
import com.solace.quarkus.runtime.observability.SolaceMetricBinder;
import com.solacesystems.jcsmp.JCSMPFactory;

import io.quarkus.arc.SyntheticCreationalContext;
import io.quarkus.arc.deployment.SyntheticBeanBuildItem;
import io.quarkus.arc.deployment.SyntheticBeansRuntimeInitBuildItem;
import io.quarkus.arc.deployment.UnremovableBeanBuildItem;
import io.quarkus.arc.deployment.*;
import io.quarkus.deployment.annotations.*;
import io.quarkus.deployment.annotations.Record;
import io.quarkus.deployment.builditem.ExtensionSslNativeSupportBuildItem;
Expand All @@ -31,13 +30,14 @@
import io.quarkus.smallrye.health.deployment.spi.HealthBuildItem;

class SolaceProcessor {

private static final String FEATURE = "solace-client";

private static final ParameterizedType SOLACE_CUSTOMIZER_INJECTION_TYPE = ParameterizedType.create(
DotName.createSimple(Instance.class),
new Type[] { ClassType.create(DotName.createSimple(MessagingServiceClientCustomizer.class.getName())) }, null);

private static final Type OIDC_PROVIDER = ClassType.create(DotName.createSimple(OidcProvider.class));

private static final AnnotationInstance[] EMPTY_ANNOTATIONS = new AnnotationInstance[0];

@BuildStep
Expand All @@ -59,21 +59,24 @@ ExtensionSslNativeSupportBuildItem ssl() {
@Record(ExecutionTime.RUNTIME_INIT)
ServiceStartBuildItem init(
SolaceConfig config, SolaceRecorder recorder,
ShutdownContextBuildItem shutdown, BuildProducer<SyntheticBeanBuildItem> syntheticBeans) {
ShutdownContextBuildItem shutdown, BuildProducer<SyntheticBeanBuildItem> syntheticBeans,
BuildProducer<AdditionalBeanBuildItem> additionalBeanBuildItemBuildProducer) {

Function<SyntheticCreationalContext<MessagingService>, MessagingService> function = recorder.init(config, shutdown);

additionalBeanBuildItemBuildProducer.produce(AdditionalBeanBuildItem.unremovableOf(OidcProvider.class));
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ozangunalp is there a way we can produce this bean only if oidc properties are configured by user. I see some warnings/errors thrown by oidc-client library when no configuration is available(in case of basic authentication or any other type of authentication). This is not blocking message flow.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the easier option is to make it do nothing when oauth authentication is not configured.


SyntheticBeanBuildItem.ExtendedBeanConfigurator solaceConfigurator = SyntheticBeanBuildItem
.configure(MessagingService.class)
.defaultBean()
.scope(ApplicationScoped.class)
.addInjectionPoint(SOLACE_CUSTOMIZER_INJECTION_TYPE, EMPTY_ANNOTATIONS)
.addInjectionPoint(OIDC_PROVIDER)
.createWith(function)
.unremovable()
.setRuntimeInit();

syntheticBeans.produce(solaceConfigurator.done());

return new ServiceStartBuildItem(FEATURE);
}

Expand Down
5 changes: 4 additions & 1 deletion quarkus-solace-client/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
<groupId>com.solace</groupId>
<artifactId>solace-messaging-client</artifactId>
</dependency>

<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-oidc-client</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-micrometer</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package com.solace.quarkus.runtime;

import java.time.Duration;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;

import org.eclipse.microprofile.config.inject.ConfigProperty;

import com.solace.messaging.MessagingService;
import com.solace.messaging.config.SolaceProperties;

import io.quarkus.oidc.client.OidcClient;
import io.quarkus.oidc.client.Tokens;
import io.quarkus.runtime.StartupEvent;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.infrastructure.Infrastructure;

@ApplicationScoped
public class OidcProvider {

@ConfigProperty(name = "quarkus.solace.oidc.refresh.interval", defaultValue = "60s")
Duration duration;

@Inject
OidcClient client;

private volatile Tokens lastToken;
private MessagingService service;

Tokens getToken() {
Tokens firstToken = client.getTokens().await().indefinitely();
lastToken = firstToken;
return firstToken;
}

void init(MessagingService service) {
this.service = service;
}

void startup(@Observes StartupEvent event) {
Multi.createFrom().ticks().every(duration)
.emitOn(Infrastructure.getDefaultWorkerPool())
// .filter(aLong -> {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commented it as we will not receive refresh token in client_credentials OAuth flow

// if (lastToken.isAccessTokenWithinRefreshInterval()) {
// return true;
// } else
// return false;
// })
.call(() -> client.getTokens().invoke(tokens -> {
lastToken = tokens;
}))
.invoke(() -> service.updateProperty(SolaceProperties.AuthenticationProperties.SCHEME_OAUTH2_ACCESS_TOKEN,
lastToken.getAccessToken()))
.subscribe().with(aLong -> {
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,18 @@ public MessagingService apply(SyntheticCreationalContext<MessagingService> conte
}
}

MessagingServiceClientBuilder builder = MessagingService.builder(ConfigurationProfile.V1)
.fromProperties(properties);

Instance<MessagingServiceClientCustomizer> reference = context.getInjectedReference(CUSTOMIZER);
OidcProvider oidcProvider = context.getInjectedReference(OidcProvider.class);

String authScheme = config.extra().get("authentication.scheme");

if (oidcProvider != null && authScheme != null && authScheme.equals("AUTHENTICATION_SCHEME_OAUTH2")) {
properties.put(SolaceProperties.AuthenticationProperties.SCHEME_OAUTH2_ACCESS_TOKEN,
oidcProvider.getToken().getAccessToken());
}

MessagingServiceClientBuilder builder = MessagingService.builder(ConfigurationProfile.V1)
.fromProperties(properties);
MessagingService service;
if (reference.isUnsatisfied()) {
service = builder.build();
Expand All @@ -54,12 +61,14 @@ public MessagingService apply(SyntheticCreationalContext<MessagingService> conte
}
}

oidcProvider.init(service);
var tmp = service;
shutdown.addLastShutdownTask(() -> {
if (tmp.isConnected()) {
tmp.disconnect();
}
});

return service.connect();
}
};
Expand Down
12 changes: 12 additions & 0 deletions quarkus-solace-messaging-connector/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-connector-attribute-processor</artifactId>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-otel</artifactId>
<version>4.16.0</version>
</dependency>
<dependency>
<groupId>com.solace</groupId>
<artifactId>solace-messaging-client</artifactId>
Expand Down Expand Up @@ -132,6 +137,13 @@
<artifactId>slf4j-log4j12</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.keycloak</groupId>
<artifactId>keycloak-admin-client</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
//@ConnectorAttribute(name = "client.type", type = "string", direction = INCOMING_AND_OUTGOING, description = "Direct or persisted", defaultValue = "persisted")
@ConnectorAttribute(name = "client.lazy.start", type = "boolean", direction = INCOMING_AND_OUTGOING, description = "Whether the receiver or publisher is started at initialization or lazily at subscription time", defaultValue = "false")
@ConnectorAttribute(name = "client.graceful-shutdown", type = "boolean", direction = INCOMING_AND_OUTGOING, description = "Whether to shutdown client gracefully", defaultValue = "true")
@ConnectorAttribute(name = "client.tracing-enabled", type = "boolean", direction = INCOMING_AND_OUTGOING, description = "Whether to enable tracing for incoming and outgoing messages", defaultValue = "false")
@ConnectorAttribute(name = "client.graceful-shutdown.wait-timeout", type = "long", direction = INCOMING_AND_OUTGOING, description = "Timeout in milliseconds to wait for messages to finish processing before shutdown", defaultValue = "10000")
@ConnectorAttribute(name = "consumer.queue.name", type = "string", direction = INCOMING, description = "The queue name of receiver.")
@ConnectorAttribute(name = "consumer.queue.type", type = "string", direction = INCOMING, description = "The queue type of receiver. Supported values `durable-exclusive`, `durable-non-exclusive`, `non-durable-exclusive`", defaultValue = "durable-exclusive")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@

import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
Expand All @@ -22,13 +20,16 @@
import com.solace.messaging.config.MissingResourcesCreationConfiguration.MissingResourcesCreationStrategy;
import com.solace.messaging.config.ReceiverActivationPassivationConfiguration;
import com.solace.messaging.config.ReplayStrategy;
import com.solace.messaging.config.SolaceConstants;
import com.solace.messaging.receiver.InboundMessage;
import com.solace.messaging.receiver.PersistentMessageReceiver;
import com.solace.messaging.resources.Queue;
import com.solace.messaging.resources.TopicSubscription;
import com.solace.quarkus.messaging.SolaceConnectorIncomingConfiguration;
import com.solace.quarkus.messaging.fault.*;
import com.solace.quarkus.messaging.i18n.SolaceLogging;
import com.solace.quarkus.messaging.tracing.SolaceOpenTelemetryInstrumenter;
import com.solace.quarkus.messaging.tracing.SolaceTrace;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
Expand All @@ -44,13 +45,14 @@ public class SolaceIncomingChannel implements ReceiverActivationPassivationConfi
private final SolaceAckHandler ackHandler;
private final SolaceFailureHandler failureHandler;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final AtomicBoolean alive = new AtomicBoolean(false);
private final AtomicBoolean alive = new AtomicBoolean(true);
private final PersistentMessageReceiver receiver;
private final Flow.Publisher<? extends Message<?>> stream;
private final ExecutorService pollerThread;
private final boolean gracefulShutdown;
private final long gracefulShutdownWaitTimeout;
private final List<Throwable> failures = new ArrayList<>();
private final SolaceOpenTelemetryInstrumenter solaceOpenTelemetryInstrumenter;
private volatile MessagingService solace;

// Assuming we won't ever exceed the limit of an unsigned long...
Expand Down Expand Up @@ -107,19 +109,55 @@ public SolaceIncomingChannel(Vertx vertx, SolaceConnectorIncomingConfiguration i

// TODO Here use a subscription receiver.receiveAsync with an internal queue
this.pollerThread = Executors.newSingleThreadExecutor();
this.stream = Multi.createBy().repeating()

Multi<? extends Message<?>> incomingMulti = Multi.createBy().repeating()
.uni(() -> Uni.createFrom().item(receiver::receiveMessage)
.runSubscriptionOn(pollerThread))
.until(__ -> closed.get())
.emitOn(context::runOnContext)
.map(consumed -> new SolaceInboundMessage<>(consumed, ackHandler, failureHandler,
unacknowledgedMessageTracker, this::reportFailure))
.plug(m -> lazyStart
? m.onSubscription()
.call(() -> Uni.createFrom().completionStage(receiver.startAsync()))
: m)
unacknowledgedMessageTracker, this::reportFailure));

if (ic.getClientTracingEnabled()) {
solaceOpenTelemetryInstrumenter = SolaceOpenTelemetryInstrumenter.createForIncoming();
incomingMulti = incomingMulti.map(message -> {
InboundMessage consumedMessage = message.getMetadata(SolaceInboundMetadata.class).get().getMessage();
Map<String, String> messageProperties = new HashMap<>();

messageProperties.put("messaging.solace.replication_group_message_id",
consumedMessage.getReplicationGroupMessageId().toString());
messageProperties.put("messaging.solace.priority", Integer.toString(consumedMessage.getPriority()));
if (consumedMessage.getProperties().size() > 0) {
messageProperties.putAll(consumedMessage.getProperties());
}
SolaceTrace solaceTrace = new SolaceTrace.Builder()
.withDestinationKind("queue")
.withTopic(consumedMessage.getDestinationName())
.withMessageID(consumedMessage.getApplicationMessageId())
.withCorrelationID(consumedMessage.getCorrelationId())
.withPartitionKey(
consumedMessage
.hasProperty(SolaceConstants.MessageUserPropertyConstants.QUEUE_PARTITION_KEY)
? consumedMessage
.getProperty(
SolaceConstants.MessageUserPropertyConstants.QUEUE_PARTITION_KEY)
: null)
.withPayloadSize(Long.valueOf(consumedMessage.getPayloadAsBytes().length))
.withProperties(messageProperties)
.build();
return solaceOpenTelemetryInstrumenter.traceIncoming(message, solaceTrace, true);
});
} else {
solaceOpenTelemetryInstrumenter = null;
}

this.stream = incomingMulti.plug(m -> lazyStart
? m.onSubscription()
.call(() -> Uni.createFrom().completionStage(receiver.startAsync()))
: m)
.onItem().invoke(() -> alive.set(true))
.onFailure().retry().withBackOff(Duration.ofSeconds(1)).atMost(3).onFailure().invoke(this::reportFailure);

if (!lazyStart) {
receiver.start();
}
Expand Down
Loading
Loading