Skip to content

Commit

Permalink
feat: add attributes field to GCP pubsub
Browse files Browse the repository at this point in the history
  • Loading branch information
pocesar committed Dec 1, 2023
1 parent b3f2e1b commit 150e536
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 1 deletion.
7 changes: 7 additions & 0 deletions book/src/sinks/gcp_pubsub.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,19 @@ max_retries = 30
backoff_unit = 5000
backoff_factor = 2
max_backoff = 100000

[sink.attributes]
network = "mainnet"
version = "1"
```

### Section: `sink`

- `type`: the literal value `GcpPubSub`.
- `topic`: the short name of the topic to send message to.
- `error_policy` (optional): either `Continue` or `Exit`. Default value is `Exit`.
- `ordering_key` (optional): the key to use for ordering messages. If not specified, messages will be sent in an unordered manner.
- `attributes` (optional): a map of attributes to add to each message. The key and value must be strings.
- [retry_policy](../advanced/retry_policy.md)

### GCP Authentication
Expand All @@ -29,3 +35,4 @@ The GCP authentication process relies on the following conventions:

- If the `GOOGLE_APPLICATION_CREDENTIALS` environmental variable is specified, the value will be used as the file path to retrieve the JSON file with the credentials.
- If the server is running on GCP, the credentials will be retrieved from the metadata server.
- If `PUBSUB_EMULATOR_HOST` environment variable is present, the sink will skip authentication and connect to the emulator instead of the production service.
7 changes: 6 additions & 1 deletion src/sinks/gcp_pubsub/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,19 @@ use crate::{
utils::{retry, Utils},
};

pub type GenericKV = HashMap<alloc::string::String, alloc::string::String>;

Check failure on line 18 in src/sinks/gcp_pubsub/run.rs

View workflow job for this annotation

GitHub Actions / Check (ubuntu-latest, stable)

cannot find type `HashMap` in this scope

Check failure on line 18 in src/sinks/gcp_pubsub/run.rs

View workflow job for this annotation

GitHub Actions / Check (ubuntu-latest, stable)

failed to resolve: use of undeclared crate or module `alloc`

Check failure on line 18 in src/sinks/gcp_pubsub/run.rs

View workflow job for this annotation

GitHub Actions / Check (ubuntu-latest, stable)

failed to resolve: use of undeclared crate or module `alloc`

Check failure on line 18 in src/sinks/gcp_pubsub/run.rs

View workflow job for this annotation

GitHub Actions / Test Suite

cannot find type `HashMap` in this scope

Check failure on line 18 in src/sinks/gcp_pubsub/run.rs

View workflow job for this annotation

GitHub Actions / Test Suite

failed to resolve: use of undeclared crate or module `alloc`

Check failure on line 18 in src/sinks/gcp_pubsub/run.rs

View workflow job for this annotation

GitHub Actions / Test Suite

failed to resolve: use of undeclared crate or module `alloc`

Check failure on line 18 in src/sinks/gcp_pubsub/run.rs

View workflow job for this annotation

GitHub Actions / Check (windows-latest, stable)

cannot find type `HashMap` in this scope

Check failure on line 18 in src/sinks/gcp_pubsub/run.rs

View workflow job for this annotation

GitHub Actions / Check (windows-latest, stable)

failed to resolve: use of undeclared crate or module `alloc`

Check failure on line 18 in src/sinks/gcp_pubsub/run.rs

View workflow job for this annotation

GitHub Actions / Check (windows-latest, stable)

failed to resolve: use of undeclared crate or module `alloc`

Check failure on line 18 in src/sinks/gcp_pubsub/run.rs

View workflow job for this annotation

GitHub Actions / Build release binaries (x86_64-unknown-linux-gnu, ubuntu-latest, oura-x86_64-unknown-linux-gnu.ta...

cannot find type `HashMap` in this scope

Check failure on line 18 in src/sinks/gcp_pubsub/run.rs

View workflow job for this annotation

GitHub Actions / Build release binaries (x86_64-unknown-linux-gnu, ubuntu-latest, oura-x86_64-unknown-linux-gnu.ta...

failed to resolve: use of undeclared crate or module `alloc`

Check failure on line 18 in src/sinks/gcp_pubsub/run.rs

View workflow job for this annotation

GitHub Actions / Build release binaries (x86_64-unknown-linux-gnu, ubuntu-latest, oura-x86_64-unknown-linux-gnu.ta...

failed to resolve: use of undeclared crate or module `alloc`

Check failure on line 18 in src/sinks/gcp_pubsub/run.rs

View workflow job for this annotation

GitHub Actions / Build release binaries (arm-unknown-linux-musleabihf, ubuntu-latest, oura-arm-unknown-linux-musle...

cannot find type `HashMap` in this scope

Check failure on line 18 in src/sinks/gcp_pubsub/run.rs

View workflow job for this annotation

GitHub Actions / Build release binaries (arm-unknown-linux-musleabihf, ubuntu-latest, oura-arm-unknown-linux-musle...

failed to resolve: use of undeclared crate or module `alloc`

Check failure on line 18 in src/sinks/gcp_pubsub/run.rs

View workflow job for this annotation

GitHub Actions / Build release binaries (arm-unknown-linux-musleabihf, ubuntu-latest, oura-arm-unknown-linux-musle...

failed to resolve: use of undeclared crate or module `alloc`

Check failure on line 18 in src/sinks/gcp_pubsub/run.rs

View workflow job for this annotation

GitHub Actions / Build release binaries (x86_64-unknown-linux-musl, ubuntu-latest, oura-x86_64-unknown-linux-musl....

cannot find type `HashMap` in this scope

Check failure on line 18 in src/sinks/gcp_pubsub/run.rs

View workflow job for this annotation

GitHub Actions / Build release binaries (x86_64-unknown-linux-musl, ubuntu-latest, oura-x86_64-unknown-linux-musl....

failed to resolve: use of undeclared crate or module `alloc`

Check failure on line 18 in src/sinks/gcp_pubsub/run.rs

View workflow job for this annotation

GitHub Actions / Build release binaries (x86_64-unknown-linux-musl, ubuntu-latest, oura-x86_64-unknown-linux-musl....

failed to resolve: use of undeclared crate or module `alloc`

Check failure on line 18 in src/sinks/gcp_pubsub/run.rs

View workflow job for this annotation

GitHub Actions / Build release binaries (aarch64-unknown-linux-musl, ubuntu-latest, oura-aarch64-unknown-linux-mus...

cannot find type `HashMap` in this scope

Check failure on line 18 in src/sinks/gcp_pubsub/run.rs

View workflow job for this annotation

GitHub Actions / Build release binaries (aarch64-unknown-linux-musl, ubuntu-latest, oura-aarch64-unknown-linux-mus...

failed to resolve: use of undeclared crate or module `alloc`

Check failure on line 18 in src/sinks/gcp_pubsub/run.rs

View workflow job for this annotation

GitHub Actions / Build release binaries (aarch64-unknown-linux-musl, ubuntu-latest, oura-aarch64-unknown-linux-mus...

failed to resolve: use of undeclared crate or module `alloc`

Check failure on line 18 in src/sinks/gcp_pubsub/run.rs

View workflow job for this annotation

GitHub Actions / Build release binaries (x86_64-apple-darwin, macOS-latest, oura-x86_64-apple-darwin.tar.gz)

cannot find type `HashMap` in this scope

Check failure on line 18 in src/sinks/gcp_pubsub/run.rs

View workflow job for this annotation

GitHub Actions / Build release binaries (x86_64-apple-darwin, macOS-latest, oura-x86_64-apple-darwin.tar.gz)

failed to resolve: use of undeclared crate or module `alloc`

Check failure on line 18 in src/sinks/gcp_pubsub/run.rs

View workflow job for this annotation

GitHub Actions / Build release binaries (x86_64-apple-darwin, macOS-latest, oura-x86_64-apple-darwin.tar.gz)

failed to resolve: use of undeclared crate or module `alloc`

Check failure on line 18 in src/sinks/gcp_pubsub/run.rs

View workflow job for this annotation

GitHub Actions / Build release binaries (aarch64-apple-darwin, macOS-latest, oura-aarch64-apple-darwin.tar.gz)

cannot find type `HashMap` in this scope

Check failure on line 18 in src/sinks/gcp_pubsub/run.rs

View workflow job for this annotation

GitHub Actions / Build release binaries (aarch64-apple-darwin, macOS-latest, oura-aarch64-apple-darwin.tar.gz)

failed to resolve: use of undeclared crate or module `alloc`

Check failure on line 18 in src/sinks/gcp_pubsub/run.rs

View workflow job for this annotation

GitHub Actions / Build release binaries (aarch64-apple-darwin, macOS-latest, oura-aarch64-apple-darwin.tar.gz)

failed to resolve: use of undeclared crate or module `alloc`

Check failure on line 18 in src/sinks/gcp_pubsub/run.rs

View workflow job for this annotation

GitHub Actions / Build release binaries (x86_64-pc-windows-msvc, windows-latest, oura-x86_64-pc-windows-msvc.zip)

cannot find type `HashMap` in this scope

Check failure on line 18 in src/sinks/gcp_pubsub/run.rs

View workflow job for this annotation

GitHub Actions / Build release binaries (x86_64-pc-windows-msvc, windows-latest, oura-x86_64-pc-windows-msvc.zip)

failed to resolve: use of undeclared crate or module `alloc`

Check failure on line 18 in src/sinks/gcp_pubsub/run.rs

View workflow job for this annotation

GitHub Actions / Build release binaries (x86_64-pc-windows-msvc, windows-latest, oura-x86_64-pc-windows-msvc.zip)

failed to resolve: use of undeclared crate or module `alloc`

async fn send_pubsub_msg(
publisher: &Publisher,
event: &Event,
ordering_key: &str,
attributes: &GenericKV,
) -> Result<(), crate::Error> {
let body = json!(event).to_string();
let msg = PubsubMessage {
data: body.into(),
ordering_key: ordering_key.into(),
attributes: attributes.into(),
..Default::default()
};

Expand All @@ -43,6 +47,7 @@ pub fn writer_loop(
error_policy: &ErrorPolicy,
retry_policy: &retry::Policy,
ordering_key: &str,
attributes: &GenericKV,
utils: Arc<Utils>,
) -> Result<(), crate::Error> {
let rt = tokio::runtime::Builder::new_current_thread()
Expand All @@ -58,7 +63,7 @@ pub fn writer_loop(

for event in input.iter() {
let result = retry::retry_operation(
|| rt.block_on(send_pubsub_msg(&publisher, &event, ordering_key)),
|| rt.block_on(send_pubsub_msg(&publisher, &event, ordering_key, attributes)),
retry_policy,
);

Expand Down
7 changes: 7 additions & 0 deletions src/sinks/gcp_pubsub/setup.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use std::collections::HashMap;

Check warning on line 1 in src/sinks/gcp_pubsub/setup.rs

View workflow job for this annotation

GitHub Actions / Check (ubuntu-latest, stable)

unused import: `std::collections::HashMap`

Check warning on line 1 in src/sinks/gcp_pubsub/setup.rs

View workflow job for this annotation

GitHub Actions / Test Suite

unused import: `std::collections::HashMap`

Check warning on line 1 in src/sinks/gcp_pubsub/setup.rs

View workflow job for this annotation

GitHub Actions / Check (windows-latest, stable)

unused import: `std::collections::HashMap`

Check warning on line 1 in src/sinks/gcp_pubsub/setup.rs

View workflow job for this annotation

GitHub Actions / Build release binaries (x86_64-unknown-linux-gnu, ubuntu-latest, oura-x86_64-unknown-linux-gnu.ta...

unused import: `std::collections::HashMap`

Check warning on line 1 in src/sinks/gcp_pubsub/setup.rs

View workflow job for this annotation

GitHub Actions / Build release binaries (arm-unknown-linux-musleabihf, ubuntu-latest, oura-arm-unknown-linux-musle...

unused import: `std::collections::HashMap`

Check warning on line 1 in src/sinks/gcp_pubsub/setup.rs

View workflow job for this annotation

GitHub Actions / Build release binaries (x86_64-unknown-linux-musl, ubuntu-latest, oura-x86_64-unknown-linux-musl....

unused import: `std::collections::HashMap`

Check warning on line 1 in src/sinks/gcp_pubsub/setup.rs

View workflow job for this annotation

GitHub Actions / Build release binaries (aarch64-unknown-linux-musl, ubuntu-latest, oura-aarch64-unknown-linux-mus...

unused import: `std::collections::HashMap`

Check warning on line 1 in src/sinks/gcp_pubsub/setup.rs

View workflow job for this annotation

GitHub Actions / Build release binaries (x86_64-apple-darwin, macOS-latest, oura-x86_64-apple-darwin.tar.gz)

unused import: `std::collections::HashMap`

Check warning on line 1 in src/sinks/gcp_pubsub/setup.rs

View workflow job for this annotation

GitHub Actions / Build release binaries (aarch64-apple-darwin, macOS-latest, oura-aarch64-apple-darwin.tar.gz)

unused import: `std::collections::HashMap`

Check warning on line 1 in src/sinks/gcp_pubsub/setup.rs

View workflow job for this annotation

GitHub Actions / Build release binaries (x86_64-pc-windows-msvc, windows-latest, oura-x86_64-pc-windows-msvc.zip)

unused import: `std::collections::HashMap`

use serde::Deserialize;

use crate::{
pipelining::{BootstrapResult, SinkProvider, StageReceiver},
sinks::common::web::ErrorPolicy,
sinks::gcp_pubsub::run::GenericKV,
utils::{retry, WithUtils},
};

Expand All @@ -14,6 +17,7 @@ pub struct Config {
pub error_policy: Option<ErrorPolicy>,
pub retry_policy: Option<retry::Policy>,
pub ordering_key: Option<String>,
pub attributes: Option<GenericKV>,

#[warn(deprecated)]
pub credentials: Option<String>,
Expand All @@ -39,13 +43,16 @@ impl SinkProvider for WithUtils<Config> {

let utils = self.utils.clone();

let attributes = self.inner.attributes.cloned().unwrap_or_default();

let handle = std::thread::spawn(move || {
writer_loop(
input,
&topic_name,
&error_policy,
&retry_policy,
&ordering_key,
&attributes,
utils,
)
.expect("writer loop failed");
Expand Down

0 comments on commit 150e536

Please sign in to comment.