Skip to content

Commit

Permalink
[eclipse-hono#3442] Support Google Pub/Sub as messaging infrastructure.
Browse files Browse the repository at this point in the history
This is the second issue to qualify Hono as a replacement for Google IoT Core customers.
As part of this architecture we added a Pub/Sub based client for downstream messages.

Added new modules to support Google Pub/Sub as messaging infrastructure.

Used Google's TopicAdminClient to create topics.

Used Google's Publisher to publish messages to Google Pub/Sub.

Added unittests.

Signed-off-by: michelle <[email protected]>
  • Loading branch information
michelleFranke committed Jan 25, 2023
1 parent 78305e3 commit 21dc422
Show file tree
Hide file tree
Showing 32 changed files with 2,092 additions and 3 deletions.
4 changes: 4 additions & 0 deletions adapter-base/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@
<groupId>org.eclipse.hono</groupId>
<artifactId>hono-client-telemetry-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.hono</groupId>
<artifactId>hono-client-telemetry-pubsub</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.hono</groupId>
<artifactId>hono-client-notification-amqp</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@
import org.eclipse.hono.client.kafka.producer.KafkaProducerOptions;
import org.eclipse.hono.client.kafka.producer.MessagingKafkaProducerConfigProperties;
import org.eclipse.hono.client.notification.kafka.NotificationKafkaConsumerConfigProperties;
import org.eclipse.hono.client.pubsub.CachingPubSubPublisherFactory;
import org.eclipse.hono.client.pubsub.PubSubConfigProperties;
import org.eclipse.hono.client.pubsub.PubSubPublisherFactory;
import org.eclipse.hono.client.pubsub.PubSubPublisherOptions;
import org.eclipse.hono.client.registry.CredentialsClient;
import org.eclipse.hono.client.registry.DeviceRegistrationClient;
import org.eclipse.hono.client.registry.TenantClient;
Expand All @@ -70,13 +74,16 @@
import org.eclipse.hono.client.telemetry.amqp.ProtonBasedDownstreamSender;
import org.eclipse.hono.client.telemetry.kafka.KafkaBasedEventSender;
import org.eclipse.hono.client.telemetry.kafka.KafkaBasedTelemetrySender;
import org.eclipse.hono.client.telemetry.pubsub.PubSubBasedDownstreamSender;
import org.eclipse.hono.client.util.MessagingClientProvider;
import org.eclipse.hono.service.NotificationSupportingServiceApplication;
import org.eclipse.hono.service.cache.Caches;
import org.eclipse.hono.util.CredentialsObject;
import org.eclipse.hono.util.CredentialsResult;
import org.eclipse.hono.util.EventConstants;
import org.eclipse.hono.util.MessagingType;
import org.eclipse.hono.util.RegistrationResult;
import org.eclipse.hono.util.TelemetryConstants;
import org.eclipse.hono.util.TenantObject;
import org.eclipse.hono.util.TenantResult;
import org.eclipse.hono.util.WrappedLifecycleComponentVerticle;
Expand Down Expand Up @@ -145,6 +152,8 @@ public abstract class AbstractProtocolAdapterApplication<C extends ProtocolAdapt
private Cache<Object, RegistrationResult> registrationResponseCache;
private Cache<Object, CredentialsResult<CredentialsObject>> credentialsResponseCache;

private PubSubConfigProperties pubSubConfigProperties;

/**
* Creates an instance of the protocol adapter.
*
Expand Down Expand Up @@ -174,6 +183,12 @@ void setDownstreamSenderOptions(
this.downstreamSenderConfig = props;
}

@Inject
void setPubSubClientOptions(
@ConfigMapping(prefix = "hono.pubsub") final PubSubPublisherOptions options) {
this.pubSubConfigProperties = new PubSubConfigProperties(options);
}

@Inject
void setTenantServiceClientConfig(
@ConfigMapping(prefix = "hono.tenant")
Expand Down Expand Up @@ -357,6 +372,15 @@ protected void setCollaborators(final AbstractProtocolAdapterBase<?> adapter) {
messageSamplerFactory,
protocolAdapterProperties.isJmsVendorPropsEnabled()));
}
if (!appConfig.isPubSubMessagingDisabled() && pubSubConfigProperties.isProjectIdConfigured()) {
LOG.info("Pub/Sub client configuration present, adding Pub/Sub messaging clients");

final PubSubPublisherFactory pubSubFactory = CachingPubSubPublisherFactory.createFactory();

telemetrySenderProvider
.setClient(pubSubDownstreamSender(pubSubFactory, TelemetryConstants.TELEMETRY_ENDPOINT));
eventSenderProvider.setClient(pubSubDownstreamSender(pubSubFactory, EventConstants.EVENT_ENDPOINT));
}

final MessagingClientProviders messagingClientProviders = new MessagingClientProviders(
telemetrySenderProvider,
Expand Down Expand Up @@ -507,6 +531,17 @@ private ProtonBasedDownstreamSender downstreamSender() {
protocolAdapterProperties.isJmsVendorPropsEnabled());
}

/**
* Creates a new Pub/Sub downstream sender for telemetry and event messages.
*
* @return The sender.
*/
private PubSubBasedDownstreamSender pubSubDownstreamSender(final PubSubPublisherFactory pubSubFactory,
final String topic) {
return new PubSubBasedDownstreamSender(vertx, pubSubFactory, topic, pubSubConfigProperties.getProjectId(),
protocolAdapterProperties.isDefaultsEnabled(), tracer);
}

/**
* Creates a new connection to the AMQP Messaging Network's Command &amp; Control endpoint.
*
Expand Down
10 changes: 10 additions & 0 deletions bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,11 @@ quarkus.vertx.max-event-loop-execute-time=${max.event-loop.execute-time:20000}
<artifactId>hono-client-kafka-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.hono</groupId>
<artifactId>hono-client-pubsub-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.hono</groupId>
<artifactId>hono-client-command</artifactId>
Expand Down Expand Up @@ -384,6 +389,11 @@ quarkus.vertx.max-event-loop-execute-time=${max.event-loop.execute-time:20000}
<artifactId>hono-client-telemetry-kafka</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.hono</groupId>
<artifactId>hono-client-telemetry-pubsub</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.hono</groupId>
<artifactId>hono-adapter-base</artifactId>
Expand Down
4 changes: 3 additions & 1 deletion clients/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,13 @@
<module>notification</module>
<module>notification-amqp</module>
<module>notification-kafka</module>
<module>pubsub-common</module>
<module>registry</module>
<module>registry-amqp</module>
<module>telemetry</module>
<module>telemetry-amqp</module>
<module>telemetry-kafka</module>
<module>telemetry-pubsub</module>
</modules>

<dependencies>
Expand All @@ -58,7 +60,7 @@
<!-- test -->
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<artifactId>mockito-inline</artifactId>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
64 changes: 64 additions & 0 deletions clients/pubsub-common/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.eclipse.hono</groupId>
<artifactId>hono-clients-parent</artifactId>
<version>2.3.0-SNAPSHOT</version>
</parent>
<artifactId>hono-client-pubsub-common</artifactId>

<name>Hono Client PubSub Common</name>
<description>Classes required for implementing PubSub based Hono clients</description>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>libraries-bom</artifactId>
<version>26.1.4</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
</dependency>
<dependency>
<groupId>io.smallrye</groupId>
<artifactId>smallrye-health</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.hono</groupId>
<artifactId>hono-core</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.hono</groupId>
<artifactId>hono-client-common</artifactId>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
</dependency>

<!-- Testing -->
<dependency>
<groupId>org.eclipse.hono</groupId>
<artifactId>core-test-utils</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-opentracing-shim</artifactId>
</dependency>
</dependencies>
</project>
Loading

0 comments on commit 21dc422

Please sign in to comment.