From 9ed9d0f6478c3fd72dd1979eea23fb5541b96cdb Mon Sep 17 00:00:00 2001 From: michelle Date: Thu, 9 Mar 2023 11:23:04 +0100 Subject: [PATCH] #3471 Add Pub/Sub subscriber functionality This PR is a prerequisite in order to use only Pub/Sub as messaging infrastructure. As part of this architecture we added a Pub/Sub based subscriber functionality. Added new package for the new subscriber functionality. Used Google's Subscriber to subscribe messages from Google Pub/Sub. Moved Pub/Sub publisher functionality to publisher package. Added unittests. Update documentation. Signed-off-by: michelle --- .../AbstractProtocolAdapterApplication.java | 29 +++-- .../AbstractPubSubBasedMessageSender.java | 2 + .../client/pubsub/PubSubMessageHelper.java | 61 ++++++++++ .../CachingPubSubPublisherFactory.java | 26 ++-- .../PubSubPublisherClient.java | 2 +- .../PubSubPublisherClientImpl.java | 2 +- .../PubSubPublisherFactory.java | 16 +-- .../CachingPubSubSubscriberFactory.java | 109 +++++++++++++++++ .../pubsub/subscriber/PubSubSubscriber.java | 115 ++++++++++++++++++ .../subscriber/PubSubSubscriberFactory.java | 68 +++++++++++ .../AbstractPubSubBasedMessageSenderTest.java | 2 + .../pubsub/PubSubMessageHelperTest.java | 36 ++++++ .../CachingPubSubPublisherFactoryTest.java | 7 +- .../CachingPubSubSubscriberFactoryTest.java | 94 ++++++++++++++ .../pubsub/PubSubBasedDownstreamSender.java | 2 +- .../PubSubBasedDownstreamSenderTest.java | 4 +- .../content/admin-guide/pubsub-config.md | 9 +- 17 files changed, 538 insertions(+), 46 deletions(-) create mode 100644 clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubMessageHelper.java rename clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/{ => publisher}/CachingPubSubPublisherFactory.java (83%) rename clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/{ => publisher}/PubSubPublisherClient.java (94%) rename clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/{ => publisher}/PubSubPublisherClientImpl.java (99%) rename clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/{ => publisher}/PubSubPublisherFactory.java (84%) create mode 100644 clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/subscriber/CachingPubSubSubscriberFactory.java create mode 100644 clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/subscriber/PubSubSubscriber.java create mode 100644 clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/subscriber/PubSubSubscriberFactory.java create mode 100644 clients/pubsub-common/src/test/java/org/eclipse/hono/client/pubsub/PubSubMessageHelperTest.java rename clients/pubsub-common/src/test/java/org/eclipse/hono/client/pubsub/{ => publisher}/CachingPubSubPublisherFactoryTest.java (93%) create mode 100644 clients/pubsub-common/src/test/java/org/eclipse/hono/client/pubsub/subscriber/CachingPubSubSubscriberFactoryTest.java diff --git a/adapter-base/src/main/java/org/eclipse/hono/adapter/AbstractProtocolAdapterApplication.java b/adapter-base/src/main/java/org/eclipse/hono/adapter/AbstractProtocolAdapterApplication.java index 40221f345e..b7b900a549 100644 --- a/adapter-base/src/main/java/org/eclipse/hono/adapter/AbstractProtocolAdapterApplication.java +++ b/adapter-base/src/main/java/org/eclipse/hono/adapter/AbstractProtocolAdapterApplication.java @@ -59,10 +59,11 @@ import org.eclipse.hono.client.kafka.producer.KafkaProducerOptions; import org.eclipse.hono.client.kafka.producer.MessagingKafkaProducerConfigProperties; import org.eclipse.hono.client.notification.kafka.NotificationKafkaConsumerConfigProperties; -import org.eclipse.hono.client.pubsub.CachingPubSubPublisherFactory; import org.eclipse.hono.client.pubsub.PubSubConfigProperties; -import org.eclipse.hono.client.pubsub.PubSubPublisherFactory; +import org.eclipse.hono.client.pubsub.PubSubMessageHelper; import org.eclipse.hono.client.pubsub.PubSubPublisherOptions; +import org.eclipse.hono.client.pubsub.publisher.CachingPubSubPublisherFactory; +import org.eclipse.hono.client.pubsub.publisher.PubSubPublisherFactory; import org.eclipse.hono.client.registry.CredentialsClient; import org.eclipse.hono.client.registry.DeviceRegistrationClient; import org.eclipse.hono.client.registry.TenantClient; @@ -92,6 +93,7 @@ import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; +import com.google.api.gax.core.FixedCredentialsProvider; import io.smallrye.config.ConfigMapping; import io.vertx.core.CompositeFuture; @@ -372,16 +374,19 @@ protected void setCollaborators(final AbstractProtocolAdapterBase adapter) { protocolAdapterProperties.isJmsVendorPropsEnabled())); } if (!appConfig.isPubSubMessagingDisabled() && pubSubConfigProperties.isProjectIdConfigured()) { - LOG.info("Pub/Sub client configuration present, adding Pub/Sub messaging clients"); - - final var pubSubFactory = new CachingPubSubPublisherFactory( - vertx, - pubSubConfigProperties.getProjectId(), - null); - - telemetrySenderProvider - .setClient(pubSubDownstreamSender(pubSubFactory, TelemetryConstants.TELEMETRY_ENDPOINT)); - eventSenderProvider.setClient(pubSubDownstreamSender(pubSubFactory, EventConstants.EVENT_ENDPOINT)); + final Optional credentialsProvider = PubSubMessageHelper.getCredentialsProvider(); + if (credentialsProvider.isPresent()) { + LOG.info("Pub/Sub client configuration present, adding Pub/Sub messaging clients"); + + final var pubSubFactory = new CachingPubSubPublisherFactory( + vertx, + pubSubConfigProperties.getProjectId(), + credentialsProvider.get()); + + telemetrySenderProvider + .setClient(pubSubDownstreamSender(pubSubFactory, TelemetryConstants.TELEMETRY_ENDPOINT)); + eventSenderProvider.setClient(pubSubDownstreamSender(pubSubFactory, EventConstants.EVENT_ENDPOINT)); + } } final MessagingClientProviders messagingClientProviders = new MessagingClientProviders( diff --git a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/AbstractPubSubBasedMessageSender.java b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/AbstractPubSubBasedMessageSender.java index 333d5c69c2..94c10fe4d2 100644 --- a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/AbstractPubSubBasedMessageSender.java +++ b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/AbstractPubSubBasedMessageSender.java @@ -20,6 +20,8 @@ import java.util.stream.Collectors; import org.eclipse.hono.client.ServerErrorException; +import org.eclipse.hono.client.pubsub.publisher.PubSubPublisherClient; +import org.eclipse.hono.client.pubsub.publisher.PubSubPublisherFactory; import org.eclipse.hono.client.util.ServiceClient; import org.eclipse.hono.tracing.TracingHelper; import org.eclipse.hono.util.Lifecycle; diff --git a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubMessageHelper.java b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubMessageHelper.java new file mode 100644 index 0000000000..8020fe9b6f --- /dev/null +++ b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubMessageHelper.java @@ -0,0 +1,61 @@ +/** + * Copyright (c) 2023 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * https://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.hono.client.pubsub; + +import java.io.IOException; +import java.util.Optional; + +import com.google.api.gax.core.FixedCredentialsProvider; +import com.google.auth.oauth2.GoogleCredentials; +import com.google.cloud.pubsub.v1.stub.PublisherStubSettings; + +/** + * Utility methods for working with Pub/Sub. + */ +public final class PubSubMessageHelper { + + private PubSubMessageHelper() { + } + + /** + * Gets the provider for credentials to use for authenticating to the Pub/Sub service. + * + * @return An optional containing a FixedCredentialsProvider to use for authenticating to the Pub/Sub service or an + * empty optional if the given GoogleCredentials is {@code null}. + */ + public static Optional getCredentialsProvider() { + return Optional.ofNullable(getCredentials()) + .map(FixedCredentialsProvider::create); + } + + private static GoogleCredentials getCredentials() { + try { + return GoogleCredentials.getApplicationDefault() + .createScoped(PublisherStubSettings.getDefaultServiceScopes()); + } catch (IOException e) { + return null; + } + } + + /** + * Gets the topic name with the given prefix. + * + * @param topic The endpoint of the topic (e.g. event) + * @param prefix The prefix of the Pub/Sub topic, it's either the tenant ID or the adapter instance ID + * @return The topic containing the prefix identifier and the endpoint. + */ + public static String getTopicName(final String topic, final String prefix) { + return String.format("%s.%s", prefix, topic); + } + +} diff --git a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/CachingPubSubPublisherFactory.java b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/publisher/CachingPubSubPublisherFactory.java similarity index 83% rename from clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/CachingPubSubPublisherFactory.java rename to clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/publisher/CachingPubSubPublisherFactory.java index 97087c1e94..b9c50f1ea8 100644 --- a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/CachingPubSubPublisherFactory.java +++ b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/publisher/CachingPubSubPublisherFactory.java @@ -10,7 +10,7 @@ * * SPDX-License-Identifier: EPL-2.0 */ -package org.eclipse.hono.client.pubsub; +package org.eclipse.hono.client.pubsub.publisher; import java.net.HttpURLConnection; import java.util.Map; @@ -20,6 +20,7 @@ import java.util.function.Supplier; import org.eclipse.hono.client.ServerErrorException; +import org.eclipse.hono.client.pubsub.PubSubMessageHelper; import com.google.api.gax.core.CredentialsProvider; @@ -42,9 +43,8 @@ public final class CachingPubSubPublisherFactory implements PubSubPublisherFacto * * @param vertx The Vert.x instance that this factory runs on. * @param projectId The identifier of the Google Cloud Project to connect to. - * @param credentialsProvider The provider for credentials to use for authenticating to the Pub/Sub service - * or {@code null} if the default provider should be used. - * @throws NullPointerException if project ID is {@code null}. + * @param credentialsProvider The provider for credentials to use for authenticating to the Pub/Sub service. + * @throws NullPointerException if any of the parameter is {@code null}. */ public CachingPubSubPublisherFactory( final Vertx vertx, @@ -52,7 +52,7 @@ public CachingPubSubPublisherFactory( final CredentialsProvider credentialsProvider) { this.vertx = Objects.requireNonNull(vertx); this.projectId = Objects.requireNonNull(projectId); - this.credentialsProvider = credentialsProvider; + this.credentialsProvider = Objects.requireNonNull(credentialsProvider); } /** @@ -67,8 +67,8 @@ public void setClientSupplier(final Supplier supplier) { } @Override - public Future closePublisher(final String topic, final String tenantId) { - final String topicName = getTopicTenantName(topic, tenantId); + public Future closePublisher(final String topic, final String prefix) { + final String topicName = PubSubMessageHelper.getTopicName(topic, prefix); return removePublisher(topicName); } @@ -83,15 +83,15 @@ public Future closeAllPublisher() { } @Override - public PubSubPublisherClient getOrCreatePublisher(final String topic, final String tenantId) { - final String topicName = getTopicTenantName(topic, tenantId); + public PubSubPublisherClient getOrCreatePublisher(final String topic, final String prefix) { + final String topicName = PubSubMessageHelper.getTopicName(topic, prefix); return activePublishers.computeIfAbsent(topicName, s -> getPubSubPublisherClient(projectId, topicName)); } @Override - public Optional getPublisher(final String topic, final String tenantId) { - final String topicTenantName = getTopicTenantName(topic, tenantId); + public Optional getPublisher(final String topic, final String prefix) { + final String topicTenantName = PubSubMessageHelper.getTopicName(topic, prefix); return Optional.ofNullable(activePublishers.get(topicTenantName)); } @@ -101,10 +101,6 @@ private PubSubPublisherClient getPubSubPublisherClient(final String projectId, f .orElseGet(() -> new PubSubPublisherClientImpl(vertx, projectId, topic, credentialsProvider)); } - private String getTopicTenantName(final String topic, final String tenantId) { - return String.format("%s.%s", tenantId, topic); - } - private Future removePublisher(final String topicName) { final var publisher = activePublishers.remove(topicName); if (publisher != null) { diff --git a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubPublisherClient.java b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/publisher/PubSubPublisherClient.java similarity index 94% rename from clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubPublisherClient.java rename to clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/publisher/PubSubPublisherClient.java index 215f108d0a..7d37f2c868 100644 --- a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubPublisherClient.java +++ b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/publisher/PubSubPublisherClient.java @@ -10,7 +10,7 @@ * * SPDX-License-Identifier: EPL-2.0 */ -package org.eclipse.hono.client.pubsub; +package org.eclipse.hono.client.pubsub.publisher; import com.google.pubsub.v1.PubsubMessage; diff --git a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubPublisherClientImpl.java b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/publisher/PubSubPublisherClientImpl.java similarity index 99% rename from clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubPublisherClientImpl.java rename to clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/publisher/PubSubPublisherClientImpl.java index 947749b6a5..c83ca80004 100644 --- a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubPublisherClientImpl.java +++ b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/publisher/PubSubPublisherClientImpl.java @@ -10,7 +10,7 @@ * * SPDX-License-Identifier: EPL-2.0 */ -package org.eclipse.hono.client.pubsub; +package org.eclipse.hono.client.pubsub.publisher; import java.io.IOException; import java.net.HttpURLConnection; diff --git a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubPublisherFactory.java b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/publisher/PubSubPublisherFactory.java similarity index 84% rename from clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubPublisherFactory.java rename to clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/publisher/PubSubPublisherFactory.java index 533dba1203..fd6ae6d15f 100644 --- a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubPublisherFactory.java +++ b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/publisher/PubSubPublisherFactory.java @@ -10,7 +10,7 @@ * * SPDX-License-Identifier: EPL-2.0 */ -package org.eclipse.hono.client.pubsub; +package org.eclipse.hono.client.pubsub.publisher; import java.util.Optional; @@ -27,11 +27,11 @@ public interface PubSubPublisherFactory { * This method is expected to be invoked as soon as the publisher is no longer needed. * * @param topic The topic of the publisher to remove. - * @param tenantId The tenantId of the publisher to remove. + * @param prefix The prefix of the topic of the publisher to remove, e.g. the tenantId. * @return A future that is completed when the close operation completed or a succeeded future if no publisher * existed with the given topic. */ - Future closePublisher(String topic, String tenantId); + Future closePublisher(String topic, String prefix); /** * Closes all cached publisher. @@ -53,18 +53,18 @@ public interface PubSubPublisherFactory { *

* * @param topic The topic to create the publisher for. - * @param tenantId The tenantId to use. + * @param prefix The prefix of the topic of the publisher to remove, e.g. the tenantId. * @return an existing or new publisher. */ - PubSubPublisherClient getOrCreatePublisher(String topic, String tenantId); + PubSubPublisherClient getOrCreatePublisher(String topic, String prefix); /** * Gets an existing Publisher for sending data to Pub/Sub if one was already created with the given topicName and - * TenantId. + * prefix. * * @param topic The topic to identify the publisher. - * @param tenantId The tenantId to identify the publisher. + * @param prefix The prefix of the topic to identify the publisher, e.g. the tenantId. * @return An existing publisher or an empty Optional if no such publisher exists. */ - Optional getPublisher(String topic, String tenantId); + Optional getPublisher(String topic, String prefix); } diff --git a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/subscriber/CachingPubSubSubscriberFactory.java b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/subscriber/CachingPubSubSubscriberFactory.java new file mode 100644 index 0000000000..99709593ae --- /dev/null +++ b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/subscriber/CachingPubSubSubscriberFactory.java @@ -0,0 +1,109 @@ +/** + * Copyright (c) 2023 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.hono.client.pubsub.subscriber; + +import java.net.HttpURLConnection; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; + +import org.eclipse.hono.client.ServerErrorException; +import org.eclipse.hono.client.pubsub.PubSubMessageHelper; + +import com.google.api.gax.core.FixedCredentialsProvider; +import com.google.cloud.pubsub.v1.MessageReceiver; + +import io.vertx.core.Future; + +/** + * A factory for creating PubSubSubscribers. Created subscribers are being cached. + */ +public class CachingPubSubSubscriberFactory implements PubSubSubscriberFactory { + + private final Map activeSubscribers = new ConcurrentHashMap<>(); + private final String projectId; + private final FixedCredentialsProvider credentialsProvider; + private Supplier clientSupplier; + + /** + * Creates a new factory for {@link PubSubSubscriber} instances. + * + * @param projectId The identifier of the Google Cloud Project to connect to. + * @param credentialsProvider The provider for credentials to use for authenticating to the Pub/Sub service. + * @throws NullPointerException If any of the parameters is {@code null}. + */ + public CachingPubSubSubscriberFactory(final String projectId, final FixedCredentialsProvider credentialsProvider) { + this.projectId = Objects.requireNonNull(projectId); + this.credentialsProvider = Objects.requireNonNull(credentialsProvider); + } + + /** + * Sets a supplier for the subscriber(s) this factory creates. + *

+ * This method is mainly intended to be used in test cases. + * + * @param supplier The supplier. + */ + public void setClientSupplier(final Supplier supplier) { + this.clientSupplier = supplier; + } + + @Override + public Future closeSubscriber(final String subscription, final String prefix) { + final String subscriptionId = PubSubMessageHelper.getTopicName(subscription, prefix); + return removeSubscriber(subscriptionId); + } + + @Override + public Future closeAllSubscriber() { + activeSubscribers.forEach((k, v) -> removeSubscriber(k)); + if (activeSubscribers.isEmpty()) { + return Future.succeededFuture(); + } + return Future.failedFuture( + new ServerErrorException(HttpURLConnection.HTTP_UNAVAILABLE, "Failed to close all subscriber")); + } + + @Override + public PubSubSubscriber getOrCreateSubscriber(final String subscriptionId, final MessageReceiver receiver) { + return activeSubscribers.computeIfAbsent(subscriptionId, + s -> createPubSubSubscriber(subscriptionId, receiver)); + } + + @Override + public Optional getSubscriber(final String subscription, final String prefix) { + final String subscriptionId = PubSubMessageHelper.getTopicName(subscription, prefix); + return Optional.ofNullable(activeSubscribers.get(subscriptionId)); + } + + private PubSubSubscriber createPubSubSubscriber(final String subscriptionId, + final MessageReceiver receiver) { + return Optional.ofNullable(clientSupplier) + .map(Supplier::get) + .orElseGet(() -> new PubSubSubscriber(projectId, subscriptionId, receiver, credentialsProvider)); + } + + private Future removeSubscriber(final String subscriptionId) { + final var subscriber = activeSubscribers.remove(subscriptionId); + if (subscriber != null) { + try { + subscriber.close(); + } catch (final Exception e) { + // ignore , since there is nothing we can do about it + } + } + return Future.succeededFuture(); + } +} diff --git a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/subscriber/PubSubSubscriber.java b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/subscriber/PubSubSubscriber.java new file mode 100644 index 0000000000..0024567d95 --- /dev/null +++ b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/subscriber/PubSubSubscriber.java @@ -0,0 +1,115 @@ +/** + * Copyright (c) 2023 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * https://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.hono.client.pubsub.subscriber; + +import java.net.HttpURLConnection; +import java.util.Objects; + +import org.eclipse.hono.client.ServerErrorException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.api.gax.core.FixedCredentialsProvider; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.pubsub.v1.ProjectSubscriptionName; + +import io.vertx.core.Context; +import io.vertx.core.Future; +import io.vertx.core.Vertx; + +/** + * A client for receiving messages from Pub/Sub. + *

+ * Wraps a Pub/Sub Subscriber. + *

+ */ +public class PubSubSubscriber implements AutoCloseable { + + private final Logger log = LoggerFactory.getLogger(PubSubSubscriber.class); + private final Subscriber subscriber; + + /** + * Creates a new instance of PubSubSubscriberClient where a Pub/Sub Subscriber is initialized. The Subscriber is + * based on a created subscription, which follows the format: projects/{project}/subscriptions/{subscription} + * + * @param projectId The identifier of the Google Cloud Project to connect to. + * @param subscriptionId The name of the subscription to create the subscriber for. + * @param receiver The message receiver used to process the received message. + * @param credentialsProvider The provider for credentials to use for authenticating to the Pub/Sub service. + * @throws NullPointerException If any of these parameters is {@code null}. + */ + public PubSubSubscriber( + final String projectId, + final String subscriptionId, + final MessageReceiver receiver, + final FixedCredentialsProvider credentialsProvider) { + Objects.requireNonNull(projectId); + Objects.requireNonNull(subscriptionId); + Objects.requireNonNull(receiver); + Objects.requireNonNull(credentialsProvider); + + final ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId); + this.subscriber = Subscriber + .newBuilder(subscriptionName, receiver) + .setCredentialsProvider(credentialsProvider) + .build(); + } + + /** + * Subscribes messages from Pub/Sub. + * + * @return A future indicating the outcome of the operation. + * @throws ServerErrorException If subscribing was not successful. + */ + public Future subscribe() { + try { + subscriber.addListener( + new Subscriber.Listener() { + + @Override + public void failed(final Subscriber.State from, final Throwable failure) { + log.error("Error subscribing message from Pub/Sub", failure); + throw new ServerErrorException(HttpURLConnection.HTTP_UNAVAILABLE, + "Error subscribing message from Pub/Sub", failure); + } + }, + MoreExecutors.directExecutor()); + subscriber.startAsync().awaitRunning(); + return Future.succeededFuture(); + } catch (IllegalStateException e) { + log.error("Service reached illegal state", e); + return Future.failedFuture(e); + } + } + + /** + * Shuts the subscriber down and frees resources. + */ + @Override + public void close() { + final Context currentContext = Vertx.currentContext(); + if (currentContext == null) { + throw new IllegalStateException("Client is not running on a Vert.x Context"); + } else { + currentContext.executeBlocking(blockingHandler -> { + if (subscriber != null) { + subscriber.stopAsync(); + blockingHandler.complete(); + } + }); + } + } + +} diff --git a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/subscriber/PubSubSubscriberFactory.java b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/subscriber/PubSubSubscriberFactory.java new file mode 100644 index 0000000000..b0e4d01c1a --- /dev/null +++ b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/subscriber/PubSubSubscriberFactory.java @@ -0,0 +1,68 @@ +/** + * Copyright (c) 2023 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.hono.client.pubsub.subscriber; + +import java.util.Optional; + +import com.google.cloud.pubsub.v1.MessageReceiver; + +import io.vertx.core.Future; + +/** + * A factory for creating Pub/Sub subscribers scoped to a Google Cloud Project. + */ +public interface PubSubSubscriberFactory { + + /** + * Closes the subscriber with the given topic if it exists. + *

+ * This method is expected to be invoked as soon as the subscriber is no longer needed. + * + * @param subscription The subscription of the subscriber to remove. + * @param prefix The prefix of the topic of the subscriber to remove, e.g. the tenantId. + * @return A future that is completed when the close operation completed or a succeeded future if no subscriber + * existed with the given topic. + */ + Future closeSubscriber(String subscription, String prefix); + + /** + * Closes all cached subscriber. This method is expected to be invoked especially before the application shuts down. + * + * @return A future that is succeeded when all subscriber are closed or a failed future if any subscriber can not be + * closed. + */ + Future closeAllSubscriber(); + + /** + * Gets a subscriber for receiving data from Pub/Sub. + *

+ * The subscriber returned may be either newly created or it may be an existing subscriber for the given + * subscription. + * + * @param subscriptionId The subscription to create the subscriber for. + * @param receiver The message receiver used to process the received message. + * @return an existing or new subscriber. + */ + PubSubSubscriber getOrCreateSubscriber(String subscriptionId, MessageReceiver receiver); + + /** + * Gets an existing Subscriber for receiving data from Pub/Sub if one was already created with the given + * subscription and prefix. + * + * @param subscription The subscription to identify the subscriber. + * @param prefix The prefix of the subscription to identify the subscriber, e.g. the tenantId. + * @return An existing subscriber or an empty Optional if no such subscriber exists. + */ + Optional getSubscriber(String subscription, String prefix); + +} diff --git a/clients/pubsub-common/src/test/java/org/eclipse/hono/client/pubsub/AbstractPubSubBasedMessageSenderTest.java b/clients/pubsub-common/src/test/java/org/eclipse/hono/client/pubsub/AbstractPubSubBasedMessageSenderTest.java index 9f102b49ea..56cbe50608 100644 --- a/clients/pubsub-common/src/test/java/org/eclipse/hono/client/pubsub/AbstractPubSubBasedMessageSenderTest.java +++ b/clients/pubsub-common/src/test/java/org/eclipse/hono/client/pubsub/AbstractPubSubBasedMessageSenderTest.java @@ -25,6 +25,8 @@ import org.eclipse.hono.client.ClientErrorException; import org.eclipse.hono.client.ServerErrorException; +import org.eclipse.hono.client.pubsub.publisher.PubSubPublisherClient; +import org.eclipse.hono.client.pubsub.publisher.PubSubPublisherFactory; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; diff --git a/clients/pubsub-common/src/test/java/org/eclipse/hono/client/pubsub/PubSubMessageHelperTest.java b/clients/pubsub-common/src/test/java/org/eclipse/hono/client/pubsub/PubSubMessageHelperTest.java new file mode 100644 index 0000000000..60fdc00e71 --- /dev/null +++ b/clients/pubsub-common/src/test/java/org/eclipse/hono/client/pubsub/PubSubMessageHelperTest.java @@ -0,0 +1,36 @@ +/** + * Copyright (c) 2023 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * https://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.hono.client.pubsub; + +import static com.google.common.truth.Truth.assertThat; + +import org.junit.jupiter.api.Test; + +/** + * Verifies the generic behavior of {@link PubSubMessageHelper}. + */ +public class PubSubMessageHelperTest { + + /** + * Verifies that the getTopicName method returns the formatted topic. + */ + @Test + public void testThatGetTopicNameReturnsFormattedString() { + final String topic = "event"; + final String prefix = "testTenant"; + + final String result = PubSubMessageHelper.getTopicName(topic, prefix); + assertThat(result).isEqualTo("testTenant.event"); + } + +} diff --git a/clients/pubsub-common/src/test/java/org/eclipse/hono/client/pubsub/CachingPubSubPublisherFactoryTest.java b/clients/pubsub-common/src/test/java/org/eclipse/hono/client/pubsub/publisher/CachingPubSubPublisherFactoryTest.java similarity index 93% rename from clients/pubsub-common/src/test/java/org/eclipse/hono/client/pubsub/CachingPubSubPublisherFactoryTest.java rename to clients/pubsub-common/src/test/java/org/eclipse/hono/client/pubsub/publisher/CachingPubSubPublisherFactoryTest.java index 3fcaf73e81..c9dad93b81 100644 --- a/clients/pubsub-common/src/test/java/org/eclipse/hono/client/pubsub/CachingPubSubPublisherFactoryTest.java +++ b/clients/pubsub-common/src/test/java/org/eclipse/hono/client/pubsub/publisher/CachingPubSubPublisherFactoryTest.java @@ -10,7 +10,7 @@ * * SPDX-License-Identifier: EPL-2.0 */ -package org.eclipse.hono.client.pubsub; +package org.eclipse.hono.client.pubsub.publisher; import static org.mockito.Mockito.mock; @@ -23,6 +23,8 @@ import io.vertx.core.Vertx; +import com.google.api.gax.core.FixedCredentialsProvider; + /** * Verifies behavior of {@link CachingPubSubPublisherFactory}. */ @@ -42,7 +44,8 @@ public class CachingPubSubPublisherFactoryTest { void setUp() { vertx = mock(Vertx.class); client = mock(PubSubPublisherClient.class); - factory = new CachingPubSubPublisherFactory(vertx, PROJECT_ID, null); + final FixedCredentialsProvider credentialsProvider = mock(FixedCredentialsProvider.class); + factory = new CachingPubSubPublisherFactory(vertx, PROJECT_ID, credentialsProvider); factory.setClientSupplier(() -> client); } diff --git a/clients/pubsub-common/src/test/java/org/eclipse/hono/client/pubsub/subscriber/CachingPubSubSubscriberFactoryTest.java b/clients/pubsub-common/src/test/java/org/eclipse/hono/client/pubsub/subscriber/CachingPubSubSubscriberFactoryTest.java new file mode 100644 index 0000000000..f197fad1f8 --- /dev/null +++ b/clients/pubsub-common/src/test/java/org/eclipse/hono/client/pubsub/subscriber/CachingPubSubSubscriberFactoryTest.java @@ -0,0 +1,94 @@ +/** + * Copyright (c) 2023 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.hono.client.pubsub.subscriber; + +import static org.mockito.Mockito.mock; + +import static com.google.common.truth.Truth.assertThat; + +import java.util.Optional; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import com.google.api.gax.core.FixedCredentialsProvider; +import com.google.cloud.pubsub.v1.MessageReceiver; + +/** + * Verifies behavior of {@link CachingPubSubSubscriberFactory}. + */ +public class CachingPubSubSubscriberFactoryTest { + + private static final String PROJECT_ID = "test-project"; + + private static final String TENANT_ID = "test-tenant"; + + private static final String TOPIC_NAME = "command"; + + private CachingPubSubSubscriberFactory factory; + private PubSubSubscriber client; + private String topic; + private MessageReceiver receiver; + + @BeforeEach + void setUp() { + final FixedCredentialsProvider credentialsProvider = mock(FixedCredentialsProvider.class); + topic = String.format("%s.%s", TENANT_ID, TOPIC_NAME); + receiver = mock(MessageReceiver.class); + client = mock(PubSubSubscriber.class); + factory = new CachingPubSubSubscriberFactory(PROJECT_ID, credentialsProvider); + factory.setClientSupplier(() -> client); + } + + /** + * Verifies that the factory creates a subscriber and adds it to the cache. + */ + @Test + public void testThatSubscriberIsAddedToCache() { + assertThat(factory.getSubscriber(TOPIC_NAME, TENANT_ID).isEmpty()).isTrue(); + final PubSubSubscriber createdSubscriber = factory.getOrCreateSubscriber(topic, receiver); + final Optional actual = factory.getSubscriber(TOPIC_NAME, TENANT_ID); + assertThat(actual.isPresent()).isTrue(); + assertThat(actual.get()).isEqualTo(createdSubscriber); + } + + /** + * Verifies that the factory removes a subscriber from the cache when it gets closed. + */ + @Test + public void testCloseSubscriberClosesAndRemovesFromCache() { + assertThat(factory.getSubscriber(TOPIC_NAME, TENANT_ID).isEmpty()).isTrue(); + + final PubSubSubscriber createdSubscriber = factory.getOrCreateSubscriber(topic, receiver); + assertThat(createdSubscriber).isNotNull(); + assertThat(factory.getSubscriber(TOPIC_NAME, TENANT_ID).isPresent()).isTrue(); + + factory.closeSubscriber(TOPIC_NAME, TENANT_ID); + assertThat(factory.getSubscriber(TOPIC_NAME, TENANT_ID).isEmpty()).isTrue(); + } + + /** + * Verifies that the factory closes all active subscribers and removes them from the cache. + */ + @Test + public void testCloseAllSubscriberClosesAndRemovesFromCache() { + assertThat(factory.getSubscriber(TOPIC_NAME, TENANT_ID).isEmpty()).isTrue(); + + final PubSubSubscriber createdSubscriber = factory.getOrCreateSubscriber(topic, receiver); + assertThat(createdSubscriber).isNotNull(); + assertThat(factory.getSubscriber(TOPIC_NAME, TENANT_ID).isPresent()).isTrue(); + + factory.closeAllSubscriber(); + assertThat(factory.getSubscriber(TOPIC_NAME, TENANT_ID).isEmpty()).isTrue(); + } +} diff --git a/clients/telemetry-pubsub/src/main/java/org/eclipse/hono/client/telemetry/pubsub/PubSubBasedDownstreamSender.java b/clients/telemetry-pubsub/src/main/java/org/eclipse/hono/client/telemetry/pubsub/PubSubBasedDownstreamSender.java index 68323bd2d4..f577c0fba0 100644 --- a/clients/telemetry-pubsub/src/main/java/org/eclipse/hono/client/telemetry/pubsub/PubSubBasedDownstreamSender.java +++ b/clients/telemetry-pubsub/src/main/java/org/eclipse/hono/client/telemetry/pubsub/PubSubBasedDownstreamSender.java @@ -18,7 +18,7 @@ import java.util.Optional; import org.eclipse.hono.client.pubsub.AbstractPubSubBasedMessageSender; -import org.eclipse.hono.client.pubsub.PubSubPublisherFactory; +import org.eclipse.hono.client.pubsub.publisher.PubSubPublisherFactory; import org.eclipse.hono.client.telemetry.EventSender; import org.eclipse.hono.client.telemetry.TelemetrySender; import org.eclipse.hono.client.util.DownstreamMessageProperties; diff --git a/clients/telemetry-pubsub/src/test/java/org/eclipse/hono/client/telemetry/pubsub/PubSubBasedDownstreamSenderTest.java b/clients/telemetry-pubsub/src/test/java/org/eclipse/hono/client/telemetry/pubsub/PubSubBasedDownstreamSenderTest.java index db1b4655ef..3831063210 100644 --- a/clients/telemetry-pubsub/src/test/java/org/eclipse/hono/client/telemetry/pubsub/PubSubBasedDownstreamSenderTest.java +++ b/clients/telemetry-pubsub/src/test/java/org/eclipse/hono/client/telemetry/pubsub/PubSubBasedDownstreamSenderTest.java @@ -24,8 +24,8 @@ import java.util.Map; import org.eclipse.hono.client.ServerErrorException; -import org.eclipse.hono.client.pubsub.PubSubPublisherClient; -import org.eclipse.hono.client.pubsub.PubSubPublisherFactory; +import org.eclipse.hono.client.pubsub.publisher.PubSubPublisherClient; +import org.eclipse.hono.client.pubsub.publisher.PubSubPublisherFactory; import org.eclipse.hono.test.TracingMockSupport; import org.eclipse.hono.util.MessageHelper; import org.eclipse.hono.util.QoS; diff --git a/site/documentation/content/admin-guide/pubsub-config.md b/site/documentation/content/admin-guide/pubsub-config.md index cd8f064d10..2af6ff79db 100644 --- a/site/documentation/content/admin-guide/pubsub-config.md +++ b/site/documentation/content/admin-guide/pubsub-config.md @@ -15,13 +15,14 @@ To authenticate to the Google Pub/Sub API, Workload Identity is used and has to Support for Google Pub/Sub based messaging infrastructure is considered **experimental** and may change without further notice. {{% /notice %}} -## Publisher Configuration +## Publisher and Subscriber Configuration -The `org.eclipse.hono.client.pubsub.CachingPubSubPublisherFactory` factory can be used to create Pub/Sub publishers for Hono's -Pub/Sub based APIs. +The `org.eclipse.hono.client.pubsub.publisher.CachingPubSubPublisherFactory` factory can be used to create Pub/Sub +publishers for Hono's Pub/Sub based APIs. The `org.eclipse.hono.client.pubsub.subscriber.CachingPubSubSubscriberFactory` +factory can be used to create Pub/Sub subscribers for Hono's Pub/Sub based APIs. Please refer to the [Quarkus Google Cloud Services extension](https://quarkiverse.github.io/quarkiverse-docs/quarkus-google-cloud-services/main/index.html) -documentation for details regarding configuration of the PubSub client. +documentation for details regarding configuration of the Pub/Sub client. ## Configuring Tenants to use Pub/Sub based Messaging