diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index ee0e5b3ebc785..b811c1a52e668 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -47,14 +47,8 @@ jobs: shell: bash run: | docker compose -f docker-compose.yml down - docker compose -f docker-compose.yml up -d --wait - - - name: Run migrations - shell: bash - run: | - cargo install sqlx-cli --no-default-features --features native-tls,postgres - DATABASE_URL=postgres://posthog:posthog@localhost:15432/test_database sqlx database create - DATABASE_URL=postgres://posthog:posthog@localhost:15432/test_database sqlx migrate run + docker compose -f docker-compose.yml up db echo_server -d --wait + docker compose -f docker-compose.yml up setup_test_db - uses: actions/cache@v3 with: diff --git a/Cargo.lock b/Cargo.lock index 451d42483e301..b24af98ae2169 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -51,6 +51,150 @@ dependencies = [ "libc", ] +[[package]] +name = "async-channel" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" +dependencies = [ + "concurrent-queue", + "event-listener 2.5.3", + "futures-core", +] + +[[package]] +name = "async-channel" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ca33f4bc4ed1babef42cad36cc1f51fa88be00420404e5b1e80ab1b18f7678c" +dependencies = [ + "concurrent-queue", + "event-listener 4.0.0", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-executor" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17ae5ebefcc48e7452b4987947920dac9450be1110cadf34d1b8c116bdbaf97c" +dependencies = [ + "async-lock 3.2.0", + "async-task", + "concurrent-queue", + "fastrand 2.0.1", + "futures-lite 2.1.0", + "slab", +] + +[[package]] +name = "async-global-executor" +version = "2.4.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b4353121d5644cdf2beb5726ab752e79a8db1ebb52031770ec47db31d245526" +dependencies = [ + "async-channel 2.1.1", + "async-executor", + "async-io 2.2.1", + "async-lock 3.2.0", + "blocking", + "futures-lite 2.1.0", + "once_cell", +] + +[[package]] +name = "async-io" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fc5b45d93ef0529756f812ca52e44c221b35341892d3dcc34132ac02f3dd2af" +dependencies = [ + "async-lock 2.8.0", + "autocfg", + "cfg-if", + "concurrent-queue", + "futures-lite 1.13.0", + "log", + "parking", + "polling 2.8.0", + "rustix 0.37.27", + "slab", + "socket2 0.4.10", + "waker-fn", +] + +[[package]] +name = "async-io" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6d3b15875ba253d1110c740755e246537483f152fa334f91abd7fe84c88b3ff" +dependencies = [ + "async-lock 3.2.0", + "cfg-if", + "concurrent-queue", + "futures-io", + "futures-lite 2.1.0", + "parking", + "polling 3.3.1", + "rustix 0.38.27", + "slab", + "tracing", + "windows-sys 0.52.0", +] + +[[package]] +name = "async-lock" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "287272293e9d8c41773cec55e365490fe034813a2f172f502d6ddcf75b2f582b" +dependencies = [ + "event-listener 2.5.3", +] + +[[package]] +name = "async-lock" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7125e42787d53db9dd54261812ef17e937c95a51e4d291373b670342fa44310c" +dependencies = [ + "event-listener 4.0.0", + "event-listener-strategy", + "pin-project-lite", +] + +[[package]] +name = "async-std" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62565bb4402e926b29953c785397c6dc0391b7b446e45008b0049eb43cec6f5d" +dependencies = [ + "async-channel 1.9.0", + "async-global-executor", + "async-io 1.13.0", + "async-lock 
2.8.0", + "crossbeam-utils", + "futures-channel", + "futures-core", + "futures-io", + "futures-lite 1.13.0", + "gloo-timers", + "kv-log-macro", + "log", + "memchr", + "once_cell", + "pin-project-lite", + "pin-utils", + "slab", + "wasm-bindgen-futures", +] + +[[package]] +name = "async-task" +version = "4.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4eb2cdb97421e01129ccb49169d8279ed21e829929144f4a22a6e54ac549ca1" + [[package]] name = "async-trait" version = "0.1.74" @@ -71,6 +215,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + [[package]] name = "atomic-write-file" version = "0.1.2" @@ -191,6 +341,22 @@ dependencies = [ "generic-array", ] +[[package]] +name = "blocking" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a37913e8dc4ddcc604f0c6d3bf2887c995153af3611de9e23c352b44c1b9118" +dependencies = [ + "async-channel 2.1.1", + "async-lock 3.2.0", + "async-task", + "fastrand 2.0.1", + "futures-io", + "futures-lite 2.1.0", + "piper", + "tracing", +] + [[package]] name = "bumpalo" version = "3.14.0" @@ -238,6 +404,15 @@ dependencies = [ "windows-targets 0.48.5", ] +[[package]] +name = "concurrent-queue" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d16048cd947b08fa32c24458a22f5dc5e835264f689f4f5653210c69fd107363" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "const-oid" version = "0.9.5" @@ -364,6 +539,15 @@ dependencies = [ "serde", ] +[[package]] +name = "encoding_rs" +version = "0.8.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7268b386296a025e474d5140678f75d6de9493ae55a5d709eeb9dd08149945e1" +dependencies = [ + "cfg-if", +] + [[package]] name = "envconfig" version = 
"0.10.0" @@ -417,16 +601,46 @@ version = "2.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" +[[package]] +name = "event-listener" +version = "4.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "770d968249b5d99410d61f5bf89057f3199a077a04d087092f58e7d10692baae" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "958e4d70b6d5e81971bebec42271ec641e7ff4e170a6fa605f2b8a8b65cb97d3" +dependencies = [ + "event-listener 4.0.0", + "pin-project-lite", +] + [[package]] name = "eyre" -version = "0.6.9" +version = "0.6.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "80f656be11ddf91bd709454d15d5bd896fbaf4cc3314e69349e4d1569f5b46cd" +checksum = "8bbb8258be8305fb0237d7b295f47bb24ff1b136a535f473baf40e70468515aa" dependencies = [ "indenter", "once_cell", ] +[[package]] +name = "fastrand" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be" +dependencies = [ + "instant", +] + [[package]] name = "fastrand" version = "2.0.1" @@ -480,6 +694,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da0290714b38af9b4a7b094b8a37086d1b4e61f2df9122c3cad2577669145335" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.29" @@ -524,6 +753,45 @@ version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"8bf34a163b5c4c52d0478a4d757da8fb65cabef42ba90515efee0f6f9fa45aaa" +[[package]] +name = "futures-lite" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" +dependencies = [ + "fastrand 1.9.0", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + +[[package]] +name = "futures-lite" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aeee267a1883f7ebef3700f262d2d54de95dfaf38189015a74fdc4e0c7ad8143" +dependencies = [ + "fastrand 2.0.1", + "futures-core", + "futures-io", + "parking", + "pin-project-lite", +] + +[[package]] +name = "futures-macro" +version = "0.3.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.39", +] + [[package]] name = "futures-sink" version = "0.3.29" @@ -542,8 +810,10 @@ version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104" dependencies = [ + "futures-channel", "futures-core", "futures-io", + "futures-macro", "futures-sink", "futures-task", "memchr", @@ -579,6 +849,37 @@ version = "0.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" +[[package]] +name = "gloo-timers" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b995a66bb87bebce9a0f4a95aed01daca4872c050bfcb21653361c03bc35e5c" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "h2" +version = "0.3.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"4d6250322ef6e60f93f9a2162799302cd6f68f79f6e5d85c8c16f14d1d958178" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http 0.2.11", + "indexmap 2.1.0", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "h2" version = "0.4.0" @@ -695,6 +996,21 @@ dependencies = [ [[package]] name = "hook-consumer" version = "0.1.0" +dependencies = [ + "async-std", + "chrono", + "envconfig", + "futures", + "hook-common", + "http 0.2.11", + "reqwest", + "serde", + "serde_derive", + "sqlx", + "thiserror", + "tokio", + "url", +] [[package]] name = "hook-producer" @@ -788,6 +1104,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", + "h2 0.3.22", "http 0.2.11", "http-body 0.4.5", "httparse", @@ -810,7 +1127,7 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "h2", + "h2 0.4.0", "http 1.0.0", "http-body 1.0.0", "httparse", @@ -820,6 +1137,19 @@ dependencies = [ "tokio", ] +[[package]] +name = "hyper-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +dependencies = [ + "bytes", + "hyper 0.14.27", + "native-tls", + "tokio", + "tokio-native-tls", +] + [[package]] name = "hyper-util" version = "0.1.1" @@ -899,6 +1229,26 @@ dependencies = [ "hashbrown 0.14.3", ] +[[package]] +name = "instant" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "io-lifetimes" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2" +dependencies = [ + "hermit-abi", + "libc", + "windows-sys 0.48.0", +] + [[package]] name = "ipnet" version = "2.9.0" @@ -907,9 +1257,9 @@ checksum = 
"8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" [[package]] name = "itertools" -version = "0.11.0" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1c173a5686ce8bfa551b3563d0c2170bf24ca44da99c7ca4bfdab5418c3fe57" +checksum = "25db6b064527c5d482d0423354fcd07a89a2dfe07b67892e62411946db7f07b0" dependencies = [ "either", ] @@ -929,6 +1279,15 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "kv-log-macro" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" +dependencies = [ + "log", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -961,6 +1320,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "linux-raw-sys" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" + [[package]] name = "linux-raw-sys" version = "0.4.12" @@ -982,6 +1347,9 @@ name = "log" version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +dependencies = [ + "value-bag", +] [[package]] name = "mach2" @@ -1101,9 +1469,9 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.9" +version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3dce281c5e46beae905d4de1870d8b1509a9142b62eedf18b443b011ca8343d0" +checksum = "8f3d0b296e374a4e6f3c7b0a1f5a51d748a0d34c85e7dc48fc3fa9a87657fe09" dependencies = [ "libc", "wasi", @@ -1228,15 +1596,15 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.18.0" +version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" +checksum = 
"3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" [[package]] name = "openssl" -version = "0.10.60" +version = "0.10.61" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79a4c6c3a2b158f7f8f2a2fc5a969fa3a068df6fc9dbb4a43845436e3af7c800" +checksum = "6b8419dc8cc6d866deb801274bba2e6f8f6108c1bb7fcc10ee5ab864931dbb45" dependencies = [ "bitflags 2.4.1", "cfg-if", @@ -1266,9 +1634,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] name = "openssl-sys" -version = "0.9.96" +version = "0.9.97" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3812c071ba60da8b5677cc12bcb1d42989a65553772897a7e0355545a819838f" +checksum = "c3eaad34cdd97d81de97964fc7f29e2d104f483840d906ef56daa1912338460b" dependencies = [ "cc", "libc", @@ -1282,6 +1650,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "parking" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" + [[package]] name = "parking_lot" version = "0.12.1" @@ -1358,6 +1732,17 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "piper" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "668d31b1c4eba19242f2088b2bf3316b82ca31082a8335764db4e083db7485d4" +dependencies = [ + "atomic-waker", + "fastrand 2.0.1", + "futures-io", +] + [[package]] name = "pkcs1" version = "0.7.5" @@ -1385,11 +1770,41 @@ version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964" +[[package]] +name = 
"polling" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce" +dependencies = [ + "autocfg", + "bitflags 1.3.2", + "cfg-if", + "concurrent-queue", + "libc", + "log", + "pin-project-lite", + "windows-sys 0.48.0", +] + +[[package]] +name = "polling" +version = "3.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf63fa624ab313c11656b4cda960bfc46c410187ad493c41f6ba2d8c1e991c9e" +dependencies = [ + "cfg-if", + "concurrent-queue", + "pin-project-lite", + "rustix 0.38.27", + "tracing", + "windows-sys 0.52.0", +] + [[package]] name = "portable-atomic" -version = "1.5.1" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3bccab0e7fd7cc19f820a1c8c91720af652d0c88dc9664dd72aef2614f04af3b" +checksum = "7170ef9988bc169ba16dd36a7fa041e5c4cbeb6a35b76d4c03daded371eae7c0" [[package]] name = "ppv-lite86" @@ -1479,6 +1894,44 @@ dependencies = [ "bitflags 1.3.2", ] +[[package]] +name = "reqwest" +version = "0.11.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "046cd98826c46c2ac8ddecae268eb5c2e58628688a5fc7a2643704a73faba95b" +dependencies = [ + "base64", + "bytes", + "encoding_rs", + "futures-core", + "futures-util", + "h2 0.3.22", + "http 0.2.11", + "http-body 0.4.5", + "hyper 0.14.27", + "hyper-tls", + "ipnet", + "js-sys", + "log", + "mime", + "native-tls", + "once_cell", + "percent-encoding", + "pin-project-lite", + "serde", + "serde_json", + "serde_urlencoded", + "system-configuration", + "tokio", + "tokio-native-tls", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "winreg", +] + [[package]] name = "rsa" version = "0.9.6" @@ -1507,14 +1960,28 @@ checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" [[package]] name = "rustix" -version = "0.38.26" +version = "0.37.27" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "9470c4bf8246c8daf25f9598dca807fb6510347b1e1cfa55749113850c79d88a" +checksum = "fea8ca367a3a01fe35e6943c400addf443c0f57670e6ec51196f71a4b8762dd2" +dependencies = [ + "bitflags 1.3.2", + "errno", + "io-lifetimes", + "libc", + "linux-raw-sys 0.3.8", + "windows-sys 0.48.0", +] + +[[package]] +name = "rustix" +version = "0.38.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfeae074e687625746172d639330f1de242a178bf3189b51e35a7a21573513ac" dependencies = [ "bitflags 2.4.1", "errno", "libc", - "linux-raw-sys", + "linux-raw-sys 0.4.12", "windows-sys 0.52.0", ] @@ -1739,9 +2206,9 @@ dependencies = [ [[package]] name = "sqlformat" -version = "0.2.2" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b7b278788e7be4d0d29c0f39497a0eef3fba6bbc8e70d8bf7fde46edeaa9e85" +checksum = "ce81b7bd7c4493975347ef60d8c7e8b742d4694f4c49f93e0a12ea263938176c" dependencies = [ "itertools", "nom", @@ -1776,7 +2243,7 @@ dependencies = [ "crossbeam-queue", "dotenvy", "either", - "event-listener", + "event-listener 2.5.3", "futures-channel", "futures-core", "futures-intrusive", @@ -1999,6 +2466,27 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" +[[package]] +name = "system-configuration" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" +dependencies = [ + "bitflags 1.3.2", + "core-foundation", + "system-configuration-sys", +] + +[[package]] +name = "system-configuration-sys" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75fb188eb626b924683e3b95e3a48e63551fcfb51949de2f06a9d91dbee93c9" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] 
name = "tempfile" version = "3.8.1" @@ -2006,9 +2494,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ef1adac450ad7f4b3c28589471ade84f25f731a7a0fe30d71dfa9f60fd808e5" dependencies = [ "cfg-if", - "fastrand", + "fastrand 2.0.1", "redox_syscall", - "rustix", + "rustix 0.38.27", "windows-sys 0.48.0", ] @@ -2087,6 +2575,16 @@ dependencies = [ "syn 2.0.39", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-stream" version = "0.1.14" @@ -2200,9 +2698,9 @@ dependencies = [ [[package]] name = "try-lock" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" +checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" [[package]] name = "typenum" @@ -2212,9 +2710,9 @@ checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" [[package]] name = "unicode-bidi" -version = "0.3.13" +version = "0.3.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "92888ba5573ff080736b3648696b70cafad7d250551175acbaa4e0385b3e1460" +checksum = "6f2528f27a9eb2b21e69c95319b30bd0efd85d09c379741b0f78ea1d86be2416" [[package]] name = "unicode-ident" @@ -2272,6 +2770,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" +[[package]] +name = "value-bag" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a72e1902dde2bd6441347de2b70b7f5d59bf157c6c62f0c44572607a1d55bbe" + [[package]] name = "vcpkg" version = "0.2.15" @@ -2284,6 +2788,12 @@ version = "0.9.4" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "waker-fn" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3c4517f54858c779bbcbf228f4fca63d121bf85fbecb2dc578cdf4a39395690" + [[package]] name = "want" version = "0.3.1" @@ -2324,6 +2834,18 @@ dependencies = [ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.39" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac36a15a220124ac510204aec1c3e5db8a22ab06fd6706d881dc6149f8ed9a12" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "wasm-bindgen-macro" version = "0.2.89" @@ -2532,20 +3054,30 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" +[[package]] +name = "winreg" +version = "0.50.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1" +dependencies = [ + "cfg-if", + "windows-sys 0.48.0", +] + [[package]] name = "zerocopy" -version = "0.7.28" +version = "0.7.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d6f15f7ade05d2a4935e34a457b936c23dc70a05cc1d97133dc99e7a3fe0f0e" +checksum = "5d075cf85bbb114e933343e087b92f2146bac0d55b534cbb8188becf0039948e" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.7.28" +version = "0.7.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dbbad221e3f78500350ecbd7dfa4e63ef945c05f4c61cb7f4d3f84cd0bba649b" +checksum = "86cd5ca076997b97ef09d3ad65efe811fa68c9e874cb636ccb211223a813b0c2" dependencies = [ "proc-macro2", "quote", diff --git a/Dockerfile.sqlx b/Dockerfile.sqlx new file mode 100644 index 
0000000000000..c55dfaa8a960a --- /dev/null +++ b/Dockerfile.sqlx @@ -0,0 +1,5 @@ +FROM docker.io/library/rust:1.74.0 + +RUN cargo install sqlx-cli --no-default-features --features native-tls,postgres + +WORKDIR /sqlx diff --git a/README.md b/README.md index b17e7ae6ffbe7..a3a674c28ff38 100644 --- a/README.md +++ b/README.md @@ -4,24 +4,16 @@ A reliable and performant webhook system for PostHog ## Requirements 1. [Rust](https://www.rust-lang.org/tools/install). -2. [sqlx-cli](https://crates.io/crates/sqlx-cli): To setup database and run migrations. -3. [Docker](https://docs.docker.com/engine/install/) or [podman](https://podman.io/docs/installation) (and [podman-compose](https://github.com/containers/podman-compose#installation)): To setup testing services. +2. [Docker](https://docs.docker.com/engine/install/), or [podman](https://podman.io/docs/installation) and [podman-compose](https://github.com/containers/podman-compose#installation): To setup development stack. ## Testing -1. Start a PostgreSQL instance: +1. Start development stack: ```bash docker compose -f docker-compose.yml up -d --wait ``` -2. Prepare test database: -```bash -export DATABASE_URL=postgres://posthog:posthog@localhost:15432/test_database -sqlx database create -sqlx migrate run -``` - -3. Test: +2. Test: ```bash cargo test ``` diff --git a/docker-compose.yml b/docker-compose.yml index 35b7a498d44b4..afaf48ef86d06 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,5 +1,6 @@ services: db: + container_name: db image: docker.io/library/postgres:16-alpine restart: on-failure environment: @@ -13,3 +14,29 @@ services: ports: - '15432:5432' command: postgres -c max_connections=1000 -c idle_in_transaction_session_timeout=300000 + + setup_test_db: + container_name: setup-test-db + build: + context: . 
+ dockerfile: Dockerfile.sqlx + restart: on-failure + command: > + sh -c "sqlx database create && sqlx migrate run" + depends_on: + db: + condition: service_healthy + restart: true + environment: + DATABASE_URL: postgres://posthog:posthog@db:5432/test_database + volumes: + - ./migrations:/sqlx/migrations/ + + echo_server: + image: docker.io/library/caddy:2 + container_name: echo-server + restart: on-failure + ports: + - '18081:8081' + volumes: + - ./docker/echo-server/Caddyfile:/etc/caddy/Caddyfile diff --git a/docker/echo-server/Caddyfile b/docker/echo-server/Caddyfile new file mode 100644 index 0000000000000..a13ac68a24d6b --- /dev/null +++ b/docker/echo-server/Caddyfile @@ -0,0 +1,17 @@ +{ + auto_https off +} + +:8081 + +route /echo { + respond `{http.request.body}` 200 { + close + } +} + +route /fail { + respond `{http.request.body}` 400 { + close + } +} diff --git a/hook-common/src/pgqueue.rs b/hook-common/src/pgqueue.rs index 89aab7ee45ab9..7ed46550bd8bd 100644 --- a/hook-common/src/pgqueue.rs +++ b/hook-common/src/pgqueue.rs @@ -4,9 +4,10 @@ use std::default::Default; use std::str::FromStr; +use std::time; -use chrono::{prelude::*, Duration}; -use serde::{de::DeserializeOwned, Serialize}; +use chrono; +use serde; use sqlx::postgres::{PgPool, PgPoolOptions}; use thiserror::Error; @@ -18,12 +19,18 @@ pub enum PgQueueError { ConnectionError { error: sqlx::Error }, #[error("{command} query failed with: {error}")] QueryError { command: String, error: sqlx::Error }, - #[error("transaction {command} failed with: {error}")] - TransactionError { command: String, error: sqlx::Error }, #[error("{0} is not a valid JobStatus")] ParseJobStatusError(String), - #[error("{0} Job has reached max attempts and cannot be retried further")] - MaxAttemptsReachedError(String), +} + +#[derive(Error, Debug)] +pub enum PgJobError { + #[error("retry is an invalid state for this PgJob: {error}")] + RetryInvalidError { job: T, error: String }, + #[error("{command} query failed with: 
{error}")] + QueryError { command: String, error: sqlx::Error }, + #[error("transaction {command} failed with: {error}")] + TransactionError { command: String, error: sqlx::Error }, } /// Enumeration of possible statuses for a Job. @@ -64,18 +71,18 @@ impl FromStr for JobStatus { pub type JobParameters = sqlx::types::Json; /// A Job to be executed by a worker dequeueing a PgQueue. -#[derive(sqlx::FromRow)] +#[derive(sqlx::FromRow, Debug)] pub struct Job { /// A unique id identifying a job. pub id: i64, /// A number corresponding to the current job attempt. pub attempt: i32, /// A datetime corresponding to when the job was attempted. - pub attempted_at: DateTime, + pub attempted_at: chrono::DateTime, /// A vector of identifiers that have attempted this job. E.g. thread ids, pod names, etc... pub attempted_by: Vec, /// A datetime corresponding to when the job was created. - pub created_at: DateTime, + pub created_at: chrono::DateTime, /// The current job's number of max attempts. pub max_attempts: i32, /// Arbitrary job parameters stored as JSON. @@ -89,28 +96,29 @@ pub struct Job { } impl Job { + /// Return true if this job attempt is greater or equal to the maximum number of possible attempts. + pub fn is_gte_max_attempts(&self) -> bool { + self.attempt >= self.max_attempts + } + /// Consume Job to retry it. /// This returns a RetryableJob that can be enqueued by PgQueue. /// /// # Arguments /// /// * `error`: Any JSON-serializable value to be stored as an error. - pub fn retry(self, error: E) -> Result, PgQueueError> { - if self.attempt >= self.max_attempts { - Err(PgQueueError::MaxAttemptsReachedError(self.target)) - } else { - Ok(RetryableJob { - id: self.id, - attempt: self.attempt, - error: sqlx::types::Json(error), - queue: self.queue, - }) + fn retry(self, error: E) -> RetryableJob { + RetryableJob { + id: self.id, + attempt: self.attempt, + error: sqlx::types::Json(error), + queue: self.queue, } } /// Consume Job to complete it. 
/// This returns a CompletedJob that can be marked as completed by PgQueue. - pub fn complete(self) -> CompletedJob { + fn complete(self) -> CompletedJob { CompletedJob { id: self.id, queue: self.queue, @@ -123,7 +131,7 @@ impl Job { /// # Arguments /// /// * `error`: Any JSON-serializable value to be stored as an error. - pub fn fail(self, error: E) -> FailedJob { + fn fail(self, error: E) -> FailedJob { FailedJob { id: self.id, error: sqlx::types::Json(error), @@ -133,6 +141,7 @@ impl Job { } /// A Job that can be updated in PostgreSQL. +#[derive(Debug)] pub struct PgJob { pub job: Job, pub table: String, @@ -141,11 +150,21 @@ pub struct PgJob { } impl PgJob { - pub async fn retry( + pub async fn retry( mut self, error: E, - ) -> Result, PgQueueError> { - let retryable_job = self.job.retry(error)?; + preferred_retry_interval: Option, + ) -> Result, PgJobError>> { + if self.job.is_gte_max_attempts() { + return Err(PgJobError::RetryInvalidError { + job: self, + error: "Maximum attempts reached".to_owned(), + }); + } + let retryable_job = self.job.retry(error); + let retry_interval = self + .retry_policy + .time_until_next_retry(&retryable_job, preferred_retry_interval); let base_query = format!( r#" @@ -161,7 +180,6 @@ WHERE AND queue = $1 RETURNING "{0}".* - "#, &self.table ); @@ -169,11 +187,11 @@ RETURNING sqlx::query(&base_query) .bind(&retryable_job.queue) .bind(retryable_job.id) - .bind(self.retry_policy.time_until_next_retry(&retryable_job)) + .bind(retry_interval) .bind(&retryable_job.error) .execute(&mut *self.connection) .await - .map_err(|error| PgQueueError::QueryError { + .map_err(|error| PgJobError::QueryError { command: "UPDATE".to_owned(), error, })?; @@ -181,7 +199,7 @@ RETURNING Ok(retryable_job) } - pub async fn complete(mut self) -> Result { + pub async fn complete(mut self) -> Result>> { let completed_job = self.job.complete(); let base_query = format!( @@ -190,13 +208,12 @@ UPDATE "{0}" SET finished_at = NOW(), - status = 
'completed'::job_status, + status = 'completed'::job_status WHERE "{0}".id = $2 AND queue = $1 RETURNING "{0}".* - "#, &self.table ); @@ -206,7 +223,7 @@ RETURNING .bind(completed_job.id) .execute(&mut *self.connection) .await - .map_err(|error| PgQueueError::QueryError { + .map_err(|error| PgJobError::QueryError { command: "UPDATE".to_owned(), error, })?; @@ -214,10 +231,10 @@ RETURNING Ok(completed_job) } - pub async fn fail( + pub async fn fail( mut self, error: E, - ) -> Result, PgQueueError> { + ) -> Result, PgJobError>> { let failed_job = self.job.fail(error); let base_query = format!( @@ -226,7 +243,7 @@ UPDATE "{0}" SET finished_at = NOW(), - status = 'failed'::job_status, + status = 'failed'::job_status WHERE "{0}".id = $2 AND queue = $1 @@ -242,7 +259,7 @@ RETURNING .bind(failed_job.id) .execute(&mut *self.connection) .await - .map_err(|error| PgQueueError::QueryError { + .map_err(|error| PgJobError::QueryError { command: "UPDATE".to_owned(), error, })?; @@ -253,6 +270,7 @@ RETURNING /// A Job within an open PostgreSQL transaction. /// This implementation allows 'hiding' the job from any other workers running SKIP LOCKED queries. 
+#[derive(Debug)] pub struct PgTransactionJob<'c, J> { pub job: Job, pub table: String, @@ -261,11 +279,21 @@ pub struct PgTransactionJob<'c, J> { } impl<'c, J> PgTransactionJob<'c, J> { - pub async fn retry( + pub async fn retry( mut self, error: E, - ) -> Result, PgQueueError> { - let retryable_job = self.job.retry(error)?; + preferred_retry_interval: Option, + ) -> Result, PgJobError>> { + if self.job.is_gte_max_attempts() { + return Err(PgJobError::RetryInvalidError { + job: self, + error: "Maximum attempts reached".to_owned(), + }); + } + let retryable_job = self.job.retry(error); + let retry_interval = self + .retry_policy + .time_until_next_retry(&retryable_job, preferred_retry_interval); let base_query = format!( r#" @@ -289,11 +317,11 @@ RETURNING sqlx::query(&base_query) .bind(&retryable_job.queue) .bind(retryable_job.id) - .bind(self.retry_policy.time_until_next_retry(&retryable_job)) + .bind(retry_interval) .bind(&retryable_job.error) .execute(&mut *self.transaction) .await - .map_err(|error| PgQueueError::QueryError { + .map_err(|error| PgJobError::QueryError { command: "UPDATE".to_owned(), error, })?; @@ -301,7 +329,7 @@ RETURNING self.transaction .commit() .await - .map_err(|error| PgQueueError::TransactionError { + .map_err(|error| PgJobError::TransactionError { command: "COMMIT".to_owned(), error, })?; @@ -309,7 +337,7 @@ RETURNING Ok(retryable_job) } - pub async fn complete(mut self) -> Result { + pub async fn complete(mut self) -> Result>> { let completed_job = self.job.complete(); let base_query = format!( @@ -318,13 +346,12 @@ UPDATE "{0}" SET finished_at = NOW(), - status = 'completed'::job_status, + status = 'completed'::job_status WHERE "{0}".id = $2 AND queue = $1 RETURNING "{0}".* - "#, &self.table ); @@ -334,7 +361,7 @@ RETURNING .bind(completed_job.id) .execute(&mut *self.transaction) .await - .map_err(|error| PgQueueError::QueryError { + .map_err(|error| PgJobError::QueryError { command: "UPDATE".to_owned(), error, })?; @@ -342,7 +369,7 
@@ RETURNING self.transaction .commit() .await - .map_err(|error| PgQueueError::TransactionError { + .map_err(|error| PgJobError::TransactionError { command: "COMMIT".to_owned(), error, })?; @@ -350,10 +377,10 @@ RETURNING Ok(completed_job) } - pub async fn fail( + pub async fn fail( mut self, error: E, - ) -> Result, PgQueueError> { + ) -> Result, PgJobError>> { let failed_job = self.job.fail(error); let base_query = format!( @@ -362,13 +389,12 @@ UPDATE "{0}" SET finished_at = NOW(), - status = 'failed'::job_status, + status = 'failed'::job_status WHERE "{0}".id = $2 AND queue = $1 RETURNING "{0}".* - "#, &self.table ); @@ -378,7 +404,7 @@ RETURNING .bind(failed_job.id) .execute(&mut *self.transaction) .await - .map_err(|error| PgQueueError::QueryError { + .map_err(|error| PgJobError::QueryError { command: "UPDATE".to_owned(), error, })?; @@ -386,7 +412,7 @@ RETURNING self.transaction .commit() .await - .map_err(|error| PgQueueError::TransactionError { + .map_err(|error| PgJobError::TransactionError { command: "COMMIT".to_owned(), error, })?; @@ -399,7 +425,7 @@ RETURNING /// The time until retry will depend on the PgQueue's RetryPolicy. pub struct RetryableJob { /// A unique id identifying a job. - pub id: i64, + id: i64, /// A number corresponding to the current job attempt. pub attempt: i32, /// Any JSON-serializable value to be stored as an error. @@ -411,7 +437,7 @@ pub struct RetryableJob { /// A Job that has completed to be enqueued into a PgQueue and marked as completed. pub struct CompletedJob { /// A unique id identifying a job. - pub id: i64, + id: i64, /// A unique id identifying a job queue. pub queue: String, } @@ -419,7 +445,7 @@ pub struct CompletedJob { /// A Job that has failed to be enqueued into a PgQueue and marked as failed. pub struct FailedJob { /// A unique id identifying a job. - pub id: i64, + id: i64, /// Any JSON-serializable value to be stored as an error. pub error: sqlx::types::Json, /// A unique id identifying a job queue. 
@@ -446,27 +472,47 @@ impl NewJob { } } -#[derive(Copy, Clone)] +#[derive(Copy, Clone, Debug)] /// The retry policy that PgQueue will use to determine how to set scheduled_at when enqueuing a retry. pub struct RetryPolicy { /// Coefficient to multiply initial_interval with for every past attempt. - backoff_coefficient: i32, + backoff_coefficient: u32, /// The backoff interval for the first retry. - initial_interval: Duration, + initial_interval: time::Duration, /// The maximum possible backoff between retries. - maximum_interval: Option, + maximum_interval: Option, } impl RetryPolicy { + pub fn new( + backoff_coefficient: u32, + initial_interval: time::Duration, + maximum_interval: Option, + ) -> Self { + Self { + backoff_coefficient, + initial_interval, + maximum_interval, + } + } + /// Calculate the time until the next retry for a given RetryableJob. - pub fn time_until_next_retry(&self, job: &RetryableJob) -> Duration { + pub fn time_until_next_retry( + &self, + job: &RetryableJob, + preferred_retry_interval: Option, + ) -> time::Duration { let candidate_interval = self.initial_interval * self.backoff_coefficient.pow(job.attempt as u32); - if let Some(max_interval) = self.maximum_interval { - std::cmp::min(candidate_interval, max_interval) - } else { - candidate_interval + match (preferred_retry_interval, self.maximum_interval) { + (Some(duration), Some(max_interval)) => std::cmp::min( + std::cmp::max(std::cmp::min(candidate_interval, max_interval), duration), + max_interval, + ), + (Some(duration), None) => std::cmp::max(candidate_interval, duration), + (None, Some(max_interval)) => std::cmp::min(candidate_interval, max_interval), + (None, None) => candidate_interval, } } } @@ -475,7 +521,7 @@ impl Default for RetryPolicy { fn default() -> Self { Self { backoff_coefficient: 2, - initial_interval: Duration::seconds(1), + initial_interval: time::Duration::from_secs(1), maximum_interval: None, } } @@ -491,8 +537,6 @@ pub struct PgQueue { retry_policy: RetryPolicy, 
/// The identifier of the PostgreSQL table this queue runs on. table: String, - /// The identifier of the worker listening on this queue. - worker: String, } pub type PgQueueResult = std::result::Result; @@ -511,12 +555,10 @@ impl PgQueue { queue_name: &str, table_name: &str, url: &str, - worker_name: &str, retry_policy: RetryPolicy, ) -> PgQueueResult { let name = queue_name.to_owned(); let table = table_name.to_owned(); - let worker = worker_name.to_owned(); let pool = PgPoolOptions::new() .connect(url) .await @@ -527,13 +569,15 @@ impl PgQueue { pool, retry_policy, table, - worker, }) } /// Dequeue a Job from this PgQueue to work on it. - pub async fn dequeue( + pub async fn dequeue< + J: for<'d> serde::Deserialize<'d> + std::marker::Send + std::marker::Unpin + 'static, + >( &self, + attempted_by: &str, ) -> PgQueueResult>> { let mut connection = self .pool @@ -578,7 +622,7 @@ RETURNING let query_result: Result, sqlx::Error> = sqlx::query_as(&base_query) .bind(&self.name) - .bind(&self.worker) + .bind(attempted_by) .fetch_one(&mut *connection) .await; @@ -608,10 +652,12 @@ RETURNING /// Dequeue a Job from this PgQueue to work on it. pub async fn dequeue_tx< - J: DeserializeOwned + std::marker::Send + std::marker::Unpin + 'static, + 'a, + J: for<'d> serde::Deserialize<'d> + std::marker::Send + std::marker::Unpin + 'static, >( &self, - ) -> PgQueueResult>> { + attempted_by: &str, + ) -> PgQueueResult>> { let mut tx = self .pool .begin() @@ -655,7 +701,7 @@ RETURNING let query_result: Result, sqlx::Error> = sqlx::query_as(&base_query) .bind(&self.name) - .bind(&self.worker) + .bind(attempted_by) .fetch_one(&mut *tx) .await; @@ -678,7 +724,7 @@ RETURNING /// Enqueue a Job into this PgQueue. /// We take ownership of NewJob to enforce a specific NewJob is only enqueued once. 
- pub async fn enqueue( + pub async fn enqueue( &self, job: NewJob, ) -> PgQueueResult<()> { @@ -712,9 +758,8 @@ VALUES #[cfg(test)] mod tests { use super::*; - use serde::Deserialize; - #[derive(Serialize, Deserialize, PartialEq, Debug)] + #[derive(serde::Serialize, serde::Deserialize, PartialEq, Debug)] struct JobParameters { method: String, body: String, @@ -752,7 +797,6 @@ mod tests { "test_can_dequeue_job", "job_queue", "postgres://posthog:posthog@localhost:15432/test_database", - &worker_id, RetryPolicy::default(), ) .await @@ -761,7 +805,7 @@ mod tests { queue.enqueue(new_job).await.expect("failed to enqueue job"); let pg_job: PgJob = queue - .dequeue() + .dequeue(&worker_id) .await .expect("failed to dequeue job") .expect("didn't find a job to dequeue"); @@ -782,14 +826,15 @@ mod tests { "test_dequeue_returns_none_on_no_jobs", "job_queue", "postgres://posthog:posthog@localhost:15432/test_database", - &worker_id, RetryPolicy::default(), ) .await .expect("failed to connect to local test postgresql database"); - let pg_job: Option> = - queue.dequeue().await.expect("failed to dequeue job"); + let pg_job: Option> = queue + .dequeue(&worker_id) + .await + .expect("failed to dequeue job"); assert!(pg_job.is_none()); } @@ -805,7 +850,6 @@ mod tests { "test_can_dequeue_tx_job", "job_queue", "postgres://posthog:posthog@localhost:15432/test_database", - &worker_id, RetryPolicy::default(), ) .await @@ -814,7 +858,7 @@ mod tests { queue.enqueue(new_job).await.expect("failed to enqueue job"); let tx_job: PgTransactionJob<'_, JobParameters> = queue - .dequeue_tx() + .dequeue_tx(&worker_id) .await .expect("failed to dequeue job") .expect("didn't find a job to dequeue"); @@ -835,14 +879,15 @@ mod tests { "test_dequeue_tx_returns_none_on_no_jobs", "job_queue", "postgres://posthog:posthog@localhost:15432/test_database", - &worker_id, RetryPolicy::default(), ) .await .expect("failed to connect to local test postgresql database"); - let tx_job: Option> = - 
queue.dequeue_tx().await.expect("failed to dequeue job"); + let tx_job: Option> = queue + .dequeue_tx(&worker_id) + .await + .expect("failed to dequeue job"); assert!(tx_job.is_none()); } @@ -855,7 +900,7 @@ mod tests { let new_job = NewJob::new(2, job_parameters, &job_target); let retry_policy = RetryPolicy { backoff_coefficient: 0, - initial_interval: Duration::seconds(0), + initial_interval: time::Duration::from_secs(0), maximum_interval: None, }; @@ -863,7 +908,6 @@ mod tests { "test_can_retry_job_with_remaining_attempts", "job_queue", "postgres://posthog:posthog@localhost:15432/test_database", - &worker_id, retry_policy, ) .await @@ -871,16 +915,16 @@ mod tests { queue.enqueue(new_job).await.expect("failed to enqueue job"); let job: PgJob = queue - .dequeue() + .dequeue(&worker_id) .await .expect("failed to dequeue job") .expect("didn't find a job to dequeue"); let _ = job - .retry("a very reasonable failure reason") + .retry("a very reasonable failure reason", None) .await .expect("failed to retry job"); let retried_job: PgJob = queue - .dequeue() + .dequeue(&worker_id) .await .expect("failed to dequeue job") .expect("didn't find retried job to dequeue"); @@ -906,7 +950,7 @@ mod tests { let new_job = NewJob::new(1, job_parameters, &job_target); let retry_policy = RetryPolicy { backoff_coefficient: 0, - initial_interval: Duration::seconds(0), + initial_interval: time::Duration::from_secs(0), maximum_interval: None, }; @@ -914,7 +958,6 @@ mod tests { "test_cannot_retry_job_without_remaining_attempts", "job_queue", "postgres://posthog:posthog@localhost:15432/test_database", - &worker_id, retry_policy, ) .await @@ -923,11 +966,11 @@ mod tests { queue.enqueue(new_job).await.expect("failed to enqueue job"); let job: PgJob = queue - .dequeue() + .dequeue(&worker_id) .await .expect("failed to dequeue job") .expect("didn't find a job to dequeue"); - job.retry("a very reasonable failure reason") + job.retry("a very reasonable failure reason", None) .await 
.expect("failed to retry job"); } diff --git a/hook-consumer/Cargo.toml b/hook-consumer/Cargo.toml index 49c2d9f84b17d..2e95a6b071903 100644 --- a/hook-consumer/Cargo.toml +++ b/hook-consumer/Cargo.toml @@ -6,3 +6,16 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +async-std = { version = "1.12" } +chrono = { version = "0.4" } +envconfig = { version = "0.10" } +futures = "0.3" +hook-common = { path = "../hook-common" } +http = { version = "0.2" } +reqwest = { version = "0.11" } +serde = { version = "1.0" } +serde_derive = { version = "1.0" } +sqlx = { version = "0.7", features = [ "runtime-tokio", "tls-native-tls", "postgres", "uuid", "json", "chrono" ] } +thiserror = { version = "1.0" } +tokio = { version = "1.34", features = ["macros", "rt", "rt-multi-thread"] } +url = { version = "2.2" } diff --git a/hook-consumer/README.md b/hook-consumer/README.md new file mode 100644 index 0000000000000..1adab6ea571f4 --- /dev/null +++ b/hook-consumer/README.md @@ -0,0 +1,2 @@ +# hook-consumer +Consume and process webhook jobs diff --git a/hook-consumer/src/config.rs b/hook-consumer/src/config.rs new file mode 100644 index 0000000000000..fde137337e8f8 --- /dev/null +++ b/hook-consumer/src/config.rs @@ -0,0 +1,56 @@ +use std::str::FromStr; +use std::time; + +use envconfig::Envconfig; + +#[derive(Envconfig, Clone)] +pub struct Config { + #[envconfig(default = "postgres://posthog:posthog@localhost:15432/test_database")] + pub database_url: String, + + #[envconfig(default = "consumer")] + pub consumer_name: String, + + #[envconfig(default = "default")] + pub queue_name: String, + + #[envconfig(default = "100")] + pub poll_interval: EnvMsDuration, + + #[envconfig(default = "5000")] + pub request_timeout: EnvMsDuration, + + #[envconfig(nested = true)] + pub retry_policy: RetryPolicyConfig, + + #[envconfig(default = "job_queue")] + pub table_name: String, +} + +#[derive(Debug, Clone, Copy)] +pub 
struct EnvMsDuration(pub time::Duration); + +#[derive(Debug, PartialEq, Eq)] +pub struct ParseEnvMsDurationError; + +impl FromStr for EnvMsDuration { + type Err = ParseEnvMsDurationError; + + fn from_str(s: &str) -> Result { + let ms = s.parse::().map_err(|_| ParseEnvMsDurationError)?; + + Ok(EnvMsDuration(time::Duration::from_millis(ms))) + } +} + +#[derive(Envconfig, Clone)] +pub struct RetryPolicyConfig { + #[envconfig(default = "2")] + pub backoff_coefficient: u32, + + #[envconfig(default = "1000")] + pub initial_interval: EnvMsDuration, + + #[envconfig(default = "100000")] + pub maximum_interval: EnvMsDuration, +} diff --git a/hook-consumer/src/consumer.rs b/hook-consumer/src/consumer.rs new file mode 100644 index 0000000000000..cff54f68f21ef --- /dev/null +++ b/hook-consumer/src/consumer.rs @@ -0,0 +1,491 @@ +use std::collections; +use std::fmt; +use std::str::FromStr; +use std::time; + +use async_std::task; +use hook_common::pgqueue::{PgJobError, PgQueue, PgQueueError, PgTransactionJob}; +use http::StatusCode; +use serde::{de::Visitor, Deserialize, Serialize}; +use thiserror::Error; + +/// Enumeration of errors for operations with WebhookConsumer. 
+#[derive(Error, Debug)] +pub enum WebhookConsumerError { + #[error("timed out while waiting for jobs to be available")] + TimeoutError, + #[error("{0} is not a valid HttpMethod")] + ParseHttpMethodError(String), + #[error("error parsing webhook headers")] + ParseHeadersError(http::Error), + #[error("error parsing webhook url")] + ParseUrlError(url::ParseError), + #[error("an error occurred in the underlying queue")] + QueueError(#[from] PgQueueError), + #[error("an error occurred in the underlying job")] + PgJobError(String), + #[error("an error occurred when attempting to send a request")] + RequestError(#[from] reqwest::Error), + #[error("a webhook could not be delivered but it could be retried later: {reason}")] + RetryableWebhookError { + reason: String, + retry_after: Option, + }, + #[error("a webhook could not be delivered and it cannot be retried further: {0}")] + NonRetryableWebhookError(String), +} + +/// Supported HTTP methods for webhooks. +#[derive(Debug, PartialEq, Clone, Copy)] +pub enum HttpMethod { + DELETE, + GET, + PATCH, + POST, + PUT, +} + +/// Allow casting `HttpMethod` from strings. +impl FromStr for HttpMethod { + type Err = WebhookConsumerError; + + fn from_str(s: &str) -> Result { + match s.to_ascii_uppercase().as_ref() { + "DELETE" => Ok(HttpMethod::DELETE), + "GET" => Ok(HttpMethod::GET), + "PATCH" => Ok(HttpMethod::PATCH), + "POST" => Ok(HttpMethod::POST), + "PUT" => Ok(HttpMethod::PUT), + invalid => Err(WebhookConsumerError::ParseHttpMethodError( + invalid.to_owned(), + )), + } + } +} + +/// Implement `std::fmt::Display` to convert HttpMethod to string. 
+impl fmt::Display for HttpMethod { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + HttpMethod::DELETE => write!(f, "DELETE"), + HttpMethod::GET => write!(f, "GET"), + HttpMethod::PATCH => write!(f, "PATCH"), + HttpMethod::POST => write!(f, "POST"), + HttpMethod::PUT => write!(f, "PUT"), + } + } +} + +struct HttpMethodVisitor; + +impl<'de> Visitor<'de> for HttpMethodVisitor { + type Value = HttpMethod; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + write!(formatter, "the string representation of HttpMethod") + } + + fn visit_str(self, s: &str) -> Result + where + E: serde::de::Error, + { + match HttpMethod::from_str(s) { + Ok(method) => Ok(method), + Err(_) => Err(serde::de::Error::invalid_value( + serde::de::Unexpected::Str(s), + &self, + )), + } + } +} + +/// Deserialize required to read `HttpMethod` from database. +impl<'de> Deserialize<'de> for HttpMethod { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + deserializer.deserialize_str(HttpMethodVisitor) + } +} + +/// Serialize required to write `HttpMethod` to database. +impl Serialize for HttpMethod { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + serializer.serialize_str(&self.to_string()) + } +} + +/// Convenience to cast `HttpMethod` to `http::Method`. +/// Not all `http::Method` variants are valid `HttpMethod` variants, hence why we +/// can't just use the former or implement `From`. 
+impl From for http::Method { + fn from(val: HttpMethod) -> Self { + match val { + HttpMethod::DELETE => http::Method::DELETE, + HttpMethod::GET => http::Method::GET, + HttpMethod::PATCH => http::Method::PATCH, + HttpMethod::POST => http::Method::POST, + HttpMethod::PUT => http::Method::PUT, + } + } +} + +impl From<&HttpMethod> for http::Method { + fn from(val: &HttpMethod) -> Self { + match val { + HttpMethod::DELETE => http::Method::DELETE, + HttpMethod::GET => http::Method::GET, + HttpMethod::PATCH => http::Method::PATCH, + HttpMethod::POST => http::Method::POST, + HttpMethod::PUT => http::Method::PUT, + } + } +} + +/// `JobParameters` required for the `WebhookConsumer` to execute a webhook. +/// These parameters should match the exported Webhook interface that PostHog plugins. +/// implement. See: https://github.com/PostHog/plugin-scaffold/blob/main/src/types.ts#L15. +#[derive(Deserialize, Serialize, Debug, PartialEq, Clone)] +pub struct WebhookJobParameters { + body: String, + headers: collections::HashMap, + method: HttpMethod, + url: String, +} + +/// A consumer to poll `PgQueue` and spawn tasks to process webhooks when a job becomes available. +pub struct WebhookConsumer<'p> { + /// An identifier for this consumer. Used to mark jobs we have consumed. + name: String, + /// The queue we will be dequeuing jobs from. + queue: &'p PgQueue, + /// The interval for polling the queue. + poll_interval: time::Duration, + /// A timeout for webhook requests. + request_timeout: time::Duration, +} + +impl<'p> WebhookConsumer<'p> { + pub fn new( + name: &str, + queue: &'p PgQueue, + poll_interval: time::Duration, + request_timeout: time::Duration, + ) -> Self { + Self { + name: name.to_owned(), + queue, + poll_interval, + request_timeout, + } + } + + /// Wait until a job becomes available in our queue. + async fn wait_for_job<'a>( + &self, + ) -> Result, WebhookConsumerError> { + loop { + if let Some(job) = self.queue.dequeue_tx(&self.name).await? 
{ + return Ok(job); + } else { + task::sleep(self.poll_interval).await; + } + } + } + + /// Run this consumer to continuously process any jobs that become available. + pub async fn run(&self) -> Result<(), WebhookConsumerError> { + loop { + // TODO: The number of jobs processed will be capped by the PG connection limit when running in transactional mode. + let webhook_job = self.wait_for_job().await?; + + let request_timeout = self.request_timeout; // Required to avoid capturing self in closure. + tokio::spawn(async move { process_webhook_job(webhook_job, request_timeout).await }); + } + } +} + +/// Process a webhook job by transitioning it to its appropriate state after its request is sent. +/// After we finish, the webhook job will be set as completed (if the request was successful), retryable (if the request +/// was unsuccessful but we can still attempt a retry), or failed (if the request was unsuccessful and no more retries +/// may be attempted). +/// +/// A webhook job is considered retryable after a failing request if: +/// 1. The job has attempts remaining (i.e. hasn't reached `max_attempts`), and... +/// 2. The status code indicates retrying at a later point could resolve the issue. This means: 429 and any 5XX. +/// +/// # Arguments +/// +/// * `webhook_job`: The webhook job to process as dequeued from `hook_common::pgqueue::PgQueue`. +/// * `request_timeout`: A timeout for the HTTP request. 
+async fn process_webhook_job( + webhook_job: PgTransactionJob<'_, WebhookJobParameters>, + request_timeout: std::time::Duration, +) -> Result<(), WebhookConsumerError> { + match send_webhook( + &webhook_job.job.parameters.method, + &webhook_job.job.parameters.url, + &webhook_job.job.parameters.headers, + webhook_job.job.parameters.body.clone(), + request_timeout, + ) + .await + { + Ok(_) => { + webhook_job + .complete() + .await + .map_err(|error| WebhookConsumerError::PgJobError(error.to_string()))?; + Ok(()) + } + Err(WebhookConsumerError::RetryableWebhookError { + reason, + retry_after, + }) => match webhook_job.retry(reason.to_string(), retry_after).await { + Ok(_) => Ok(()), + Err(PgJobError::RetryInvalidError { + job: webhook_job, + error: fail_error, + }) => { + webhook_job + .fail(fail_error.to_string()) + .await + .map_err(|job_error| WebhookConsumerError::PgJobError(job_error.to_string()))?; + Ok(()) + } + Err(job_error) => Err(WebhookConsumerError::PgJobError(job_error.to_string())), + }, + Err(error) => { + webhook_job + .fail(error.to_string()) + .await + .map_err(|job_error| WebhookConsumerError::PgJobError(job_error.to_string()))?; + Ok(()) + } + } +} + +/// Make an HTTP request to a webhook endpoint. +/// +/// # Arguments +/// +/// * `method`: The HTTP method to use in the HTTP request. +/// * `url`: The URL we are targetting with our request. Parsing this URL fail. +/// * `headers`: Key, value pairs of HTTP headers in a `std::collections::HashMap`. Can fail if headers are not valid. +/// * `body`: The body of the request. Ownership is required. +/// * `timeout`: A timeout for the HTTP request. 
+async fn send_webhook( + method: &HttpMethod, + url: &str, + headers: &collections::HashMap, + body: String, + timeout: std::time::Duration, +) -> Result { + let client = reqwest::Client::new(); + let method: http::Method = method.into(); + let url: reqwest::Url = (url).parse().map_err(WebhookConsumerError::ParseUrlError)?; + let headers: reqwest::header::HeaderMap = (headers) + .try_into() + .map_err(WebhookConsumerError::ParseHeadersError)?; + + let body = reqwest::Body::from(body); + let response = client + .request(method, url) + .headers(headers) + .timeout(timeout) + .body(body) + .send() + .await?; + + let status = response.status(); + + if status.is_success() { + Ok(response) + } else if is_retryable_status(status) { + let retry_after = parse_retry_after_header(response.headers()); + + Err(WebhookConsumerError::RetryableWebhookError { + reason: format!("retryable status code {}", status), + retry_after, + }) + } else { + Err(WebhookConsumerError::NonRetryableWebhookError(format!( + "non-retryable status code {}", + status + ))) + } +} + +fn is_retryable_status(status: StatusCode) -> bool { + status == StatusCode::TOO_MANY_REQUESTS || status.is_server_error() +} + +/// Attempt to parse a chrono::Duration from a Retry-After header, returning None if not possible. +/// Retry-After header can specify a date in RFC2822 or a number of seconds; we try to parse both. +/// If a Retry-After header is not present in the provided `header_map`, `None` is returned. +/// +/// # Arguments +/// +/// * `header_map`: A `&reqwest::HeaderMap` of response headers that could contain Retry-After. 
fn parse_retry_after_header(header_map: &reqwest::header::HeaderMap) -> Option<time::Duration> {
    let retry_after_header = header_map.get(reqwest::header::RETRY_AFTER);

    // A missing header or one with a non-ASCII value means there is nothing to parse.
    let retry_after = match retry_after_header {
        Some(header_value) => match header_value.to_str() {
            Ok(s) => s,
            Err(_) => {
                return None;
            }
        },
        None => {
            return None;
        }
    };

    // First interpretation: a plain number of seconds.
    if let Ok(u) = retry_after.parse::<u64>() {
        let duration = time::Duration::from_secs(u);
        return Some(duration);
    }

    // Second interpretation: an RFC2822 date, converted into a delay from now.
    if let Ok(dt) = chrono::DateTime::parse_from_rfc2822(retry_after) {
        let duration =
            chrono::DateTime::<chrono::offset::Utc>::from(dt) - chrono::offset::Utc::now();

        // This can only fail when negative, in which case we return None.
        return duration.to_std().ok();
    }

    None
}

// NOTE(review): this module has no `#[cfg(test)]` attribute, so it is compiled
// into non-test builds as well — presumably deliberate given the `#[allow]`
// attributes below, but worth confirming.
mod tests {
    use super::*;
    // Note we are ignoring some warnings in this module.
    // This is due to a long-standing cargo bug that reports imports and helper functions as unused.
    // See: https://github.com/rust-lang/rust/issues/46379.
    #[allow(unused_imports)]
    use hook_common::pgqueue::{JobStatus, NewJob, RetryPolicy};

    /// Use process id as a worker id for tests.
    #[allow(dead_code)]
    fn worker_id() -> String {
        std::process::id().to_string()
    }

    /// Enqueue a job with the given parameters, using the job's own URL as its target.
    #[allow(dead_code)]
    async fn enqueue_job(
        queue: &PgQueue,
        max_attempts: i32,
        job_parameters: WebhookJobParameters,
    ) -> Result<(), PgQueueError> {
        let job_target = job_parameters.url.to_owned();
        let new_job = NewJob::new(max_attempts, job_parameters, &job_target);
        queue.enqueue(new_job).await?;
        Ok(())
    }

    // 429 and 5XX are retryable; other 4XX client errors are not.
    #[test]
    fn test_is_retryable_status() {
        assert!(!is_retryable_status(http::StatusCode::FORBIDDEN));
        assert!(!is_retryable_status(http::StatusCode::BAD_REQUEST));
        assert!(is_retryable_status(http::StatusCode::TOO_MANY_REQUESTS));
        assert!(is_retryable_status(http::StatusCode::INTERNAL_SERVER_ERROR));
    }

    // Covers three Retry-After cases: a seconds value, an absent header, and a
    // past RFC2822 date (negative delay, so None).
    #[test]
    fn test_parse_retry_after_header() {
        let mut headers = reqwest::header::HeaderMap::new();
        headers.insert(reqwest::header::RETRY_AFTER, "120".parse().unwrap());

        let duration = parse_retry_after_header(&headers).unwrap();
        assert_eq!(duration, time::Duration::from_secs(120));

        headers.remove(reqwest::header::RETRY_AFTER);

        let duration = parse_retry_after_header(&headers);
        assert_eq!(duration, None);

        headers.insert(
            reqwest::header::RETRY_AFTER,
            "Wed, 21 Oct 2015 07:28:00 GMT".parse().unwrap(),
        );

        let duration = parse_retry_after_header(&headers);
        assert_eq!(duration, None);
    }

    // Integration test: requires the docker-compose test PostgreSQL on port 15432.
    #[tokio::test]
    async fn test_wait_for_job() {
        let worker_id = worker_id();
        let queue_name = "test_wait_for_job".to_string();
        let table_name = "job_queue".to_string();
        let db_url = "postgres://posthog:posthog@localhost:15432/test_database".to_string();
        let queue = PgQueue::new(&queue_name, &table_name, &db_url, RetryPolicy::default())
            .await
            .expect("failed to connect to PG");

        let webhook_job = WebhookJobParameters {
            body: "a webhook job body. much wow.".to_owned(),
            headers: collections::HashMap::new(),
            method: HttpMethod::POST,
            url: "localhost".to_owned(),
        };
        // enqueue takes ownership of the job enqueued to avoid bugs that can cause duplicate jobs.
        // Normally, a separate application would be enqueueing jobs for us to consume, so no ownership
        // conflicts would arise. However, in this test we need to do the enqueueing ourselves.
        // So, we clone the job to keep it around and assert the values returned by wait_for_job.
        enqueue_job(&queue, 1, webhook_job.clone())
            .await
            .expect("failed to enqueue job");
        let consumer = WebhookConsumer::new(
            &worker_id,
            &queue,
            time::Duration::from_millis(100),
            time::Duration::from_millis(5000),
        );
        let consumed_job = consumer
            .wait_for_job()
            .await
            .expect("failed to wait and read job");

        assert_eq!(consumed_job.job.attempt, 1);
        assert!(consumed_job.job.attempted_by.contains(&worker_id));
        assert_eq!(consumed_job.job.attempted_by.len(), 1);
        assert_eq!(consumed_job.job.max_attempts, 1);
        assert_eq!(*consumed_job.job.parameters.as_ref(), webhook_job);
        assert_eq!(consumed_job.job.status, JobStatus::Running);
        assert_eq!(consumed_job.job.target, webhook_job.url);

        consumed_job
            .complete()
            .await
            .expect("job not successfully completed");
    }

    // Integration test: requires the docker-compose echo server on port 18081.
    #[tokio::test]
    async fn test_send_webhook() {
        let method = HttpMethod::POST;
        let url = "http://localhost:18081/echo";
        let headers = collections::HashMap::new();
        let body = "a very relevant request body";
        let response = send_webhook(
            &method,
            url,
            &headers,
            body.to_owned(),
            time::Duration::from_millis(5000),
        )
        .await
        .expect("send_webhook failed");

        assert_eq!(response.status(), StatusCode::OK);
        assert_eq!(
            response.text().await.expect("failed to read response body"),
            body.to_owned(),
        );
    }
}
pub mod config;
pub mod consumer;

use envconfig::Envconfig;

use hook_common::pgqueue::{PgQueue, RetryPolicy};
use hook_consumer::config::Config;
use hook_consumer::consumer::WebhookConsumer;

/// Entry point: read configuration from the environment, connect to the job
/// queue, and run the webhook consumer.
#[tokio::main]
async fn main() {
    // Configuration comes entirely from environment variables, with defaults.
    let config = Config::init_from_env().expect("Invalid configuration:");

    // Build the retry policy from the millisecond-based environment settings.
    let retry_policy = RetryPolicy::new(
        config.retry_policy.backoff_coefficient,
        config.retry_policy.initial_interval.0,
        Some(config.retry_policy.maximum_interval.0),
    );
    let queue = PgQueue::new(
        &config.queue_name,
        &config.table_name,
        &config.database_url,
        retry_policy,
    )
    .await
    .expect("failed to initialize queue");

    let consumer = WebhookConsumer::new(
        &config.consumer_name,
        &queue,
        config.poll_interval.0,
        config.request_timeout.0,
    );

    // NOTE(review): `run` only returns on error, and that error is silently
    // discarded here — consider logging or propagating it.
    let _ = consumer.run().await;
}