Skip to content

Commit

Permalink
Merge pull request #4 from PostHog/brett/produce
Browse files Browse the repository at this point in the history
  • Loading branch information
bretthoerner authored Dec 14, 2023
2 parents 7ddeb89 + eae1cb1 commit 98b9c4a
Show file tree
Hide file tree
Showing 14 changed files with 562 additions and 163 deletions.
9 changes: 9 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 14 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,22 +1,30 @@
[workspace]
resolver = "2"

members = [
"hook-common",
"hook-producer",
"hook-consumer",
]
members = ["hook-common", "hook-producer", "hook-consumer"]

[workspace.dependencies]
chrono = { version = "0.4" }
serde = { version = "1.0" }
serde_derive = { version = "1.0" }
serde_json = { version = "1.0" }
thiserror = { version = "1.0" }
sqlx = { version = "0.7", features = [ "runtime-tokio", "tls-native-tls", "postgres", "uuid", "json", "chrono" ] }
sqlx = { version = "0.7", features = [
"runtime-tokio",
"tls-native-tls",
"postgres",
"uuid",
"json",
"chrono",
] }
tokio = { version = "1.34.0", features = ["full"] }
eyre = "0.6.9"
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
envconfig = "0.10.0"
metrics = "0.21.1"
metrics-exporter-prometheus = "0.12.1"
http = { version = "0.2" }
url = { version = "2.5.0 " }
tower = "0.4.13"
http-body-util = "0.1.0"
7 changes: 4 additions & 3 deletions hook-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
chrono = { workspace = true}
chrono = { workspace = true }
http = { workspace = true }
serde = { workspace = true }
serde_derive = { workspace = true}
thiserror = { workspace = true }
serde_derive = { workspace = true }
sqlx = { workspace = true }
thiserror = { workspace = true }

[dev-dependencies]
tokio = { workspace = true } # We need a runtime for async tests
1 change: 1 addition & 0 deletions hook-common/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod pgqueue;
pub mod webhook;
10 changes: 7 additions & 3 deletions hook-common/src/pgqueue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@ use thiserror::Error;
/// Errors that can originate from sqlx and are wrapped by us to provide additional context.
#[derive(Error, Debug)]
pub enum PgQueueError {
#[error("pool creation failed with: {error}")]
PoolCreationError { error: sqlx::Error },
#[error("connection failed with: {error}")]
ConnectionError { error: sqlx::Error },
#[error("{command} query failed with: {error}")]
QueryError { command: String, error: sqlx::Error },
#[error("{0} is not a valid JobStatus")]
ParseJobStatusError(String),
#[error("{0} is not a valid HttpMethod")]
ParseHttpMethodError(String),
}

#[derive(Error, Debug)]
Expand Down Expand Up @@ -528,6 +532,7 @@ impl Default for RetryPolicy {
}

/// A queue implemented on top of a PostgreSQL table.
#[derive(Clone)]
pub struct PgQueue {
/// A name to identify this PgQueue as multiple may share a table.
name: String,
Expand Down Expand Up @@ -560,9 +565,8 @@ impl PgQueue {
let name = queue_name.to_owned();
let table = table_name.to_owned();
let pool = PgPoolOptions::new()
.connect(url)
.await
.map_err(|error| PgQueueError::ConnectionError { error })?;
.connect_lazy(url)
.map_err(|error| PgQueueError::PoolCreationError { error })?;

Ok(Self {
name,
Expand Down
139 changes: 139 additions & 0 deletions hook-common/src/webhook.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
use std::collections;
use std::fmt;
use std::str::FromStr;

use serde::{de::Visitor, Deserialize, Serialize};

use crate::pgqueue::PgQueueError;

/// Supported HTTP methods for webhooks.
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum HttpMethod {
DELETE,
GET,
PATCH,
POST,
PUT,
}

/// Allow casting `HttpMethod` from strings.
impl FromStr for HttpMethod {
type Err = PgQueueError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_ascii_uppercase().as_ref() {
"DELETE" => Ok(HttpMethod::DELETE),
"GET" => Ok(HttpMethod::GET),
"PATCH" => Ok(HttpMethod::PATCH),
"POST" => Ok(HttpMethod::POST),
"PUT" => Ok(HttpMethod::PUT),
invalid => Err(PgQueueError::ParseHttpMethodError(invalid.to_owned())),
}
}
}

/// Implement `std::fmt::Display` to convert HttpMethod to string.
impl fmt::Display for HttpMethod {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
HttpMethod::DELETE => write!(f, "DELETE"),
HttpMethod::GET => write!(f, "GET"),
HttpMethod::PATCH => write!(f, "PATCH"),
HttpMethod::POST => write!(f, "POST"),
HttpMethod::PUT => write!(f, "PUT"),
}
}
}

struct HttpMethodVisitor;

impl<'de> Visitor<'de> for HttpMethodVisitor {
type Value = HttpMethod;

fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
write!(formatter, "the string representation of HttpMethod")
}

fn visit_str<E>(self, s: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
match HttpMethod::from_str(s) {
Ok(method) => Ok(method),
Err(_) => Err(serde::de::Error::invalid_value(
serde::de::Unexpected::Str(s),
&self,
)),
}
}
}

/// Deserialize required to read `HttpMethod` from database.
impl<'de> Deserialize<'de> for HttpMethod {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
deserializer.deserialize_str(HttpMethodVisitor)
}
}

/// Serialize required to write `HttpMethod` to database.
impl Serialize for HttpMethod {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_str(&self.to_string())
}
}

/// Convenience to cast `HttpMethod` to `http::Method`.
/// Not all `http::Method` variants are valid `HttpMethod` variants, hence why we
/// can't just use the former or implement `From<HttpMethod>`.
impl From<HttpMethod> for http::Method {
fn from(val: HttpMethod) -> Self {
match val {
HttpMethod::DELETE => http::Method::DELETE,
HttpMethod::GET => http::Method::GET,
HttpMethod::PATCH => http::Method::PATCH,
HttpMethod::POST => http::Method::POST,
HttpMethod::PUT => http::Method::PUT,
}
}
}

impl From<&HttpMethod> for http::Method {
fn from(val: &HttpMethod) -> Self {
match val {
HttpMethod::DELETE => http::Method::DELETE,
HttpMethod::GET => http::Method::GET,
HttpMethod::PATCH => http::Method::PATCH,
HttpMethod::POST => http::Method::POST,
HttpMethod::PUT => http::Method::PUT,
}
}
}

/// `JobParameters` required for the `WebhookConsumer` to execute a webhook.
/// These parameters should match the exported Webhook interface that PostHog plugins.
/// implement. See: https://github.com/PostHog/plugin-scaffold/blob/main/src/types.ts#L15.
#[derive(Deserialize, Serialize, Debug, PartialEq, Clone)]
pub struct WebhookJobParameters {
pub body: String,
pub headers: collections::HashMap<String, String>,
pub method: HttpMethod,
pub url: String,

// These should be set if the Webhook is associated with a plugin `composeWebhook` invocation.
pub team_id: Option<i32>,
pub plugin_id: Option<i32>,
pub plugin_config_id: Option<i32>,

#[serde(default = "default_max_attempts")]
pub max_attempts: i32,
}

fn default_max_attempts() -> i32 {
3
}
Loading

0 comments on commit 98b9c4a

Please sign in to comment.