Skip to content

Commit

Permalink
[#3442] Improve unit tests for Pub/Sub client
Browse files Browse the repository at this point in the history
Signed-off-by: Kai Hudalla <[email protected]>
  • Loading branch information
sophokles73 committed Jan 30, 2023
1 parent 5ba30bd commit 339106d
Show file tree
Hide file tree
Showing 13 changed files with 253 additions and 238 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.api.gax.core.CredentialsProvider;

import io.smallrye.config.ConfigMapping;
import io.vertx.core.CompositeFuture;
Expand Down Expand Up @@ -132,6 +133,9 @@ public abstract class AbstractProtocolAdapterApplication<C extends ProtocolAdapt
@Inject
protected KafkaClientMetricsSupport kafkaClientMetricsSupport;

@Inject
CredentialsProvider credentialsProvider;

private ClientConfigProperties commandConsumerConfig;
private ClientConfigProperties downstreamSenderConfig;
private RequestResponseClientConfigProperties tenantClientConfig;
Expand Down Expand Up @@ -184,8 +188,7 @@ void setDownstreamSenderOptions(
}

@Inject
void setPubSubClientOptions(
@ConfigMapping(prefix = "hono.pubsub") final PubSubPublisherOptions options) {
void setPubSubClientOptions(final PubSubPublisherOptions options) {
this.pubSubConfigProperties = new PubSubConfigProperties(options);
}

Expand Down Expand Up @@ -375,7 +378,7 @@ protected void setCollaborators(final AbstractProtocolAdapterBase<?> adapter) {
if (!appConfig.isPubSubMessagingDisabled() && pubSubConfigProperties.isProjectIdConfigured()) {
LOG.info("Pub/Sub client configuration present, adding Pub/Sub messaging clients");

final PubSubPublisherFactory pubSubFactory = CachingPubSubPublisherFactory.createFactory();
final var pubSubFactory = new CachingPubSubPublisherFactory(pubSubConfigProperties.getProjectId(), credentialsProvider);

telemetrySenderProvider
.setClient(pubSubDownstreamSender(pubSubFactory, TelemetryConstants.TELEMETRY_ENDPOINT));
Expand Down
2 changes: 1 addition & 1 deletion clients/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
<!-- test -->
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,18 @@
*/
public abstract class AbstractPubSubBasedMessageSender implements MessagingClient, ServiceClient, Lifecycle {

/**
* A logger to be shared by sub-classes.
*/
protected final Logger log = LoggerFactory.getLogger(getClass());
/**
* The identifier of the Google Cloud Project that this sender is scoped to.
*/
protected final String projectId;

private final PubSubPublisherFactory publisherFactory;
private final String topic;
private final Tracer tracer;
private final String projectId;
private final LifecycleStatus lifecycleStatus = new LifecycleStatus();

/**
Expand All @@ -64,7 +71,7 @@ public abstract class AbstractPubSubBasedMessageSender implements MessagingClien
* @param tracer The OpenTracing tracer.
* @throws NullPointerException if any of the parameters are {@code null}.
*/
public AbstractPubSubBasedMessageSender(
protected AbstractPubSubBasedMessageSender(
final PubSubPublisherFactory publisherFactory,
final String topic,
final String projectId,
Expand Down Expand Up @@ -175,8 +182,11 @@ protected final Future<Void> sendAndWaitForOutcome(

final Map<String, String> pubSubAttributes = encodePropertiesAsPubSubAttributes(properties, currentSpan);
final ByteString data = ByteString.copyFrom(payload.getBytes());
final PubsubMessage pubsubMessage = PubsubMessage.newBuilder().putAllAttributes(pubSubAttributes)
.setOrderingKey(deviceId).setData(data).build();
final PubsubMessage pubsubMessage = PubsubMessage.newBuilder()
.putAllAttributes(pubSubAttributes)
.setOrderingKey(deviceId)
.setData(data)
.build();

log.info("sending message to Pub/Sub [topic: {}, registry: {}, deviceId: {}]", topic, tenantId, deviceId);
logPubSubMessage(currentSpan, pubsubMessage, topic, tenantId);
Expand Down Expand Up @@ -214,7 +224,7 @@ protected Span startSpan(final String operationName, final String tenantId,
}

private PubSubPublisherClient getOrCreatePublisher(final String topic, final String tenantId) {
return publisherFactory.getOrCreatePublisher(topic, projectId, tenantId);
return publisherFactory.getOrCreatePublisher(topic, tenantId);
}

private void logPubSubMessageId(final Span span, final String topic, final String messageId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,28 +14,51 @@

import java.net.HttpURLConnection;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

import org.eclipse.hono.client.ServerErrorException;

import com.google.api.gax.core.CredentialsProvider;

import io.vertx.core.Future;

/**
* A factory for creating PubSubPublisherClients. Created publishers are being cached.
*/
public final class CachingPubSubPublisherFactory implements PubSubPublisherFactory {

private final Map<String, PubSubPublisherClientImpl> activePublishers = new ConcurrentHashMap<>();
private final Map<String, PubSubPublisherClient> activePublishers = new ConcurrentHashMap<>();
private final String projectId;
private final CredentialsProvider credentialsProvider;
private Supplier<PubSubPublisherClient> clientSupplier;

/**
* Creates a new Factory that will produce {@link PubSubPublisherClientImpl#createShared(String, String)
* publishers}.
* Creates a new factory for {@link PubSubPublisherClient} instances.
*
* @return an instance of the Factory.
* @param projectId The identifier of the Google Cloud Project to connect to.
* @param credentialsProvider The provider for credentials to use for authenticating to the Pub/Sub service
* or {@code null} if the default provider should be used.
* @throws NullPointerException if project ID is {@code null}.
*/
public static CachingPubSubPublisherFactory createFactory() {
return new CachingPubSubPublisherFactory();
public CachingPubSubPublisherFactory(
final String projectId,
final CredentialsProvider credentialsProvider) {
this.projectId = Objects.requireNonNull(projectId);
this.credentialsProvider = credentialsProvider;
}

/**
* Sets a supplier for the publisher(s) this factory creates.
* <p>
* This method is mainly intended to be used in test cases.
*
* @param supplier The supplier.
*/
public void setClientSupplier(final Supplier<PubSubPublisherClient> supplier) {
this.clientSupplier = supplier;
}

@Override
Expand All @@ -55,8 +78,7 @@ public Future<Void> closeAllPublisher() {
}

@Override
public PubSubPublisherClient getOrCreatePublisher(final String topic, final String projectId,
final String tenantId) {
public PubSubPublisherClient getOrCreatePublisher(final String topic, final String tenantId) {
final String topicName = getTopicTenantName(topic, tenantId);
return activePublishers.computeIfAbsent(topicName,
s -> getPubSubPublisherClient(projectId, topicName));
Expand All @@ -68,20 +90,25 @@ public Optional<PubSubPublisherClient> getPublisher(final String topic, final St
return Optional.ofNullable(activePublishers.get(topicTenantName));
}

private PubSubPublisherClientImpl getPubSubPublisherClient(final String projectId, final String topic) {
return PubSubPublisherClientImpl.createShared(projectId, topic);
private PubSubPublisherClient getPubSubPublisherClient(final String projectId, final String topic) {
return Optional.ofNullable(clientSupplier)
.map(Supplier::get)
.orElseGet(() -> new PubSubPublisherClientImpl(projectId, topic, credentialsProvider));
}

private String getTopicTenantName(final String topic, final String tenantId) {
return String.format("%s.%s", tenantId, topic);
}

private Future<Void> removePublisher(final String topicName) {
final PubSubPublisherClientImpl publisher = activePublishers.remove(topicName);
if (publisher == null) {
return Future.succeededFuture();
final var publisher = activePublishers.remove(topicName);
if (publisher != null) {
try {
publisher.close();
} catch (final Exception e) {
// ignore, since there is nothing we can do about it
}
}
publisher.close();
return Future.succeededFuture();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
/**
* A client for publishing messages to Pub/Sub.
*/
public interface PubSubPublisherClient {
public interface PubSubPublisherClient extends AutoCloseable {

/**
* Publishes a message to Pub/Sub and transfer the returned ApiFuture into a Future.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

import java.io.IOException;
import java.net.HttpURLConnection;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import org.eclipse.hono.client.ClientErrorException;
Expand All @@ -23,6 +25,7 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.core.CredentialsProvider;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.PubsubMessage;
Expand All @@ -39,34 +42,39 @@
* Wraps a Pub/Sub publisher.
* </p>
*/
public final class PubSubPublisherClientImpl implements AutoCloseable, PubSubPublisherClient {
final class PubSubPublisherClientImpl implements PubSubPublisherClient {

private final Logger log = LoggerFactory.getLogger(getClass());
private Publisher publisher;

private PubSubPublisherClientImpl(final String projectId, final String topic) throws ClientErrorException {
try {
final TopicName topicName = TopicName.of(projectId, topic);
this.publisher = Publisher.newBuilder(topicName).setEnableMessageOrdering(true).build();
} catch (IOException e) {
this.publisher = null;
log.debug("Error initializing publisher client: {}", e.getMessage());
throw new ClientErrorException(HttpURLConnection.HTTP_CONFLICT, "Publisher client is null", e);
}
}

/**
* Creates a new instance of PubSubPublisherClientImpl where a Pub/Sub Publisher is initialized. The Publisher is
* based on a created TopicName, which follows the format: projects/projectId/topics/topic.
*
* @param projectId The Google project id to use.
* @param topic The topic to create the publisher for.
* @return An instance of a PubSubPublisherClientImpl.
* @param credentialsProvider The provider for credentials to use for authenticating to the Pub/Sub service
* or {@code null} if the default provider should be used.
* @throws ClientErrorException if the initialization of the Publisher failed.
* @throws NullPointerException if any of project ID or topic are {@code null}.
*/
public static PubSubPublisherClientImpl createShared(final String projectId, final String topic)
throws ClientErrorException {
return new PubSubPublisherClientImpl(projectId, topic);
PubSubPublisherClientImpl(
final String projectId,
final String topic,
final CredentialsProvider credentialsProvider) throws ClientErrorException {
Objects.requireNonNull(projectId);
Objects.requireNonNull(topic);
final TopicName topicName = TopicName.of(projectId, topic);
final var builder = Publisher.newBuilder(topicName)
.setEnableMessageOrdering(true);
Optional.ofNullable(credentialsProvider).ifPresent(builder::setCredentialsProvider);
try {
this.publisher = builder.build();
} catch (final IOException e) {
this.publisher = null;
log.debug("Error initializing publisher client: {}", e.getMessage());
throw new ClientErrorException(HttpURLConnection.HTTP_CONFLICT, "Publisher client is null", e);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import io.vertx.core.Future;

/**
* A factory for creating Pub/Sub publisher.
* A factory for creating Pub/Sub publishers scoped to a Google Cloud Project.
*/
public interface PubSubPublisherFactory {

Expand Down Expand Up @@ -53,12 +53,10 @@ public interface PubSubPublisherFactory {
* <p>
*
* @param topic The topic to create the publisher for.
* @param projectId The Google project id to use.
* @param tenantId The tenantId to use.
* @return an existing or new publisher.
*/
PubSubPublisherClient getOrCreatePublisher(String topic, String projectId,
String tenantId);
PubSubPublisherClient getOrCreatePublisher(String topic, String tenantId);

/**
* Gets an existing Publisher for sending data to Pub/Sub if one was already created with the given topicName and
Expand Down
Loading

0 comments on commit 339106d

Please sign in to comment.