diff --git a/Cargo.lock b/Cargo.lock index 61ce8258..d3c1099e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -26,6 +26,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "arc-swap" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c5d78ce20460b82d3fa150275ed9d55e21064fc7951177baacf86a145c4a4b1f" + [[package]] name = "ascii" version = "1.0.0" @@ -45,6 +51,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "async-recursion" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2cda8f4bcc10624c4e85bc66b3f452cca98cfa5ca002dc83a16aad2367641bea" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "async-trait" version = "0.1.52" @@ -431,6 +448,12 @@ dependencies = [ "rustc-demangle", ] +[[package]] +name = "base-x" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4521f3e3d031370679b3b140beb36dfe4801b09ac77e30c61941f97df3ef28b" + [[package]] name = "base58" version = "0.2.0" @@ -554,11 +577,33 @@ dependencies = [ "bitflags", "clap_lex", "indexmap", + "os_str_bytes", "strsim", "termcolor", "textwrap", ] +[[package]] +name = "cloud-pubsub" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fe98334bc3400fe5b4dc6447fbbc33fcc5bfe5d896aff2bc49f33dd9f656aa4" +dependencies = [ + "base64 0.13.0", + "bytes", + "goauth", + "hyper", + "hyper-tls", + "lazy_static", + "log 0.4.16", + "rand", + "serde", + "serde_derive", + "serde_json", + "smpl_jwt", + "tokio", +] + [[package]] name = "clap_lex" version = "0.1.1" @@ -582,6 +627,12 @@ dependencies = [ "toml", ] +[[package]] +name = "const_fn" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbdcdcb6d86f71c5e97409ad45898af11cbc995b4ee8112d59095a28d376c935" + [[package]] name = "core-foundation" version = "0.9.3" @@ -707,6 +758,12 @@ dependencies = [ "syn", ] +[[package]] 
+name = "discard" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0" + [[package]] name = "dyn-clone" version = "1.0.4" @@ -842,6 +899,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f73fe65f54d1e12b726f517d3e2135ca3125a437b6d998caf1962961f7172d9e" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.21" @@ -849,6 +921,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3083ce4b914124575708913bca19bfe887522d6e2e6d0952943f5eac4a74010" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -857,6 +930,17 @@ version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3" +[[package]] +name = "futures-executor" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9420b90cfa29e327d0429f19be13e7ddb68fa1cccb09d65e5706b8c7a749b8a6" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.21" @@ -892,9 +976,11 @@ version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a" dependencies = [ + "futures-channel", "futures-core", "futures-io", "futures-macro", + "futures-sink", "futures-task", "memchr", "pin-project-lite", @@ -919,6 +1005,25 @@ version = "0.26.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78cc372d058dcf6d5ecd98510e7fbc9e5aec4d21de70f65fea8fecebcd881bd4" +[[package]] +name = "goauth" +version = 
"0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d94101e84ede813c04773b0a43396c01b5a3a9376537dbce1125858ae090ae60" +dependencies = [ + "arc-swap", + "futures", + "log 0.4.16", + "reqwest", + "serde", + "serde_derive", + "serde_json", + "simpl", + "smpl_jwt", + "time 0.2.27", + "tokio", +] + [[package]] name = "h2" version = "0.3.11" @@ -1157,9 +1262,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.117" +version = "0.2.121" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e74d72e0f9b65b5b4ca49a346af3976df0f9c61d550727f349ecd559f251a26c" +checksum = "efaa7b300f3b5fe8eb6bf21ce3895e1751d9665086af2d64b42f19701015ff4f" [[package]] name = "lock_api" @@ -1477,12 +1582,14 @@ checksum = "8e22443d1643a904602595ba1cd8f7d896afe56d26712531c5ff73a15b2fbf64" name = "oura" version = "1.3.0" dependencies = [ + "async-recursion", "aws-config", "aws-sdk-lambda", "aws-sdk-s3", "aws-sdk-sqs", "bech32", "clap", + "cloud-pubsub", "config", "crossterm 0.23.2", "elasticsearch", @@ -1708,6 +1815,12 @@ dependencies = [ "version_check", ] +[[package]] +name = "proc-macro-hack" +version = "0.5.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5" + [[package]] name = "proc-macro2" version = "1.0.36" @@ -2083,6 +2196,21 @@ dependencies = [ "syn", ] +[[package]] +name = "sha1" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1da05c97445caa12d05e848c4a4fcbbea29e748ac28f7e80e9b010392063770" +dependencies = [ + "sha1_smol", +] + +[[package]] +name = "sha1_smol" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012" + [[package]] name = "signal-hook" version = "0.3.13" @@ -2114,6 +2242,12 @@ dependencies = [ 
"libc", ] +[[package]] +name = "simpl" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a30f10c911c0355f80f1c2faa8096efc4a58cdf8590b954d5b395efa071c711" + [[package]] name = "slab" version = "0.4.5" @@ -2126,6 +2260,22 @@ version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83" +[[package]] +name = "smpl_jwt" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4370044f8b20f944e05c35d77edd3518e6f21fc4de77e593919f287c6a3f428a" +dependencies = [ + "base64 0.13.0", + "log 0.4.16", + "openssl", + "serde", + "serde_derive", + "serde_json", + "simpl", + "time 0.2.27", +] + [[package]] name = "snap" version = "0.2.5" @@ -2152,12 +2302,70 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" +[[package]] +name = "standback" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e113fb6f3de07a243d434a56ec6f186dfd51cb08448239fe7bcae73f87ff28ff" +dependencies = [ + "version_check", +] + [[package]] name = "static_assertions" version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +[[package]] +name = "stdweb" +version = "0.4.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d022496b16281348b52d0e30ae99e01a73d737b2f45d38fed4edf79f9325a1d5" +dependencies = [ + "discard", + "rustc_version 0.2.3", + "stdweb-derive", + "stdweb-internal-macros", + "stdweb-internal-runtime", + "wasm-bindgen", +] + +[[package]] +name = "stdweb-derive" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c87a60a40fccc84bef0652345bbbbbe20a605bf5d0ce81719fc476f5c03b50ef" 
+dependencies = [ + "proc-macro2", + "quote", + "serde", + "serde_derive", + "syn", +] + +[[package]] +name = "stdweb-internal-macros" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "58fa5ff6ad0d98d1ffa8cb115892b6e69d67799f6763e162a1c9db421dc22e11" +dependencies = [ + "base-x", + "proc-macro2", + "quote", + "serde", + "serde_derive", + "serde_json", + "sha1", + "syn", +] + +[[package]] +name = "stdweb-internal-runtime" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "213701ba3370744dcd1a12960caa4843b3d68b4d1c0a5d575e0d65b2ee9d16c0" + [[package]] name = "strsim" version = "0.10.0" @@ -2254,6 +2462,21 @@ dependencies = [ "winapi", ] +[[package]] +name = "time" +version = "0.2.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4752a97f8eebd6854ff91f1c1824cd6160626ac4bd44287f7f4ea2035a02a242" +dependencies = [ + "const_fn", + "libc", + "standback", + "stdweb", + "time-macros 0.1.1", + "version_check", + "winapi", +] + [[package]] name = "time" version = "0.3.7" @@ -2263,7 +2486,17 @@ dependencies = [ "itoa", "libc", "num_threads", - "time-macros", + "time-macros 0.2.3", +] + +[[package]] +name = "time-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "957e9c6e26f12cb6d0dd7fc776bb67a706312e7299aed74c8dd5b17ebb27e2f1" +dependencies = [ + "proc-macro-hack", + "time-macros-impl", ] [[package]] @@ -2272,6 +2505,19 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "25eb0ca3468fc0acc11828786797f6ef9aa1555e4a211a60d64cc8e4d1be47d6" +[[package]] +name = "time-macros-impl" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd3c141a1b43194f3f56a1411225df8646c55781d5f26db825b3d98507eb482f" +dependencies = [ + "proc-macro-hack", + "proc-macro2", + "quote", + "standback", + "syn", +] + [[package]] name = 
"tiny_http" version = "0.10.0" diff --git a/Cargo.toml b/Cargo.toml index 63429d66..33a890ff 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -61,6 +61,10 @@ tokio = { version = "1.17.0", optional = true, features = ["rt"] } # required for CI to complete successfully openssl = { version = "0.10", optional = true, features = ["vendored"] } +# features: gcp +cloud-pubsub = { version = "0.8.0", optional = true } +async-recursion = { version = "1.0.0", optional = true } + [features] default = [] logs = ["file-rotate"] @@ -70,3 +74,4 @@ kafkasink = ["kafka", "openssl"] elasticsink = ["elasticsearch", "tokio"] fingerprint = ["murmur3"] aws = ["aws-config", "aws-sdk-sqs", "aws-sdk-lambda", "aws-sdk-s3", "tokio"] +gcp = ["cloud-pubsub", "tokio", "async-recursion"] diff --git a/book/src/sinks/README.md b/book/src/sinks/README.md index e62a734d..ea4a2ae1 100644 --- a/book/src/sinks/README.md +++ b/book/src/sinks/README.md @@ -14,5 +14,6 @@ These are the existing sinks that are included as part the main _Oura_ codebase: - [AWS SQS](aws_sqs.md): a sink that sends each event as message to an AWS SQS queue. - [AWS Lamda](aws_lambda.md): a sink that invokes an AWS Lambda function for each event. - [AWS S3](aws_s3.md): a sink that saves the CBOR content of the blocks as an AWS S3 object. +- [GCP PubSub](gcp_pubsub.md): a sink that sends each event as a message to a Google Cloud PubSub topic. New sinks are being developed, information will be added in this documentation to reflect the updated list. Contributions and feature request are welcome in our [Github Repo](https://github.com/txpipe/oura). \ No newline at end of file diff --git a/book/src/sinks/gcp_pubsub.md b/book/src/sinks/gcp_pubsub.md new file mode 100644 index 00000000..1814860f --- /dev/null +++ b/book/src/sinks/gcp_pubsub.md @@ -0,0 +1,18 @@ +# Google Cloud PubSub + +A sink that sends each event as a message to a PubSub topic. Each event is json-encoded and sent to a configurable PubSub topic. 
+ +## Configuration + +```toml +[sink] +type = "GcpPubSub" +credentials = "oura-test-347101-ff3f7b2d69cc.json" +topic = "test" +``` + +### Section: `sink` + +- `type`: the literal value `GcpPubSub`. +- `credentials`: the path to the service account json file downloaded from the cloud console. +- `topic`: the short name of the topic to send messages to. diff --git a/src/bin/oura/daemon.rs b/src/bin/oura/daemon.rs index fad3e2ef..3a1e875f 100644 --- a/src/bin/oura/daemon.rs +++ b/src/bin/oura/daemon.rs @@ -44,6 +44,9 @@ use oura::sinks::aws_lambda::Config as AwsLambdaConfig; #[cfg(feature = "aws")] use oura::sinks::aws_s3::Config as AwsS3Config; +#[cfg(feature = "gcp")] +use oura::sinks::gcp_pubsub::Config as GcpPubSubConfig; + #[cfg(feature = "fingerprint")] use oura::filters::fingerprint::Config as FingerprintConfig; @@ -117,6 +120,9 @@ enum Sink { #[cfg(feature = "aws")] AwsS3(AwsS3Config), + + #[cfg(feature = "gcp")] + GcpPubSub(GcpPubSubConfig), } fn bootstrap_sink(config: Sink, input: StageReceiver, utils: Arc) -> BootstrapResult { @@ -145,6 +151,9 @@ fn bootstrap_sink(config: Sink, input: StageReceiver, utils: Arc) -> Boot #[cfg(feature = "aws")] Sink::AwsS3(c) => WithUtils::new(c, utils).bootstrap(input), + + #[cfg(feature = "gcp")] + Sink::GcpPubSub(c) => WithUtils::new(c, utils).bootstrap(input), } } diff --git a/src/sinks/common.rs b/src/sinks/common.rs new file mode 100644 index 00000000..f0c8b908 --- /dev/null +++ b/src/sinks/common.rs @@ -0,0 +1,7 @@ +use serde::Deserialize; + +#[derive(Debug, Deserialize, Clone)] +pub enum ErrorPolicy { + Continue, + Exit, +} diff --git a/src/sinks/gcp_pubsub/mod.rs b/src/sinks/gcp_pubsub/mod.rs new file mode 100644 index 00000000..0a447c1d --- /dev/null +++ b/src/sinks/gcp_pubsub/mod.rs @@ -0,0 +4 @@ +mod run; +mod setup; + +pub use setup::*; diff --git a/src/sinks/gcp_pubsub/run.rs b/src/sinks/gcp_pubsub/run.rs new file mode 100644 index 00000000..a96d75f0 --- /dev/null +++ b/src/sinks/gcp_pubsub/run.rs @@ -0,0 
+1,80 @@ +use std::{sync::Arc, time::Duration}; + +use async_recursion::async_recursion; +use cloud_pubsub::{error::Error, Client, Topic}; +use serde_json::json; + +use crate::{model::Event, pipelining::StageReceiver, sinks::ErrorPolicy, utils::Utils}; + +/// Publishes `event`, JSON-encoded, to the given topic, retrying on failure. +/// +/// A failed attempt is retried after `backoff_delay`, up to `retry_quota` more +/// times. Once the quota is spent the error is returned under +/// `ErrorPolicy::Exit`, or logged and dropped under `ErrorPolicy::Continue`. +#[async_recursion] +async fn send_pubsub_msg( + client: &Topic, + event: &Event, + policy: &ErrorPolicy, + retry_quota: usize, + backoff_delay: Duration, +) -> Result<(), Error> { + let body = json!(event).to_string(); + + let result = client.publish(body).await; + + match (result, policy, retry_quota) { + (Ok(_), _, _) => { + log::info!("successful pubsub publish"); + Ok(()) + } + (Err(x), ErrorPolicy::Exit, 0) => Err(x), + (Err(x), ErrorPolicy::Continue, 0) => { + log::warn!("failed to publish to pubsub: {:?}", x); + Ok(()) + } + (Err(x), _, quota) => { + log::warn!("failed attempt to execute pubsub publish: {:?}", x); + // await the runtime timer; `std::thread::sleep` would block the executor thread + tokio::time::sleep(backoff_delay).await; + send_pubsub_msg(client, event, policy, quota - 1, backoff_delay).await + } + } +} + +/// Drains `input` and publishes every event to the `topic_name` topic. +/// +/// Runs the async client on a current-thread tokio runtime and returns the first +/// error raised while building the runtime, creating the client or publishing. +pub fn writer_loop( + input: StageReceiver, + credentials: String, + topic_name: String, + error_policy: &ErrorPolicy, + max_retries: usize, + backoff_delay: Duration, + utils: Arc<Utils>, +) -> Result<(), crate::Error> { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_time() + .enable_io() + .build()?; + + let publisher = rt.block_on(Client::new(credentials))?; + let topic = publisher.topic(topic_name); + + for event in input.iter() { + // notify the pipeline where we are + utils.track_sink_progress(&event); + + rt.block_on(send_pubsub_msg( + &topic, + &event, + error_policy, + max_retries, + backoff_delay, + ))?; + } + + Ok(()) +} diff --git a/src/sinks/gcp_pubsub/setup.rs b/src/sinks/gcp_pubsub/setup.rs new file mode 100644 index 00000000..2e6f07d2 --- /dev/null +++ b/src/sinks/gcp_pubsub/setup.rs @@ -0,0 +1,59 @@ +use std::time::Duration; + +use serde::Deserialize; + +use crate::{ + pipelining::{BootstrapResult, SinkProvider, StageReceiver}, + 
sinks::ErrorPolicy, + utils::WithUtils, +}; + +use super::run::writer_loop; + +#[derive(Debug, Default, Deserialize)] +pub struct Config { + pub topic: String, + pub credentials: String, + pub error_policy: Option<ErrorPolicy>, + pub max_retries: Option<usize>, + pub backoff_delay: Option<u64>, +} + +const DEFAULT_MAX_RETRIES: usize = 20; +const DEFAULT_BACKOFF_DELAY: u64 = 5_000; + +impl SinkProvider for WithUtils<Config> { + fn bootstrap(&self, input: StageReceiver) -> BootstrapResult { + let credentials = self.inner.credentials.to_owned(); + let topic_name = self.inner.topic.to_owned(); + + let error_policy = self + .inner + .error_policy + .as_ref() + .cloned() + .unwrap_or(ErrorPolicy::Exit); + + let max_retries = self.inner.max_retries.unwrap_or(DEFAULT_MAX_RETRIES); + + let backoff_delay = + Duration::from_millis(self.inner.backoff_delay.unwrap_or(DEFAULT_BACKOFF_DELAY)); + + let utils = self.utils.clone(); + + let handle = std::thread::spawn(move || { + writer_loop( + input, + credentials, + topic_name, + &error_policy, + max_retries, + backoff_delay, + utils, + ) + .expect("writer loop failed"); + }); + + Ok(handle) + } +} diff --git a/src/sinks/mod.rs b/src/sinks/mod.rs index 50a89817..41c00ee8 100644 --- a/src/sinks/mod.rs +++ b/src/sinks/mod.rs @@ -1,7 +1,11 @@ +mod common; + pub mod assert; pub mod stdout; pub mod terminal; +pub use common::*; + #[cfg(feature = "logs")] pub mod logs; @@ -22,3 +26,6 @@ pub mod aws_lambda; #[cfg(feature = "aws")] pub mod aws_s3; + +#[cfg(feature = "gcp")] +pub mod gcp_pubsub; diff --git a/src/sinks/webhook/run.rs b/src/sinks/webhook/run.rs index b771191c..20e396f4 100644 --- a/src/sinks/webhook/run.rs +++ b/src/sinks/webhook/run.rs @@ -3,9 +3,7 @@ use std::{sync::Arc, time::Duration}; use reqwest::blocking::Client; use serde::Serialize; -use crate::{model::Event, pipelining::StageReceiver, utils::Utils, Error}; - -use super::ErrorPolicy; +use crate::{model::Event, pipelining::StageReceiver, sinks::ErrorPolicy, utils::Utils, Error}; #[derive(Serialize)] 
struct RequestBody { diff --git a/src/sinks/webhook/setup.rs b/src/sinks/webhook/setup.rs index 72865e88..fe47f2ad 100644 --- a/src/sinks/webhook/setup.rs +++ b/src/sinks/webhook/setup.rs @@ -5,6 +5,7 @@ use serde::Deserialize; use crate::{ pipelining::{BootstrapResult, SinkProvider, StageReceiver}, + sinks::ErrorPolicy, utils::WithUtils, Error, }; @@ -13,12 +14,6 @@ use super::run::request_loop; static APP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION")); -#[derive(Debug, Deserialize, Clone)] -pub enum ErrorPolicy { - Continue, - Exit, -} - #[derive(Default, Debug, Deserialize)] pub struct Config { pub url: String,