From c9e781208fc6fd2467295808261605b789296e52 Mon Sep 17 00:00:00 2001 From: Joaquin Hoyos Date: Thu, 25 Jul 2024 17:52:11 -0300 Subject: [PATCH] add support for pubsub emulator --- Cargo.toml | 2 +- src/sinks/gcp_pubsub/run.rs | 15 ++++++++++++++- src/sinks/gcp_pubsub/setup.rs | 12 ++++++++++++ 3 files changed, 27 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 632fab7a..e4a904d1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -78,5 +78,5 @@ elasticsink = ["elasticsearch", "tokio"] fingerprint = ["murmur3"] aws = ["aws-config", "aws-sdk-sqs", "aws-sdk-lambda", "aws-sdk-s3", "tokio"] redissink = ["redis", "tokio"] -gcp = ["google-cloud-pubsub", "google-cloud-googleapis", "tokio", "web"] +gcp = ["google-cloud-pubsub", "google-cloud-googleapis", "tokio", "web" ,"google-cloud-gax"] rabbitmqsink = ["lapin", "tokio"] diff --git a/src/sinks/gcp_pubsub/run.rs b/src/sinks/gcp_pubsub/run.rs index b1b52881..4ae7c893 100644 --- a/src/sinks/gcp_pubsub/run.rs +++ b/src/sinks/gcp_pubsub/run.rs @@ -1,5 +1,6 @@ use std::{collections::HashMap, sync::Arc}; +use google_cloud_gax::conn::Environment; use google_cloud_googleapis::pubsub::v1::PubsubMessage; use google_cloud_pubsub::{ client::{Client, ClientConfig}, @@ -48,6 +49,9 @@ pub fn writer_loop( retry_policy: &retry::Policy, ordering_key: &str, attributes: &GenericKV, + emulator: bool, + emulator_endpoint: &Option, + emulator_project_id: &Option, utils: Arc, ) -> Result<(), crate::Error> { let rt = tokio::runtime::Builder::new_current_thread() @@ -56,7 +60,16 @@ pub fn writer_loop( .build()?; let publisher: Publisher = rt.block_on(async { - let client = Client::new(ClientConfig::default()).await?; + let client_config = if emulator { + ClientConfig { + project_id: Some(emulator_project_id.clone().unwrap_or_default()), + environment: Environment::Emulator(emulator_endpoint.clone().unwrap_or_default()), + ..Default::default() + } + } else { + ClientConfig::default().with_auth().await? + }; + let client = Client::new(client_config).await?; let topic = client.topic(topic_name); Result::<_, crate::Error>::Ok(topic.new_publisher(None)) })?; diff --git a/src/sinks/gcp_pubsub/setup.rs b/src/sinks/gcp_pubsub/setup.rs index 666de1d0..a1cdd064 100644 --- a/src/sinks/gcp_pubsub/setup.rs +++ b/src/sinks/gcp_pubsub/setup.rs @@ -16,6 +16,9 @@ pub struct Config { pub retry_policy: Option, pub ordering_key: Option, pub attributes: Option, + pub emulator: Option, + pub emulator_endpoint: Option, + pub emulator_project_id: Option, #[warn(deprecated)] pub credentials: Option, @@ -24,6 +27,12 @@ pub struct Config { impl SinkProvider for WithUtils { fn bootstrap(&self, input: StageReceiver) -> BootstrapResult { let topic_name = self.inner.topic.to_owned(); + let mut use_emulator = self.inner.emulator.unwrap_or(false); + let emulator_endpoint = self.inner.emulator_endpoint.to_owned(); + let emulator_project_id = self.inner.emulator_project_id.to_owned(); + if use_emulator && (emulator_endpoint.is_none() || emulator_project_id.is_none()) { + use_emulator = false; + } let error_policy = self .inner @@ -47,6 +56,9 @@ impl SinkProvider for WithUtils { &retry_policy, &ordering_key, &attributes, + use_emulator, + &emulator_endpoint, + &emulator_project_id, utils, ) .expect("writer loop failed");