diff --git a/.gitignore b/.gitignore index ee653c4..b35df26 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ debug/ target/ -Cargo.lock \ No newline at end of file +Cargo.lock +example-govrl diff --git a/README.md b/README.md index f8cdbf8..aa35802 100644 --- a/README.md +++ b/README.md @@ -5,15 +5,58 @@ Experimental go bindings for [Vector Remap Language](https://vector.dev/docs/ref > Vector Remap Language (VRL) is an expression-oriented language designed for transforming observability data (logs and metrics) in a safe and performant manner. It features a simple syntax and a rich set of built-in functions tailored specifically to observability use cases. +## Versions +There are two major versions of this module and consumers must choose which is a +better fit for their use case. + +They aim to support as similar an interface as possible, with the key +distinction being how VRL programs are executed. + +- **V5** uses `cgo` to interface with a custom library built from VRL. This has + better performance with the main downside being that it relies on `cgo`, which + some applications may not care for. +- **V10** uses `wasm` to execute VRL. It performs worse, on the order of 2-3 times + slower, however VRL is quite efficient so this still offers relatively good + absolute performance. 
+ ## Usage +### Feature Support + +| | V5 | V10 | +|-------------------------- | ------------------- | --- | +| Compiling a VRL Program | ✅ | ✅ | +| Running a VRL Program | ✅ | ✅ | +| VRL Runtime "Basic"\* API | ✅ | ✅ | +| Environment Kinds | 'Byte' and 'Object' | ❌ | +| Secrets | ❌ | ❌ | +| Metadata | ❌ | ❌ | +| Timezones | ❌ | ❌ | +| Requires CGO | ✅ | ❌ | +| Full VRL stdlib support | ✅ | ❌\*\* | + + +\* "Basic" API currently means: +- `compile` +- `resolve` (run) the compiled program +- `clear` +- `is_empty` + +\*\* WASM supports almost all of VRL's stdlib functions; the unsupported ones can +be found [with this GH issues +search](https://github.com/vectordotdev/vector/issues?q=is%3Aopen+is%3Aissue+label%3A%22vrl%3A+playground%22+wasm+compatible) + ### Building and importing -Not quite ready yet. It's difficult to distribute a go module that depends on an external build system (However I am open to suggestions) +Not quite ready yet. It's difficult to distribute a go module that depends on an external build system; we have some ideas though. + +To use this repo as-is, it's required to manually compile the rust dependency. +For V5: `cd v5; cargo build --release; cd example/; go run .` +For V10: `cd v10; cargo build --target wasm32-wasi --release; cd example/; go run .` -To use this repo as-is. `./run.sh` to build and run `main.go` +### Examples -### Example +#### V5 ```go program, err := govrl.CompileWithExternal(`replace(., "go", "rust")`, govrl.GetExternalEnv(govrl.Bytes, govrl.Bytes)) @@ -35,21 +78,75 @@ $ go run . 
"hello rust" ``` -[see `./example/main.go` for more examples](./example/main.go) +[see `./v5/example/main.go` for more examples](./v5/example/main.go) -## What works +#### V10 + +```go +package main -- Compiling VRL programs (and handling errors) - - Supports bytes and object external environment kinds -- Initializing the VRL runtime including: - - `resolve` (run) the compled program - - `clear` - - `is_empty` +import ( + "context" + "fmt" + "log" + + govrl "github.com/gh123man/go-vrl/v10" +) + +func main() { + simpleDefault() +} + +func simpleDefault() { + ctx := context.Background() + wasmInterface := govrl.NewWasmInterface(ctx) + program, err := wasmInterface.Compile(` + . = parse_json!(string!(.)) + del(.foo) + + .timestamp = now() + + http_status_code = parse_int!(.http_status) + del(.http_status) + + if http_status_code >= 200 && http_status_code <= 299 { + .status = "success" + } else { + .status = "error" + } + . + `) + + if err != nil { + log.Panicln(err) + return + } + + runtime, err := wasmInterface.NewRuntime() + if err != nil { + log.Panicln(err) + } + + res, err := runtime.Resolve(program, `{ + "message": "Hello VRL", + "foo": "delete me", + "http_status": "200" + } + `) + if err != nil { + fmt.Println(err) + return + } + + fmt.Println(res) + runtime.Clear() +} +``` + +```bash +$ go run . +{ "message": "Hello VRL", "status": "success", "timestamp": t'2022-01-01T00:00:00Z' } +``` -## What doesn't work/missing bindings +[see `./v10/example/main.go` for more examples](./v10/example/main.go) -- secrets -- metadata -- timezone -- environment configuration (partially implemented) -- most input types (other than bytes and object) diff --git a/build.sh b/build.sh deleted file mode 100755 index 0df0928..0000000 --- a/build.sh +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/bash -cargo build --release -go build . 
\ No newline at end of file diff --git a/example/go.mod b/example/go.mod deleted file mode 100644 index dd12ea7..0000000 --- a/example/go.mod +++ /dev/null @@ -1,7 +0,0 @@ -module example-govrl - -go 1.18 - -replace github.com/gh123man/go-vrl => ../ - -require github.com/gh123man/go-vrl v0.0.0-00010101000000-000000000000 diff --git a/go.mod b/go.mod deleted file mode 100644 index 1364c9c..0000000 --- a/go.mod +++ /dev/null @@ -1,3 +0,0 @@ -module github.com/gh123man/go-vrl - -go 1.19 diff --git a/v10/Cargo.toml b/v10/Cargo.toml new file mode 100644 index 0000000..d97ea9d --- /dev/null +++ b/v10/Cargo.toml @@ -0,0 +1,157 @@ +[package] +name = "vrl-bridge" +version = "0.1.0" +edition = "2021" + +[profile.release] +debug=true + +[lib] +name = "vrl_bridge" +crate-type = ["rlib", "cdylib"] + +[dependencies] +libc = "0.2" +serde_json = "1.0.87" + +# TODO: pin to stable version +value = { git = "https://github.com/vectordotdev/vector", default-features = false } +vrl = { git = "https://github.com/vectordotdev/vector", default-features = false } +vrl-diagnostic = { git = "https://github.com/vectordotdev/vector", package = "vrl-diagnostic" } + +[dependencies.vrl-stdlib] +package = "vrl-stdlib" +git = "https://github.com/vectordotdev/vector" +default-features = false +# list of wasm-supported features taken from https://github.com/vectordotdev/vector/blob/master/lib/vrl/web-playground/Cargo.toml +features = [ + "append", + "array", + "assert", + "assert_eq", + "ceil", + "chunks", + "compact", + "contains", + "decode_base64", + "decode_percent", + "del", + "downcase", + "encode_base64", + "encode_json", + "encode_key_value", + "encode_logfmt", + "encode_percent", + "ends_with", + "exists", + "filter", + "find", + "flatten", + "float", + "floor", + "for_each", + "format_int", + "format_number", + "format_timestamp", + "get", + "get_env_var", + "includes", + "ip_aton", + "ip_cidr_contains", + "ip_ntoa", + "ip_ntop", + "ip_pton", + "ip_subnet", + "ip_to_ipv6", + "ipv6_to_ipv4", 
+ "is_array", + "is_boolean", + "is_empty", + "is_float", + "is_integer", + "is_ipv4", + "is_ipv6", + "is_json", + "is_null", + "is_nullish", + "is_object", + "is_regex", + "is_string", + "is_timestamp", + "join", + "keys", + "length", + "map_keys", + "map_values", + "match", + "match_any", + "match_array", + "match_datadog_query", + "md5", + "merge", + "mod", + "now", + "object", + "parse_apache_log", + "parse_aws_alb_log", + "parse_aws_cloudwatch_log_subscription_message", + "parse_aws_vpc_flow_log", + "parse_common_log", + "parse_csv", + "parse_duration", + "parse_glog", + "parse_int", + "parse_json", + "parse_key_value", + "parse_klog", + "parse_linux_authorization", + "parse_logfmt", + "parse_nginx_log", + "parse_query_string", + "parse_regex", + "parse_regex_all", + "parse_ruby_hash", + "parse_syslog", + "parse_timestamp", + "parse_tokens", + "parse_url", + "parse_user_agent", + "parse_xml", + "push", + "redact", + "remove", + "replace", + "round", + "set", + "sha1", + "sha2", + "sha3", + "slice", + "split", + "starts_with", + "string", + "strip_ansi_escape_codes", + "strip_whitespace", + "strlen", + "tally", + "tally_value", + "tag_types_externally", + "timestamp", + "to_bool", + "to_float", + "to_int", + "to_regex", + "to_string", + "to_syslog_facility", + "to_syslog_level", + "to_syslog_severity", + "to_timestamp", + "to_unix_timestamp", + "truncate", + "type_def", + "unique", + "unnest", + "upcase", + "values", +] + diff --git a/v10/example/go.mod b/v10/example/go.mod new file mode 100644 index 0000000..31d56a1 --- /dev/null +++ b/v10/example/go.mod @@ -0,0 +1,9 @@ +module example-govrl + +go 1.18 + +replace github.com/gh123man/go-vrl/v10 => ../ + +require github.com/gh123man/go-vrl/v10 v10.0.0 + +require github.com/tetratelabs/wazero v1.0.0-pre.4 // indirect diff --git a/v10/example/go.sum b/v10/example/go.sum new file mode 100644 index 0000000..c4478a8 --- /dev/null +++ b/v10/example/go.sum @@ -0,0 +1,2 @@ +github.com/tetratelabs/wazero v1.0.0-pre.4 
h1:RBJQT5OzmORkSp6MmZDWoFEr0zXjk4pmvMKAdeUnsaI=
+github.com/tetratelabs/wazero v1.0.0-pre.4/go.mod h1:u8wrFmpdrykiFK0DFPiFm5a4+0RzsdmXYVtijBKqUVo=
diff --git a/v10/example/main.go b/v10/example/main.go
new file mode 100644
index 0000000..1640ab9
--- /dev/null
+++ b/v10/example/main.go
@@ -0,0 +1,64 @@
+package main
+
+import (
+	"context"
+	"fmt"
+	"log"
+
+	govrl "github.com/gh123man/go-vrl/v10"
+)
+
+func main() {
+	simpleDefault()
+}
+
+// simpleDefault compiles a small VRL program, runs it against a JSON event
+// and prints the resolved value.
+func simpleDefault() {
+	ctx := context.Background()
+	wasmInterface := govrl.NewWasmInterface(ctx)
+	// Close the underlying wazero runtime on the way out, including on panic.
+	defer wasmInterface.Close()
+
+	program, err := wasmInterface.Compile(`
+	. = parse_json!(string!(.))
+	del(.foo)
+
+	.timestamp = now()
+
+	http_status_code = parse_int!(.http_status)
+	del(.http_status)
+
+	if http_status_code >= 200 && http_status_code <= 299 {
+		.status = "success"
+	} else {
+		.status = "error"
+	}
+	.
+	`)
+
+	if err != nil {
+		log.Panicln(err)
+	}
+
+	runtime, err := wasmInterface.NewRuntime()
+	if err != nil {
+		log.Panicln(err)
+	}
+
+	res, err := runtime.Resolve(program, `{
+		"message": "Hello VRL",
+		"foo": "delete me",
+		"http_status": "200"
+	}
+	`)
+	if err != nil {
+		fmt.Println(err)
+		return
+	}
+
+	fmt.Println(res)
+	if err := runtime.Clear(); err != nil {
+		log.Panicln(err)
+	}
+}
diff --git a/v10/go.mod b/v10/go.mod
new file mode 100644
index 0000000..6ee24da
--- /dev/null
+++ b/v10/go.mod
@@ -0,0 +1,5 @@
+module github.com/gh123man/go-vrl/v10
+
+go 1.18
+
+require github.com/tetratelabs/wazero v1.0.0-pre.4
diff --git a/v10/src/lib.rs b/v10/src/lib.rs
new file mode 100644
index 0000000..ee93f2d
--- /dev/null
+++ b/v10/src/lib.rs
@@ -0,0 +1,143 @@
+extern crate alloc;
+use std::collections::BTreeMap;
+use value::{Secrets, Value};
+use vrl::diagnostic::Formatter;
+use vrl::Program;
+use vrl::TimeZone;
+use vrl::{state, Runtime, TargetValueRef};
+use vrl_diagnostic::DiagnosticList;
+
+use alloc::vec::Vec;
+use std::mem::MaybeUninit;
+use std::slice;
+
+// Compiler
+
+#[cfg_attr(all(target_arch = "wasm32"), export_name = "compile_vrl")]
+#[no_mangle]
+/// compile_vrl takes in a string representing a VRL program
+/// Returns a 0 (null) if there was an error, otherwise
+/// a pointer to the compiled program in linear memory
+pub extern "C" fn compile_vrl(ptr: u32, len: u32) -> *mut Program {
+    let program_string = unsafe { ptr_to_string(ptr, len) };
+
+    match vrl::compile(&program_string, &vrl_stdlib::all()) {
+        Ok(res) => Box::into_raw(Box::new(res.program)),
+        Err(err) => {
+            // TODO return the formatted diagnostics to the host instead of
+            // only printing them. A panic here would trap the module and
+            // make the documented null return unreachable.
+            let f = Formatter::new(&program_string, err);
+            eprintln!("{}", f.to_string());
+            std::ptr::null_mut()
+        }
+    }
+}
+
+/// Copies `len` bytes at linear-memory offset `ptr` into an owned `String`.
+///
+/// The bytes are written by the host, which does not guarantee valid UTF-8,
+/// so invalid sequences are replaced with U+FFFD instead of being trusted.
+unsafe fn ptr_to_string(ptr: u32, len: u32) -> String {
+    let bytes = slice::from_raw_parts(ptr as *const u8, len as usize);
+    String::from_utf8_lossy(bytes).into_owned()
+}
+
+// Runtime
+
+/// Creates a VRL runtime with default state and leaks it to the caller as a
+/// pointer in linear memory.
+#[cfg_attr(all(target_arch = "wasm32"), export_name = "new_runtime")]
+#[no_mangle]
+pub extern "C" fn new_runtime() -> *mut Runtime {
+    Box::into_raw(Box::new(Runtime::new(state::Runtime::default())))
+}
+
+/// Runs `program` on `runtime` with the input string as the event value.
+///
+/// Returns 0 on error. Otherwise the result's string form is leaked to the
+/// caller and returned packed as `(ptr << 32) | len`; the caller must hand
+/// it back to [`deallocate`] once it has copied the bytes out.
+#[cfg_attr(all(target_arch = "wasm32"), export_name = "runtime_resolve")]
+#[no_mangle]
+pub extern "C" fn runtime_resolve(
+    runtime: u32,
+    program: u32,
+    input_ptr: u32,
+    input_len: u32,
+) -> u64 {
+    let rt = unsafe { (runtime as *mut Runtime).as_mut().unwrap() };
+    let prog = unsafe { (program as *const Program).as_ref().unwrap() };
+    let inpt = unsafe { ptr_to_string(input_ptr, input_len) };
+
+    let mut value: Value = Value::from(inpt.as_str());
+    let mut metadata = Value::Object(BTreeMap::new());
+    let mut secrets = Secrets::new();
+    let mut target = TargetValueRef {
+        value: &mut value,
+        metadata: &mut metadata,
+        secrets: &mut secrets,
+    };
+
+    match rt.resolve(&mut target, &prog, &TimeZone::Local) {
+        Ok(res) => {
+            // Leak an exactly-sized buffer: a local String would be freed on
+            // return, leaving the host with a dangling pointer.
+            let s = Box::leak(res.to_string().into_boxed_str());
+            ((s.as_ptr() as u64) << 32) | s.len() as u64
+        }
+        Err(_err) => 0,
+    }
+}
+
+/// Calls `Runtime::clear` on the runtime behind `runtime`.
+#[cfg_attr(all(target_arch = "wasm32"), export_name = "runtime_clear")]
+#[no_mangle]
+pub extern "C" fn runtime_clear(runtime: u32) {
+    let rt = unsafe { (runtime as *mut Runtime).as_mut().unwrap() };
+    rt.clear()
+}
+
+/// Returns `Runtime::is_empty` for the runtime behind `runtime`.
+#[cfg_attr(all(target_arch = "wasm32"), export_name = "runtime_is_empty")]
+#[no_mangle]
+pub extern "C" fn runtime_is_empty(runtime: u32) -> bool {
+    let rt = unsafe { (runtime as *mut Runtime).as_mut().unwrap() };
+    return rt.is_empty();
+}
+
+// WASM Memory-related helper functions
+
+/// WebAssembly export that allocates a pointer (linear memory offset) that can
+/// be used for a string.
+///
+/// This is an ownership transfer, which means the caller must call
+/// [`deallocate`] when finished.
+#[cfg_attr(all(target_arch = "wasm32"), export_name = "allocate")]
+#[no_mangle]
+pub extern "C" fn allocate(size: u32) -> *mut u8 {
+    // Allocate the amount of bytes needed.
+    let len: usize = size.try_into().unwrap();
+    let mut vec: Vec<MaybeUninit<u8>> = Vec::with_capacity(len);
+    // `into_boxed_slice` keeps only `len()` elements, so the length has to
+    // cover the requested size or the buffer is released right here.
+    // SAFETY: `MaybeUninit<u8>` needs no initialization; `len` <= capacity.
+    unsafe { vec.set_len(len) };
+
+    // into_raw leaks the memory to the caller.
+    Box::into_raw(vec.into_boxed_slice()) as *mut u8
+}
+
+/// WebAssembly export that deallocates a pointer of the given size (linear
+/// memory offset, byteCount) allocated by [`allocate`].
+#[cfg_attr(all(target_arch = "wasm32"), export_name = "deallocate")]
+#[no_mangle]
+pub unsafe extern "C" fn _deallocate(ptr: u32, size: u32) {
+    deallocate(ptr as *mut u8, size as usize);
+}
+
+/// Retakes the pointer which allows its memory to be freed.
+unsafe fn deallocate(ptr: *mut u8, size: usize) {
+    // TODO - should this be Box::from_raw? (see Box::into_raw docs)
+    let _ = Vec::from_raw_parts(ptr, 0, size);
+}
diff --git a/v10/vrl.go b/v10/vrl.go
new file mode 100644
index 0000000..bad5c2d
--- /dev/null
+++ b/v10/vrl.go
@@ -0,0 +1,67 @@
+package govrl
+
+import (
+	"fmt"
+	"log"
+)
+
+// Program is a handle to a VRL program compiled inside the WASM module.
+type Program struct {
+	ptr  uint32
+	wasm *WasmInterface
+}
+
+// Runtime is a handle to a VRL runtime created inside the WASM module.
+type Runtime struct {
+	ptr  uint32
+	wasm *WasmInterface
+}
+
+// Resolve runs program with input as the event value and returns the string
+// form of the resolved value.
+func (r *Runtime) Resolve(program *Program, input string) (string, error) {
+	runtimeResolveFunc := r.wasm.mod.ExportedFunction("runtime_resolve")
+
+	inputWasmString := r.wasm.newWasmString(input)
+	defer r.wasm.deallocate(inputWasmString.ptr, inputWasmString.length)
+
+	results, err := runtimeResolveFunc.Call(r.wasm.ctx, uint64(r.ptr), uint64(program.ptr), inputWasmString.ptr, inputWasmString.length)
+	if err != nil {
+		return "", err
+	}
+
+	// runtime_resolve reports failure as a packed value of 0.
+	if results[0] == 0 {
+		return "", fmt.Errorf("runtime_resolve failed on the rust side")
+	}
+
+	resultPtr, resultLength := unpackUInt64(results[0])
+	resultStringBytes, ok := r.wasm.mod.Memory().Read(r.wasm.ctx, resultPtr, resultLength)
+	if !ok {
+		log.Panicf("Memory.Read(%d, %d) out of range of memory size %d",
+			resultPtr, resultLength, r.wasm.mod.Memory().Size(r.wasm.ctx))
+	}
+	// Copy out of linear memory first; the result buffer was leaked to us, so free it.
+	res := string(resultStringBytes)
+	r.wasm.deallocate(uint64(resultPtr), uint64(resultLength))
+	return res, nil
+}
+
+// Clear calls the module's runtime_clear export for this runtime.
+func (r *Runtime) Clear() error {
+	runtimeClearFunc := r.wasm.mod.ExportedFunction("runtime_clear")
+
+	_, err := runtimeClearFunc.Call(r.wasm.ctx, uint64(r.ptr))
+	return err
+}
+
+// IsEmpty returns the result of the module's runtime_is_empty export.
+func (r *Runtime) IsEmpty() (bool, error) {
+	runtimeIsEmptyFunc := r.wasm.mod.ExportedFunction("runtime_is_empty")
+
+	results, err := runtimeIsEmptyFunc.Call(r.wasm.ctx, uint64(r.ptr))
+	if err != nil {
+		return false, err
+	}
+	return results[0] > 0, nil
+}
diff --git a/v10/wasminterface.go b/v10/wasminterface.go
new file mode 100644
index 0000000..c17cdd8
--- /dev/null
+++ b/v10/wasminterface.go
@@ -0,0 +1,134 @@
+package govrl
+
+import (
+	"context"
+	_ "embed"
+	"fmt"
+	"log"
+
+	"github.com/tetratelabs/wazero"
+	"github.com/tetratelabs/wazero/api"
+	"github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1"
+)
+
+//go:embed target/wasm32-wasi/release/vrl_bridge.wasm
+var wasmBytes []byte
+
+// WasmInterface holds the wazero runtime and the instantiated VRL bridge module.
+type WasmInterface struct {
+	ctx     context.Context
+	mod     api.Module
+	runtime wazero.Runtime
+}
+
+// WasmString is a string that has been copied into the module's linear memory.
+type WasmString struct {
+	ptr    uint64
+	length uint64
+}
+
+// NewWasmInterface instantiates WASI and the embedded VRL bridge module.
+// It panics if the module cannot be instantiated.
+func NewWasmInterface(ctx context.Context) *WasmInterface {
+	// Create a new WebAssembly Runtime.
+	r := wazero.NewRuntime(ctx)
+
+	wasi_snapshot_preview1.MustInstantiate(ctx, r)
+	mod, err := r.InstantiateModuleFromBinary(ctx, wasmBytes)
+	if err != nil {
+		log.Panicln(err)
+	}
+
+	// TODO check for expected exports, return error if they're not present
+
+	return &WasmInterface{
+		ctx: ctx, mod: mod, runtime: r,
+	}
+}
+
+// Compile compiles a VRL program inside the module and returns a handle to it.
+func (wr *WasmInterface) Compile(program string) (*Program, error) {
+	compileVrlFunc := wr.mod.ExportedFunction("compile_vrl")
+
+	ws := wr.newWasmString(program)
+	defer wr.deallocate(ws.ptr, ws.length)
+
+	results, err := compileVrlFunc.Call(wr.ctx, ws.ptr, ws.length)
+	if err != nil {
+		return nil, err
+	}
+
+	programPtr := uint32(results[0])
+	if programPtr == 0 {
+		return nil, fmt.Errorf("Unknown error from compile_vrl rust-side")
+	}
+
+	return &Program{programPtr, wr}, nil
+}
+
+// NewRuntime creates a VRL runtime inside the module and returns a handle to it.
+func (wr *WasmInterface) NewRuntime() (*Runtime, error) {
+	newRuntimeFunc := wr.mod.ExportedFunction("new_runtime")
+
+	results, err := newRuntimeFunc.Call(wr.ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	runtimePtr := uint32(results[0])
+	if runtimePtr == 0 {
+		return nil, fmt.Errorf("new_runtime returned a null pointer")
+	}
+
+	return &Runtime{runtimePtr, wr}, nil
+}
+
+// Helpers
+
+// newWasmString copies input into linear memory. The caller owns the returned
+// buffer and must release it with deallocate.
+func (wr *WasmInterface) newWasmString(input string) *WasmString {
+	inputSize := uint64(len(input))
+	ptr := wr.allocate(inputSize)
+
+	// The pointer is a linear memory offset, which is where we write the input string.
+	if !wr.mod.Memory().Write(wr.ctx, uint32(ptr), []byte(input)) {
+		log.Panicf("Memory.Write(%d, %d) out of range of memory size %d",
+			ptr, len(input), wr.mod.Memory().Size(wr.ctx))
+	}
+
+	// No finalizer here: it would call into the module from the GC's finalizer
+	// goroutine, possibly while another call is still reading this buffer.
+	return &WasmString{ptr: ptr, length: inputSize}
+}
+
+// allocate calls the module's allocate export and returns the buffer's offset.
+func (wr *WasmInterface) allocate(numBytes uint64) uint64 {
+	allocate := wr.mod.ExportedFunction("allocate")
+
+	results, err := allocate.Call(wr.ctx, numBytes)
+	if err != nil {
+		log.Panicln(err)
+	}
+
+	return results[0]
+}
+
+// deallocate calls the module's deallocate export; a failure is logged.
+func (wr *WasmInterface) deallocate(ptr uint64, size uint64) {
+	deallocate := wr.mod.ExportedFunction("deallocate")
+
+	if _, err := deallocate.Call(wr.ctx, ptr, size); err != nil {
+		log.Printf("deallocate(%d, %d): %v", ptr, size, err)
+	}
+}
+
+// Close closes the underlying wazero runtime.
+func (wr *WasmInterface) Close() {
+	wr.runtime.Close(wr.ctx)
+}
+
+// unpackUInt64 splits val into its high and low 32-bit halves.
+func unpackUInt64(val uint64) (uint32, uint32) {
+	return uint32(val >> 32), uint32(val)
+}
diff --git a/Cargo.toml b/v5/Cargo.toml
similarity index 100%
rename from Cargo.toml
rename to v5/Cargo.toml
diff --git a/v5/example/go.mod b/v5/example/go.mod
new file mode 100644
index 0000000..df81c01
--- /dev/null
+++ b/v5/example/go.mod
@@ -0,0 +1,7 @@
+module example-govrl
+
+go 1.18
+
+replace github.com/gh123man/go-vrl/v5 => ../
+
+require github.com/gh123man/go-vrl/v5 v5.0.0
diff --git a/example/main.go b/v5/example/main.go
similarity index 96%
rename from example/main.go
rename to v5/example/main.go
index c19c0b0..7b4c1de 100644
--- a/example/main.go
+++ b/v5/example/main.go
@@ -4,7 +4,7 @@ import (
 	"fmt"
 	"log"
 
-	govrl "github.com/gh123man/go-vrl"
+	govrl "github.com/gh123man/go-vrl/v5"
 )
 
 func main() {
diff --git a/v5/go.mod b/v5/go.mod
new file mode 100644
index 0000000..75d12df
--- /dev/null
+++ b/v5/go.mod
@@ -0,0 +1,3 @@
+module github.com/gh123man/go-vrl/v5
+
+go 1.19
diff --git a/program.go b/v5/program.go
similarity index 100%
rename from program.go
rename to v5/program.go
diff --git a/run.sh b/v5/run.sh
similarity index 100%
rename from run.sh
rename to v5/run.sh diff --git a/runtime.go b/v5/runtime.go similarity index 100% rename from runtime.go rename to v5/runtime.go diff --git a/src/lib.rs b/v5/src/lib.rs similarity index 100% rename from src/lib.rs rename to v5/src/lib.rs diff --git a/types.go b/v5/types.go similarity index 100% rename from types.go rename to v5/types.go diff --git a/vrl.h b/v5/vrl.h similarity index 100% rename from vrl.h rename to v5/vrl.h