From 416b137f5399a53564b1b383743af0549fcf477a Mon Sep 17 00:00:00 2001 From: Steven van Rossum Date: Sun, 8 Jan 2023 17:39:34 +0100 Subject: [PATCH] Add BeamFnControlClient.control loop scaffolding and dispatch on InstructionRequest type --- sdks/rust/Cargo.lock | 502 +++++++++++++++++++++++++++++ sdks/rust/worker/Cargo.toml | 1 + sdks/rust/worker/src/sdk_worker.rs | 153 ++++++++- 3 files changed, 646 insertions(+), 10 deletions(-) diff --git a/sdks/rust/Cargo.lock b/sdks/rust/Cargo.lock index 9a32d62ba24a..a255adc10afa 100644 --- a/sdks/rust/Cargo.lock +++ b/sdks/rust/Cargo.lock @@ -8,6 +8,36 @@ version = "1.0.66" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "216261ddc8289130e551ddcd5ce8a064710c0d064a4d2895c67151c92b5443f6" +[[package]] +name = "async-io" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c374dda1ed3e7d8f0d9ba58715f924862c63eae6849c92d3a18e7fbde9e2794" +dependencies = [ + "async-lock", + "autocfg", + "concurrent-queue", + "futures-lite", + "libc", + "log", + "parking", + "polling", + "slab", + "socket2", + "waker-fn", + "windows-sys", +] + +[[package]] +name = "async-lock" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8101efe8695a6c17e02911402145357e718ac92d3ff88ae8419e84b1707b685" +dependencies = [ + "event-listener", + "futures-lite", +] + [[package]] name = "async-stream" version = "0.2.1" @@ -124,12 +154,55 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +[[package]] +name = "bumpalo" +version = "3.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "572f695136211188308f16ad2ca5c851a712c464060ae6974944458eb83880ba" + +[[package]] +name = "bytecount" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c676a478f63e9fa2dd5368a42f28bba0d6c560b775f38583c8bbaa7fcd67c9c" + [[package]] name = "bytes" version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dfb24e866b15a1af2a1b663f10c6b6b8f397a84aadb828f12e5b289ec23a3a3c" +[[package]] +name = "camino" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c77df041dc383319cc661b428b6961a005db4d6808d5e12536931b1ca9556055" +dependencies = [ + "serde", +] + +[[package]] +name = "cargo-platform" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cbdb825da8a5df079a43676dbe042702f1707b1109f713a01420fbb4cc71fa27" +dependencies = [ + "serde", +] + +[[package]] +name = "cargo_metadata" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4acbb09d9ee8e23699b9634375c72795d095bf268439da88562cf9b501f181fa" +dependencies = [ + "camino", + "cargo-platform", + "semver", + "serde", + "serde_json", +] + [[package]] name = "cc" version = "1.0.78" @@ -191,6 +264,47 @@ dependencies = [ "strum_macros", ] +[[package]] +name = "concurrent-queue" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd7bef69dc86e3c610e4e7aed41035e2a7ed12e72dd7530f61327a6579a4390b" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-channel" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2dd04ddaf88237dc3b8d8f9a3c1004b506b54b3313403944054d23c0870c521" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01a9af1f4c2ef74bb8aa1f7e19706bc72d03598c8a570bb5de72243c7a9d9d5a" +dependencies = [ + "autocfg", + "cfg-if", + "crossbeam-utils", + "memoffset", + "scopeguard", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fb766fa798726286dbbb842f174001dab8abc7b627a1dd86e0b7222a95d929f" +dependencies = [ + "cfg-if", +] + [[package]] name = "either" version = "1.8.0" @@ -218,6 +332,21 @@ dependencies = [ "libc", ] +[[package]] +name = "error-chain" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc" +dependencies = [ + "version_check", +] + +[[package]] +name = "event-listener" +version = "2.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" + [[package]] name = "fastrand" version = "1.8.0" @@ -287,6 +416,21 @@ version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "00f5fb52a06bdcadeb54e8d3671f8888a39697dcb0b81b23b55174030427f4eb" +[[package]] +name = "futures-lite" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7694489acd39452c77daa48516b894c153f192c3578d5a839b62c58099fcbf48" +dependencies = [ + "fastrand", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + [[package]] name = "futures-macro" version = "0.3.25" @@ -350,6 +494,12 @@ dependencies = [ "wasi 0.11.0+wasi-snapshot-preview1", ] +[[package]] +name = "glob" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" + [[package]] name = "h2" version = "0.3.15" @@ -548,6 +698,15 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4217ad341ebadf8d8e724e264f13e593e0648f5b3e94b3896a5df283be015ecc" +[[package]] +name = "js-sys" +version = "0.3.60" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49409df3e3bf0856b916e2ceaca09ee28e6871cf7d9ce97a692cacfdb2a25a47" +dependencies = [ + "wasm-bindgen", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -566,6 +725,16 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f051f77a7c8e6957c0696eac88f26b0117e54f52d3fc682ab19397a8812846a4" +[[package]] +name = "lock_api" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "435011366fe56583b16cf956f9df0095b405b82d76425bc8981c0e22e60ec4df" +dependencies = [ + "autocfg", + "scopeguard", +] + [[package]] name = "log" version = "0.4.17" @@ -575,6 +744,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "mach" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b823e83b2affd8f40a9ee8c29dbc56404c1e34cd2710921f2801e2cf29527afa" +dependencies = [ + "libc", +] + [[package]] name = "matchit" version = "0.5.0" @@ -587,6 +765,15 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" +[[package]] +name = "memoffset" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4" +dependencies = [ + "autocfg", +] + [[package]] name = "mime" version = "0.3.16" @@ -605,6 +792,32 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "moka" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b49a05f67020456541f4f29cbaa812016a266a86ec76f96d3873d459c68fe5e" +dependencies = [ + "async-io", + "async-lock", + "crossbeam-channel", + "crossbeam-epoch", + "crossbeam-utils", + "futures-util", + "num_cpus", + "once_cell", + "parking_lot", + "quanta", + "rustc_version", + "scheduled-thread-pool", + "skeptic", + "smallvec", + "tagptr", + "thiserror", + "triomphe", + "uuid", +] + [[package]] name = "multimap" version = "0.8.3" @@ -633,6 +846,35 @@ version = "6.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b7820b9daea5457c9f21c69448905d723fbd21136ccf521748f23fd49e723ee" +[[package]] +name = "parking" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72" + +[[package]] +name = "parking_lot" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ff9f3fef3968a3ec5945535ed654cb38ff72d7495a25619e2247fb15a2ed9ba" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-sys", +] + [[package]] name = "percent-encoding" version = "2.2.0" @@ -681,6 +923,20 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "polling" +version = "2.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22122d5ec4f9fe1b3916419b76be1e80bcb93f618d071d2edf841b137b2a2bd6" +dependencies = [ + "autocfg", + "cfg-if", + "libc", + "log", + "wepoll-ffi", + "windows-sys", +] + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -795,6 +1051,33 @@ dependencies = [ "tonic-build", ] +[[package]] +name = "pulldown-cmark" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d9cc634bc78768157b5cbfe988ffcd1dcba95cd2b2f03a88316c08c6d00ed63" +dependencies = [ + "bitflags", + "memchr", + "unicase", +] + +[[package]] +name = "quanta" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7e31331286705f455e56cca62e0e717158474ff02b7936c1fa596d983f4ae27" +dependencies = [ + "crossbeam-utils", + "libc", + "mach", + "once_cell", + "raw-cpuid", + "wasi 0.10.2+wasi-snapshot-preview1", + "web-sys", + "winapi", +] + [[package]] name = "quote" version = "1.0.21" @@ -875,6 +1158,15 @@ dependencies = [ "rand_core 0.5.1", ] +[[package]] +name = "raw-cpuid" +version = "10.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6823ea29436221176fe662da99998ad3b4db2c7f31e7b6f5fe43adccd6320bb" +dependencies = [ + "bitflags", +] + [[package]] name = "redox_syscall" version = "0.2.16" @@ -934,6 +1226,15 @@ dependencies = [ "worker", ] +[[package]] +name = "rustc_version" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" +dependencies = [ + "semver", +] + [[package]] name = "rustix" version = "0.36.6" @@ -960,6 +1261,39 @@ version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4501abdff3ae82a1c1b477a17252eb69cee9e66eb915c1abaa4f44d873df9f09" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + +[[package]] +name = "scheduled-thread-pool" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "977a7519bff143a44f842fd07e80ad1329295bd71686457f18e496736f4bf9bf" +dependencies = [ + "parking_lot", +] + +[[package]] +name = "scopeguard" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" + +[[package]] +name = "semver" +version = "1.0.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "58bc9567378fc7690d6b2addae4e60ac2eeea07becb2c64b9f218b53865cba2a" +dependencies = [ + "serde", +] + [[package]] name = "serde" version = "1.0.147" @@ -1004,6 +1338,21 @@ dependencies = [ "unsafe-libyaml", ] +[[package]] +name = "skeptic" +version = "0.13.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16d23b015676c90a0f01c197bfdc786c20342c73a0afdda9025adb0bc42940a8" +dependencies = [ + "bytecount", + "cargo_metadata", + "error-chain", + "glob", + "pulldown-cmark", + "tempfile", + "walkdir", +] + [[package]] name = "slab" version = "0.4.7" @@ -1013,6 +1362,12 @@ dependencies = [ "autocfg", ] +[[package]] +name = "smallvec" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0" + [[package]] name = "socket2" version = "0.4.7" @@ -1059,6 +1414,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "20518fe4a4c9acf048008599e464deb21beeae3d3578418951a189c235a7a9a8" +[[package]] +name = "tagptr" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" + [[package]] name = "tempfile" version = "3.3.0" @@ -1082,6 +1443,26 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "thiserror" +version = "1.0.38" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a9cd18aa97d5c45c6603caea1da6628790b37f7a34b6ca89522331c5180fed0" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.38" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fb327af4685e4d03fa8cbcf1716380da910eeb2bb8be417e7f9fd3fb164f36f" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tokio" version = "1.21.2" @@ -1297,12 +1678,27 @@ dependencies = [ "tokio", ] +[[package]] +name = "triomphe" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1ee9bd9239c339d714d657fac840c6d2a4f9c45f4f9ec7b0975113458be78db" + [[package]] name = "try-lock" version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" +[[package]] +name = "unicase" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6" +dependencies = [ + "version_check", +] + [[package]] name = "unicode-ident" version = "1.0.5" @@ -1315,12 +1711,38 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c1e5fa573d8ac5f1a856f8d7be41d390ee973daf97c806b2c1a465e4e1406e68" +[[package]] +name = "uuid" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "422ee0de9031b5b948b97a8fc04e3aa35230001a722ddd27943e0be31564ce4c" +dependencies = [ + "getrandom 0.2.8", +] + [[package]] name = "version_check" version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "waker-fn" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" + +[[package]] +name = "walkdir" +version = "2.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56" +dependencies = [ + "same-file", + "winapi", + "winapi-util", +] + [[package]] name = "want" version = "0.3.0" @@ -1337,12 +1759,91 @@ version = "0.9.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" +[[package]] +name = "wasi" +version = "0.10.2+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "wasm-bindgen" +version = "0.2.83" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eaf9f5aceeec8be17c128b2e93e031fb8a4d469bb9c4ae2d7dc1888b26887268" +dependencies = [ + "cfg-if", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.83" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c8ffb332579b0557b52d268b91feab8df3615f265d5270fec2a8c95b17c1142" +dependencies = [ + "bumpalo", + "log", + "once_cell", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.83" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "052be0f94026e6cbc75cdefc9bae13fd6052cdcaf532fa6c45e7ae33a1e6c810" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.83" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07bc0c051dc5f23e307b13285f9d75df86bfdf816c5721e573dec1f9b8aa193c" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.83" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c38c045535d93ec4f0b4defec448e4291638ee608530863b1e2ba115d4fff7f" + +[[package]] +name = "web-sys" +version = "0.3.60" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bcda906d8be16e728fd5adc5b729afad4e444e106ab28cd1c7256e54fa61510f" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "wepoll-ffi" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d743fdedc5c64377b5fc2bc036b01c7fd642205a0d96356034ae3404d49eb7fb" +dependencies = [ + "cc", +] + [[package]] name = "which" version = "4.3.0" @@ -1452,6 +1953,7 @@ dependencies = [ "futures-util", "http", "internals", + "moka", "once_cell", "proto", "rand 0.7.3", diff --git a/sdks/rust/worker/Cargo.toml b/sdks/rust/worker/Cargo.toml index 5c72e90d55a5..35c8df6edb36 100644 --- a/sdks/rust/worker/Cargo.toml +++ b/sdks/rust/worker/Cargo.toml @@ -26,6 +26,7 @@ proto = { path = "../proto" } clap = { version = "4.0", features = ["derive"] } http = "0.2.8" +moka = { version = "0.9.6", features = ["future"] } once_cell = "1.16.0" strum_macros = "0.24" diff --git a/sdks/rust/worker/src/sdk_worker.rs b/sdks/rust/worker/src/sdk_worker.rs index 01ef40764c7f..0379e1633732 100644 --- a/sdks/rust/worker/src/sdk_worker.rs +++ b/sdks/rust/worker/src/sdk_worker.rs @@ -17,25 +17,61 @@ */ use std::collections::HashMap; +use std::error::Error; use std::sync::{Arc, Mutex, RwLock}; use http::Uri; use proto::beam_api::pipeline::PTransform; +use tokio::sync::mpsc; +use tonic::codegen::InterceptedService; +use tonic::metadata::{Ascii, MetadataValue}; +use tonic::service::Interceptor; use tonic::transport::Channel; +use tonic::Status; +use proto::beam_api::fn_execution::instruction_request; use proto::beam_api::fn_execution::{ - beam_fn_control_client::BeamFnControlClient, InstructionRequest, InstructionResponse, - ProcessBundleDescriptor, ProcessBundleResponse, ProcessBundleSplitRequest, - ProcessBundleSplitResponse, + beam_fn_control_client::BeamFnControlClient, FinalizeBundleRequest, + GetProcessBundleDescriptorRequest, HarnessMonitoringInfosRequest, InstructionRequest, + InstructionResponse, MonitoringInfosMetadataRequest, ProcessBundleDescriptor, + ProcessBundleProgressRequest, ProcessBundleRequest, ProcessBundleResponse, + ProcessBundleSplitRequest, ProcessBundleSplitResponse, RegisterRequest, }; use crate::operators::{create_operator, Operator, OperatorContext, OperatorI, Receiver}; +#[derive(Clone)] +struct WorkerIdInterceptor { + id: MetadataValue, +} + +impl WorkerIdInterceptor { + fn new(id: String) -> Self { + Self { + id: id.parse().unwrap(), + } + } +} + +impl Interceptor for WorkerIdInterceptor { + fn call(&mut self, mut request: tonic::Request<()>) -> Result, Status> { + request.metadata_mut().insert("worker_id", self.id.clone()); + Ok(request) + } +} + +type BundleDescriptorId = String; +type InstructionId = String; + +// TODO(sjvanrossum): Convert simple map caches to concurrent caches. +// Using concurrent caches removes the need to synchronize on the worker instance in every context. #[derive(Debug)] pub struct Worker { - control_client: Arc>, - - process_bundle_descriptors: HashMap, + // Cheap and safe to clone + control_client: BeamFnControlClient>, + // Cheap and safe to clone + process_bundle_descriptors: + moka::future::Cache>, bundle_processors: HashMap, active_bundle_processors: HashMap, id: String, @@ -44,17 +80,21 @@ pub struct Worker { } impl Worker { + // TODO(sjvanrossum): Remove Arc and Mutex once the worker's state uses + // concurrent data structures and/or finer grained locks. pub async fn new(id: String, endpoints: WorkerEndpoints) -> Arc> { // TODO: parse URIs in the endpoint struct let channel = Channel::builder(endpoints.get_endpoint().parse::().unwrap()) .connect() .await - .expect("Failed to connect to worker"); + .expect("Failed to connect to control service"); + let client = + BeamFnControlClient::with_interceptor(channel, WorkerIdInterceptor::new(id.clone())); Arc::new(Mutex::new(Self { - control_client: Arc::new(BeamFnControlClient::new(channel)), - - process_bundle_descriptors: HashMap::new(), + control_client: client, + // TODO(sjvanrossum): Maybe define the eviction policy + process_bundle_descriptors: moka::future::Cache::builder().build(), bundle_processors: HashMap::new(), active_bundle_processors: HashMap::new(), id, @@ -63,10 +103,103 @@ impl Worker { })) } + pub async fn start(&mut self) -> Result<(), Box> { + let (control_res_tx, mut control_res_rx) = mpsc::channel::(100); + + let outbound = async_stream::stream! { + while let Some(control_res) = control_res_rx.recv().await { + yield control_res + } + }; + let response = self.control_client.control(outbound).await?; + let mut inbound = response.into_inner(); + + while let Some(control_req) = inbound.message().await? { + match control_req.request { + Some(instruction_request::Request::ProcessBundle(instr_req)) => { + self.process_bundle(instr_req); + } + Some(instruction_request::Request::ProcessBundleProgress(instr_req)) => { + self.process_bundle_progress(instr_req); + } + Some(instruction_request::Request::ProcessBundleSplit(instr_req)) => { + self.process_bundle_split(instr_req); + } + Some(instruction_request::Request::FinalizeBundle(instr_req)) => { + self.finalize_bundle(instr_req); + } + Some(instruction_request::Request::MonitoringInfos(instr_req)) => { + self.monitoring_infos(instr_req); + } + Some(instruction_request::Request::HarnessMonitoringInfos(instr_req)) => { + self.harness_monitoring_infos(instr_req); + } + Some(instruction_request::Request::Register(instr_req)) => { + self.register(instr_req); + } + _ => { + control_res_tx + .send(InstructionResponse { + instruction_id: control_req.instruction_id.clone(), + error: format!("Unexpected request: {:?}", control_req), + response: None, + }) + .await?; + } + }; + } + Ok(()) + } + // TODO pub fn stop(&mut self) { unimplemented!() } + + fn process_bundle(&self, request: ProcessBundleRequest) -> () { + let mut client = self.control_client.clone(); + let descriptor_cache = self.process_bundle_descriptors.clone(); + tokio::spawn(async move { + let descriptor = descriptor_cache + .try_get_with::<_, Status>(request.process_bundle_descriptor_id.clone(), async { + let res = client + .get_process_bundle_descriptor(GetProcessBundleDescriptorRequest { + process_bundle_descriptor_id: request + .process_bundle_descriptor_id + .clone(), + }) + .await?; + Ok(Arc::new(res.into_inner())) + }) + .await + .unwrap(); + // TODO(sjvanrossum): Fetch bundle processor and process bundle + }); + } + + fn process_bundle_progress(&self, request: ProcessBundleProgressRequest) -> () { + // TODO(sjvanrossum): Flesh out after process_bundle is sufficiently implemented + } + + fn process_bundle_split(&self, request: ProcessBundleSplitRequest) -> () { + // TODO(sjvanrossum): Flesh out after process_bundle is sufficiently implemented + } + + fn finalize_bundle(&self, request: FinalizeBundleRequest) -> () { + // TODO(sjvanrossum): Flesh out after process_bundle is sufficiently implemented. + } + + fn monitoring_infos(&self, request: MonitoringInfosMetadataRequest) -> () { + // TODO: Implement + } + + fn harness_monitoring_infos(&self, request: HarnessMonitoringInfosRequest) -> () { + // TODO: Implement + } + + fn register(&self, request: RegisterRequest) -> () { + // TODO: Implement or maybe respond with a failure since this is deprecated + } } #[derive(Clone, Debug)]