Skip to content

Commit

Permalink
eclipse-hono#3471 Add Pub/Sub subscriber functionality
Browse files Browse the repository at this point in the history
This PR is a prerequisite in order to use only Pub/Sub as messaging infrastructure. As part of this architecture we added a Pub/Sub based subscriber functionality.

Added new package for the new subscriber functionality.

Used Google's Subscriber to subscribe messages from Google Pub/Sub.

Moved Pub/Sub publisher functionality to publisher package.

Added unittests.

Update documentation.

Signed-off-by: michelle <[email protected]>
  • Loading branch information
michelleFranke committed Mar 15, 2023
1 parent e8d2754 commit 9ed9d0f
Show file tree
Hide file tree
Showing 17 changed files with 538 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,11 @@
import org.eclipse.hono.client.kafka.producer.KafkaProducerOptions;
import org.eclipse.hono.client.kafka.producer.MessagingKafkaProducerConfigProperties;
import org.eclipse.hono.client.notification.kafka.NotificationKafkaConsumerConfigProperties;
import org.eclipse.hono.client.pubsub.CachingPubSubPublisherFactory;
import org.eclipse.hono.client.pubsub.PubSubConfigProperties;
import org.eclipse.hono.client.pubsub.PubSubPublisherFactory;
import org.eclipse.hono.client.pubsub.PubSubMessageHelper;
import org.eclipse.hono.client.pubsub.PubSubPublisherOptions;
import org.eclipse.hono.client.pubsub.publisher.CachingPubSubPublisherFactory;
import org.eclipse.hono.client.pubsub.publisher.PubSubPublisherFactory;
import org.eclipse.hono.client.registry.CredentialsClient;
import org.eclipse.hono.client.registry.DeviceRegistrationClient;
import org.eclipse.hono.client.registry.TenantClient;
Expand Down Expand Up @@ -92,6 +93,7 @@

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.api.gax.core.FixedCredentialsProvider;

import io.smallrye.config.ConfigMapping;
import io.vertx.core.CompositeFuture;
Expand Down Expand Up @@ -372,16 +374,19 @@ protected void setCollaborators(final AbstractProtocolAdapterBase<?> adapter) {
protocolAdapterProperties.isJmsVendorPropsEnabled()));
}
if (!appConfig.isPubSubMessagingDisabled() && pubSubConfigProperties.isProjectIdConfigured()) {
LOG.info("Pub/Sub client configuration present, adding Pub/Sub messaging clients");

final var pubSubFactory = new CachingPubSubPublisherFactory(
vertx,
pubSubConfigProperties.getProjectId(),
null);

telemetrySenderProvider
.setClient(pubSubDownstreamSender(pubSubFactory, TelemetryConstants.TELEMETRY_ENDPOINT));
eventSenderProvider.setClient(pubSubDownstreamSender(pubSubFactory, EventConstants.EVENT_ENDPOINT));
final Optional<FixedCredentialsProvider> credentialsProvider = PubSubMessageHelper.getCredentialsProvider();
if (credentialsProvider.isPresent()) {
LOG.info("Pub/Sub client configuration present, adding Pub/Sub messaging clients");

final var pubSubFactory = new CachingPubSubPublisherFactory(
vertx,
pubSubConfigProperties.getProjectId(),
credentialsProvider.get());

telemetrySenderProvider
.setClient(pubSubDownstreamSender(pubSubFactory, TelemetryConstants.TELEMETRY_ENDPOINT));
eventSenderProvider.setClient(pubSubDownstreamSender(pubSubFactory, EventConstants.EVENT_ENDPOINT));
}
}

final MessagingClientProviders messagingClientProviders = new MessagingClientProviders(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.util.stream.Collectors;

import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.pubsub.publisher.PubSubPublisherClient;
import org.eclipse.hono.client.pubsub.publisher.PubSubPublisherFactory;
import org.eclipse.hono.client.util.ServiceClient;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.Lifecycle;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/**
* Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* https://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.hono.client.pubsub;

import java.io.IOException;
import java.util.Optional;

import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.pubsub.v1.stub.PublisherStubSettings;

/**
* Utility methods for working with Pub/Sub.
*/
public final class PubSubMessageHelper {

private PubSubMessageHelper() {
}

/**
* Gets the provider for credentials to use for authenticating to the Pub/Sub service.
*
* @return An optional containing a FixedCredentialsProvider to use for authenticating to the Pub/Sub service or an
* empty optional if the given GoogleCredentials is {@code null}.
*/
public static Optional<FixedCredentialsProvider> getCredentialsProvider() {
return Optional.ofNullable(getCredentials())
.map(FixedCredentialsProvider::create);
}

private static GoogleCredentials getCredentials() {
try {
return GoogleCredentials.getApplicationDefault()
.createScoped(PublisherStubSettings.getDefaultServiceScopes());
} catch (IOException e) {
return null;
}
}

/**
* Gets the topic name with the given prefix.
*
* @param topic The endpoint of the topic (e.g. event)
* @param prefix The prefix of the Pub/Sub topic, it's either the tenant ID or the adapter instance ID
* @return The topic containing the prefix identifier and the endpoint.
*/
public static String getTopicName(final String topic, final String prefix) {
return String.format("%s.%s", prefix, topic);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.hono.client.pubsub;
package org.eclipse.hono.client.pubsub.publisher;

import java.net.HttpURLConnection;
import java.util.Map;
Expand All @@ -20,6 +20,7 @@
import java.util.function.Supplier;

import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.pubsub.PubSubMessageHelper;

import com.google.api.gax.core.CredentialsProvider;

Expand All @@ -42,17 +43,16 @@ public final class CachingPubSubPublisherFactory implements PubSubPublisherFacto
*
* @param vertx The Vert.x instance that this factory runs on.
* @param projectId The identifier of the Google Cloud Project to connect to.
* @param credentialsProvider The provider for credentials to use for authenticating to the Pub/Sub service
* or {@code null} if the default provider should be used.
* @throws NullPointerException if project ID is {@code null}.
* @param credentialsProvider The provider for credentials to use for authenticating to the Pub/Sub service.
* @throws NullPointerException if any of the parameter is {@code null}.
*/
public CachingPubSubPublisherFactory(
final Vertx vertx,
final String projectId,
final CredentialsProvider credentialsProvider) {
this.vertx = Objects.requireNonNull(vertx);
this.projectId = Objects.requireNonNull(projectId);
this.credentialsProvider = credentialsProvider;
this.credentialsProvider = Objects.requireNonNull(credentialsProvider);
}

/**
Expand All @@ -67,8 +67,8 @@ public void setClientSupplier(final Supplier<PubSubPublisherClient> supplier) {
}

@Override
public Future<Void> closePublisher(final String topic, final String tenantId) {
final String topicName = getTopicTenantName(topic, tenantId);
public Future<Void> closePublisher(final String topic, final String prefix) {
final String topicName = PubSubMessageHelper.getTopicName(topic, prefix);
return removePublisher(topicName);
}

Expand All @@ -83,15 +83,15 @@ public Future<Void> closeAllPublisher() {
}

@Override
public PubSubPublisherClient getOrCreatePublisher(final String topic, final String tenantId) {
final String topicName = getTopicTenantName(topic, tenantId);
public PubSubPublisherClient getOrCreatePublisher(final String topic, final String prefix) {
final String topicName = PubSubMessageHelper.getTopicName(topic, prefix);
return activePublishers.computeIfAbsent(topicName,
s -> getPubSubPublisherClient(projectId, topicName));
}

@Override
public Optional<PubSubPublisherClient> getPublisher(final String topic, final String tenantId) {
final String topicTenantName = getTopicTenantName(topic, tenantId);
public Optional<PubSubPublisherClient> getPublisher(final String topic, final String prefix) {
final String topicTenantName = PubSubMessageHelper.getTopicName(topic, prefix);
return Optional.ofNullable(activePublishers.get(topicTenantName));
}

Expand All @@ -101,10 +101,6 @@ private PubSubPublisherClient getPubSubPublisherClient(final String projectId, f
.orElseGet(() -> new PubSubPublisherClientImpl(vertx, projectId, topic, credentialsProvider));
}

private String getTopicTenantName(final String topic, final String tenantId) {
return String.format("%s.%s", tenantId, topic);
}

private Future<Void> removePublisher(final String topicName) {
final var publisher = activePublishers.remove(topicName);
if (publisher != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.hono.client.pubsub;
package org.eclipse.hono.client.pubsub.publisher;

import com.google.pubsub.v1.PubsubMessage;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.hono.client.pubsub;
package org.eclipse.hono.client.pubsub.publisher;

import java.io.IOException;
import java.net.HttpURLConnection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.hono.client.pubsub;
package org.eclipse.hono.client.pubsub.publisher;

import java.util.Optional;

Expand All @@ -27,11 +27,11 @@ public interface PubSubPublisherFactory {
* This method is expected to be invoked as soon as the publisher is no longer needed.
*
* @param topic The topic of the publisher to remove.
* @param tenantId The tenantId of the publisher to remove.
* @param prefix The prefix of the topic of the publisher to remove, e.g. the tenantId.
* @return A future that is completed when the close operation completed or a succeeded future if no publisher
* existed with the given topic.
*/
Future<Void> closePublisher(String topic, String tenantId);
Future<Void> closePublisher(String topic, String prefix);

/**
* Closes all cached publisher.
Expand All @@ -53,18 +53,18 @@ public interface PubSubPublisherFactory {
* <p>
*
* @param topic The topic to create the publisher for.
* @param tenantId The tenantId to use.
* @param prefix The prefix of the topic of the publisher to remove, e.g. the tenantId.
* @return an existing or new publisher.
*/
PubSubPublisherClient getOrCreatePublisher(String topic, String tenantId);
PubSubPublisherClient getOrCreatePublisher(String topic, String prefix);

/**
* Gets an existing Publisher for sending data to Pub/Sub if one was already created with the given topicName and
* TenantId.
* prefix.
*
* @param topic The topic to identify the publisher.
* @param tenantId The tenantId to identify the publisher.
* @param prefix The prefix of the topic to identify the publisher, e.g. the tenantId.
* @return An existing publisher or an empty Optional if no such publisher exists.
*/
Optional<PubSubPublisherClient> getPublisher(String topic, String tenantId);
Optional<PubSubPublisherClient> getPublisher(String topic, String prefix);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/**
* Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.hono.client.pubsub.subscriber;

import java.net.HttpURLConnection;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.pubsub.PubSubMessageHelper;

import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.cloud.pubsub.v1.MessageReceiver;

import io.vertx.core.Future;

/**
* A factory for creating PubSubSubscribers. Created subscribers are being cached.
*/
public class CachingPubSubSubscriberFactory implements PubSubSubscriberFactory {

private final Map<String, PubSubSubscriber> activeSubscribers = new ConcurrentHashMap<>();
private final String projectId;
private final FixedCredentialsProvider credentialsProvider;
private Supplier<PubSubSubscriber> clientSupplier;

/**
* Creates a new factory for {@link PubSubSubscriber} instances.
*
* @param projectId The identifier of the Google Cloud Project to connect to.
* @param credentialsProvider The provider for credentials to use for authenticating to the Pub/Sub service.
* @throws NullPointerException If any of the parameters is {@code null}.
*/
public CachingPubSubSubscriberFactory(final String projectId, final FixedCredentialsProvider credentialsProvider) {
this.projectId = Objects.requireNonNull(projectId);
this.credentialsProvider = Objects.requireNonNull(credentialsProvider);
}

/**
* Sets a supplier for the subscriber(s) this factory creates.
* <p>
* This method is mainly intended to be used in test cases.
*
* @param supplier The supplier.
*/
public void setClientSupplier(final Supplier<PubSubSubscriber> supplier) {
this.clientSupplier = supplier;
}

@Override
public Future<Void> closeSubscriber(final String subscription, final String prefix) {
final String subscriptionId = PubSubMessageHelper.getTopicName(subscription, prefix);
return removeSubscriber(subscriptionId);
}

@Override
public Future<Void> closeAllSubscriber() {
activeSubscribers.forEach((k, v) -> removeSubscriber(k));
if (activeSubscribers.isEmpty()) {
return Future.succeededFuture();
}
return Future.failedFuture(
new ServerErrorException(HttpURLConnection.HTTP_UNAVAILABLE, "Failed to close all subscriber"));
}

@Override
public PubSubSubscriber getOrCreateSubscriber(final String subscriptionId, final MessageReceiver receiver) {
return activeSubscribers.computeIfAbsent(subscriptionId,
s -> createPubSubSubscriber(subscriptionId, receiver));
}

@Override
public Optional<PubSubSubscriber> getSubscriber(final String subscription, final String prefix) {
final String subscriptionId = PubSubMessageHelper.getTopicName(subscription, prefix);
return Optional.ofNullable(activeSubscribers.get(subscriptionId));
}

private PubSubSubscriber createPubSubSubscriber(final String subscriptionId,
final MessageReceiver receiver) {
return Optional.ofNullable(clientSupplier)
.map(Supplier::get)
.orElseGet(() -> new PubSubSubscriber(projectId, subscriptionId, receiver, credentialsProvider));
}

private Future<Void> removeSubscriber(final String subscriptionId) {
final var subscriber = activeSubscribers.remove(subscriptionId);
if (subscriber != null) {
try {
subscriber.close();
} catch (final Exception e) {
// ignore , since there is nothing we can do about it
}
}
return Future.succeededFuture();
}
}
Loading

0 comments on commit 9ed9d0f

Please sign in to comment.