-
Notifications
You must be signed in to change notification settings - Fork 138
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[#3471] Add Pub/Sub subscriber functionality
This PR is a prerequisite in order to use only Pub/Sub as messaging infrastructure. As part of this architecture we added a Pub/Sub based subscriber functionality. * Added new package for the new subscriber functionality. * Used Google's Subscriber to subscribe messages from Google Pub/Sub. * Moved Pub/Sub publisher functionality to publisher package. * Added unit tests. * Update documentation. Signed-off-by: michelle <[email protected]>
- Loading branch information
1 parent
7ff4b93
commit e592a38
Showing
18 changed files
with
583 additions
and
46 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
61 changes: 61 additions & 0 deletions
61
clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubMessageHelper.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
/** | ||
* Copyright (c) 2023 Contributors to the Eclipse Foundation | ||
* | ||
* See the NOTICE file(s) distributed with this work for additional | ||
* information regarding copyright ownership. | ||
* | ||
* This program and the accompanying materials are made available under the | ||
* terms of the Eclipse Public License 2.0 which is available at | ||
* https://www.eclipse.org/legal/epl-2.0 | ||
* | ||
* SPDX-License-Identifier: EPL-2.0 | ||
*/ | ||
package org.eclipse.hono.client.pubsub; | ||
|
||
import java.io.IOException; | ||
import java.util.Optional; | ||
|
||
import com.google.api.gax.core.FixedCredentialsProvider; | ||
import com.google.auth.oauth2.GoogleCredentials; | ||
import com.google.cloud.pubsub.v1.stub.PublisherStubSettings; | ||
|
||
/** | ||
* Utility methods for working with Pub/Sub. | ||
*/ | ||
public final class PubSubMessageHelper { | ||
|
||
private PubSubMessageHelper() { | ||
} | ||
|
||
/** | ||
* Gets the provider for credentials to use for authenticating to the Pub/Sub service. | ||
* | ||
* @return An optional containing a FixedCredentialsProvider to use for authenticating to the Pub/Sub service or an | ||
* empty optional if the given GoogleCredentials is {@code null}. | ||
*/ | ||
public static Optional<FixedCredentialsProvider> getCredentialsProvider() { | ||
return Optional.ofNullable(getCredentials()) | ||
.map(FixedCredentialsProvider::create); | ||
} | ||
|
||
private static GoogleCredentials getCredentials() { | ||
try { | ||
return GoogleCredentials.getApplicationDefault() | ||
.createScoped(PublisherStubSettings.getDefaultServiceScopes()); | ||
} catch (IOException e) { | ||
return null; | ||
} | ||
} | ||
|
||
/** | ||
* Gets the topic name with the given prefix. | ||
* | ||
* @param topic The endpoint of the topic (e.g. event) | ||
* @param prefix The prefix of the Pub/Sub topic, it's either the tenant ID or the adapter instance ID | ||
* @return The topic containing the prefix identifier and the endpoint. | ||
*/ | ||
public static String getTopicName(final String topic, final String prefix) { | ||
return String.format("%s.%s", prefix, topic); | ||
} | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
116 changes: 116 additions & 0 deletions
116
...c/main/java/org/eclipse/hono/client/pubsub/subscriber/CachingPubSubSubscriberFactory.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,116 @@ | ||
/** | ||
* Copyright (c) 2023 Contributors to the Eclipse Foundation | ||
* | ||
* See the NOTICE file(s) distributed with this work for additional | ||
* information regarding copyright ownership. | ||
* | ||
* This program and the accompanying materials are made available under the | ||
* terms of the Eclipse Public License 2.0 which is available at | ||
* http://www.eclipse.org/legal/epl-2.0 | ||
* | ||
* SPDX-License-Identifier: EPL-2.0 | ||
*/ | ||
package org.eclipse.hono.client.pubsub.subscriber; | ||
|
||
import java.net.HttpURLConnection; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
import java.util.Optional; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.function.Supplier; | ||
|
||
import org.eclipse.hono.client.ServerErrorException; | ||
import org.eclipse.hono.client.pubsub.PubSubMessageHelper; | ||
|
||
import com.google.api.gax.core.CredentialsProvider; | ||
import com.google.cloud.pubsub.v1.MessageReceiver; | ||
|
||
import io.vertx.core.Future; | ||
import io.vertx.core.Vertx; | ||
|
||
/** | ||
* A factory for creating PubSubSubscribers. Created subscribers are being cached. | ||
*/ | ||
public class CachingPubSubSubscriberFactory implements PubSubSubscriberFactory { | ||
|
||
private final Vertx vertx; | ||
private final Map<String, PubSubSubscriberClient> activeSubscribers = new ConcurrentHashMap<>(); | ||
private final String projectId; | ||
private final CredentialsProvider credentialsProvider; | ||
private Supplier<PubSubSubscriberClient> clientSupplier; | ||
|
||
/** | ||
* Creates a new factory for {@link PubSubSubscriberClient} instances. | ||
* | ||
* @param vertx The Vert.x instance that this factory runs on. | ||
* @param projectId The identifier of the Google Cloud Project to connect to. | ||
* @param credentialsProvider The provider for credentials to use for authenticating to the Pub/Sub service. | ||
* @throws NullPointerException If any of the parameters is {@code null}. | ||
*/ | ||
public CachingPubSubSubscriberFactory( | ||
final Vertx vertx, | ||
final String projectId, | ||
final CredentialsProvider credentialsProvider) { | ||
this.vertx = Objects.requireNonNull(vertx); | ||
this.projectId = Objects.requireNonNull(projectId); | ||
this.credentialsProvider = Objects.requireNonNull(credentialsProvider); | ||
} | ||
|
||
/** | ||
* Sets a supplier for the subscriber(s) this factory creates. | ||
* <p> | ||
* This method is mainly intended to be used in test cases. | ||
* | ||
* @param supplier The supplier. | ||
*/ | ||
public void setClientSupplier(final Supplier<PubSubSubscriberClient> supplier) { | ||
this.clientSupplier = supplier; | ||
} | ||
|
||
@Override | ||
public Future<Void> closeSubscriber(final String subscription, final String prefix) { | ||
final String subscriptionId = PubSubMessageHelper.getTopicName(subscription, prefix); | ||
return removeSubscriber(subscriptionId); | ||
} | ||
|
||
@Override | ||
public Future<Void> closeAllSubscribers() { | ||
activeSubscribers.forEach((k, v) -> removeSubscriber(k)); | ||
if (activeSubscribers.isEmpty()) { | ||
return Future.succeededFuture(); | ||
} | ||
return Future.failedFuture( | ||
new ServerErrorException(HttpURLConnection.HTTP_UNAVAILABLE, "Failed to close all subscriber")); | ||
} | ||
|
||
@Override | ||
public PubSubSubscriberClient getOrCreateSubscriber(final String subscriptionId, final MessageReceiver receiver) { | ||
return activeSubscribers.computeIfAbsent(subscriptionId, | ||
s -> createPubSubSubscriber(subscriptionId, receiver)); | ||
} | ||
|
||
@Override | ||
public Optional<PubSubSubscriberClient> getSubscriber(final String subscription, final String prefix) { | ||
final String subscriptionId = PubSubMessageHelper.getTopicName(subscription, prefix); | ||
return Optional.ofNullable(activeSubscribers.get(subscriptionId)); | ||
} | ||
|
||
private PubSubSubscriberClient createPubSubSubscriber(final String subscriptionId, | ||
final MessageReceiver receiver) { | ||
return Optional.ofNullable(clientSupplier) | ||
.map(Supplier::get) | ||
.orElseGet(() -> new PubSubSubscriberClientImpl(vertx, projectId, subscriptionId, receiver, credentialsProvider)); | ||
} | ||
|
||
private Future<Void> removeSubscriber(final String subscriptionId) { | ||
final var subscriber = activeSubscribers.remove(subscriptionId); | ||
if (subscriber != null) { | ||
try { | ||
subscriber.close(); | ||
} catch (final Exception e) { | ||
// ignore , since there is nothing we can do about it | ||
} | ||
} | ||
return Future.succeededFuture(); | ||
} | ||
} |
Oops, something went wrong.