diff --git a/pkg/workflows/wasm/host/errno.go b/pkg/workflows/wasm/host/errno.go
new file mode 100644
index 0000000000..2cc1ebb73a
--- /dev/null
+++ b/pkg/workflows/wasm/host/errno.go
@@ -0,0 +1,166 @@
+// NOTE: loosely based on: https://github.com/tetratelabs/wazero/blob/1353ca24fef0a57a3a342d75f20357a6e9d3be35/internal/wasip1/errno.go#L14
+package host
+
+// Errno is the error code type that host functions return to the guest.
+type Errno = int32
+
+// Note: Below prefers POSIX symbol names over WASI ones, even if the docs are from WASI.
+// See https://linux.die.net/man/3/errno
+// See https://github.com/WebAssembly/WASI/blob/snapshot-01/phases/snapshot/docs.md#variants-1
+const (
+	// ErrnoSuccess No error occurred. System call completed successfully.
+	ErrnoSuccess Errno = iota
+	// Errno2big Argument list too long.
+	Errno2big
+	// ErrnoAcces Permission denied.
+	ErrnoAcces
+	// ErrnoAddrinuse Address in use.
+	ErrnoAddrinuse
+	// ErrnoAddrnotavail Address not available.
+	ErrnoAddrnotavail
+	// ErrnoAfnosupport Address family not supported.
+	ErrnoAfnosupport
+	// ErrnoAgain Resource unavailable, or operation would block.
+	ErrnoAgain
+	// ErrnoAlready Connection already in progress.
+	ErrnoAlready
+	// ErrnoBadf Bad file descriptor.
+	ErrnoBadf
+	// ErrnoBadmsg Bad message.
+	ErrnoBadmsg
+	// ErrnoBusy Device or resource busy.
+	ErrnoBusy
+	// ErrnoCanceled Operation canceled.
+	ErrnoCanceled
+	// ErrnoChild No child processes.
+	ErrnoChild
+	// ErrnoConnaborted Connection aborted.
+	ErrnoConnaborted
+	// ErrnoConnrefused Connection refused.
+	ErrnoConnrefused
+	// ErrnoConnreset Connection reset.
+	ErrnoConnreset
+	// ErrnoDeadlk Resource deadlock would occur.
+	ErrnoDeadlk
+	// ErrnoDestaddrreq Destination address required.
+	ErrnoDestaddrreq
+	// ErrnoDom Mathematics argument out of domain of function.
+	ErrnoDom
+	// ErrnoDquot Reserved.
+	ErrnoDquot
+	// ErrnoExist File exists.
+	ErrnoExist
+	// ErrnoFault Bad address.
+	ErrnoFault
+	// ErrnoFbig File too large.
+	ErrnoFbig
+	// ErrnoHostunreach Host is unreachable.
+	ErrnoHostunreach
+	// ErrnoIdrm Identifier removed.
+	ErrnoIdrm
+	// ErrnoIlseq Illegal byte sequence.
+	ErrnoIlseq
+	// ErrnoInprogress Operation in progress.
+	ErrnoInprogress
+	// ErrnoIntr Interrupted function.
+	ErrnoIntr
+	// ErrnoInval Invalid argument.
+	ErrnoInval
+	// ErrnoIo I/O error.
+	ErrnoIo
+	// ErrnoIsconn Socket is connected.
+	ErrnoIsconn
+	// ErrnoIsdir Is a directory.
+	ErrnoIsdir
+	// ErrnoLoop Too many levels of symbolic links.
+	ErrnoLoop
+	// ErrnoMfile File descriptor value too large.
+	ErrnoMfile
+	// ErrnoMlink Too many links.
+	ErrnoMlink
+	// ErrnoMsgsize Message too large.
+	ErrnoMsgsize
+	// ErrnoMultihop Reserved.
+	ErrnoMultihop
+	// ErrnoNametoolong Filename too long.
+	ErrnoNametoolong
+	// ErrnoNetdown Network is down.
+	ErrnoNetdown
+	// ErrnoNetreset Connection aborted by network.
+	ErrnoNetreset
+	// ErrnoNetunreach Network unreachable.
+	ErrnoNetunreach
+	// ErrnoNfile Too many files open in system.
+	ErrnoNfile
+	// ErrnoNobufs No buffer space available.
+	ErrnoNobufs
+	// ErrnoNodev No such device.
+	ErrnoNodev
+	// ErrnoNoent No such file or directory.
+	ErrnoNoent
+	// ErrnoNoexec Executable file format error.
+	ErrnoNoexec
+	// ErrnoNolck No locks available.
+	ErrnoNolck
+	// ErrnoNolink Reserved.
+	ErrnoNolink
+	// ErrnoNomem Not enough space.
+	ErrnoNomem
+	// ErrnoNomsg No message of the desired type.
+	ErrnoNomsg
+	// ErrnoNoprotoopt Protocol not available.
+	ErrnoNoprotoopt
+	// ErrnoNospc No space left on device.
+	ErrnoNospc
+	// ErrnoNosys Function not supported.
+	ErrnoNosys
+	// ErrnoNotconn The socket is not connected.
+	ErrnoNotconn
+	// ErrnoNotdir Not a directory or a symbolic link to a directory.
+	ErrnoNotdir
+	// ErrnoNotempty Directory not empty.
+	ErrnoNotempty
+	// ErrnoNotrecoverable State not recoverable.
+	ErrnoNotrecoverable
+	// ErrnoNotsock Not a socket.
+	ErrnoNotsock
+	// ErrnoNotsup Not supported, or operation not supported on socket.
+	ErrnoNotsup
+	// ErrnoNotty Inappropriate I/O control operation.
+	ErrnoNotty
+	// ErrnoNxio No such device or address.
+	ErrnoNxio
+	// ErrnoOverflow Value too large to be stored in data type.
+	ErrnoOverflow
+	// ErrnoOwnerdead Previous owner died.
+	ErrnoOwnerdead
+	// ErrnoPerm Operation not permitted.
+	ErrnoPerm
+	// ErrnoPipe Broken pipe.
+	ErrnoPipe
+	// ErrnoProto Protocol error.
+	ErrnoProto
+	// ErrnoProtonosupport Protocol not supported.
+	ErrnoProtonosupport
+	// ErrnoPrototype Protocol wrong type for socket.
+	ErrnoPrototype
+	// ErrnoRange Result too large.
+	ErrnoRange
+	// ErrnoRofs Read-only file system.
+	ErrnoRofs
+	// ErrnoSpipe Invalid seek.
+	ErrnoSpipe
+	// ErrnoSrch No such process.
+	ErrnoSrch
+	// ErrnoStale Reserved.
+	ErrnoStale
+	// ErrnoTimedout Connection timed out.
+	ErrnoTimedout
+	// ErrnoTxtbsy Text file busy.
+	ErrnoTxtbsy
+	// ErrnoXdev Cross-device link.
+	ErrnoXdev
+
+	// Note: ErrnoNotcapable was removed by WASI maintainers.
+	// See https://github.com/WebAssembly/wasi-libc/pull/294
+)
diff --git a/pkg/workflows/wasm/host/module.go b/pkg/workflows/wasm/host/module.go
index f4294299a1..3a37b64e1e 100644
--- a/pkg/workflows/wasm/host/module.go
+++ b/pkg/workflows/wasm/host/module.go
@@ -8,7 +8,6 @@ import (
 	"strings"
 	"sync"
 	"time"
-	"unsafe"
 
 	"github.com/bytecodealliance/wasmtime-go/v23"
 	"google.golang.org/protobuf/proto"
@@ -18,18 +17,36 @@ import (
 	wasmpb "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/pb"
 )
 
-func safeMem(caller *wasmtime.Caller, ptr unsafe.Pointer, size int32) ([]byte, error) {
+// safeMem returns a copy of the size bytes of guest memory starting at ptr,
+// or an error if that range is not fully inside the exported "memory".
+func safeMem(caller *wasmtime.Caller, ptr int32, size int32) ([]byte, error) {
 	mem := caller.GetExport("memory").Memory()
 	data := mem.UnsafeData(caller)
-	iptr := int32(uintptr(ptr))
-	if iptr+size > int32(len(data)) {
+	// ptr and size come from the guest: reject negatives and add in int64 so
+	// the sum cannot wrap around int32 and slip past the bounds check.
+	if ptr < 0 || size < 0 || int64(ptr)+int64(size) > int64(len(data)) {
 		return nil, errors.New("out of bounds memory access")
 	}
 
 	cd := make([]byte, size)
-	copy(cd, data[iptr:iptr+size])
+	copy(cd, data[ptr:ptr+size])
 	return cd, nil
 }
 
+// copyBuffer writes src into guest memory at [ptr, ptr+size) and returns
+// len(src), or -1 if that range is not fully inside the exported "memory".
+// At most size bytes are written.
+func copyBuffer(caller *wasmtime.Caller, src []byte, ptr int32, size int32) int64 {
+	mem := caller.GetExport("memory").Memory()
+	rawData := mem.UnsafeData(caller)
+	// Reject negatives and compare in int64 so ptr+size cannot wrap around.
+	if ptr < 0 || size < 0 || int64(len(rawData)) < int64(ptr)+int64(size) {
+		return -1
+	}
+	buffer := rawData[ptr : ptr+size]
+	dataLen := int64(len(src))
+	copy(buffer, src)
+	return dataLen
+}
 type respStore struct {
 	m map[string]*wasmpb.Response
@@ -65,7 +82,7 @@ var (
 	defaultTickInterval = 100 * time.Millisecond
 	defaultTimeout      = 300 * time.Millisecond
 	defaultMaxMemoryMBs = 64
-	defaultInitialFuel  = uint64(100_000_000)
+	DefaultInitialFuel  = uint64(100_000_000)
 )
 
 type ModuleConfig struct {
@@ -104,10 +121,6 @@ func NewModule(modCfg *ModuleConfig, binary []byte) (*Module, error) {
 		modCfg.Timeout = &defaultTimeout
 	}
 
-	if modCfg.InitialFuel == 0 {
-		modCfg.InitialFuel = defaultInitialFuel
-	}
-
 	// Take the max of the default and the configured max memory mbs.
 	// We do this because Go requires a minimum of 16 megabytes to run,
 	// and local testing has shown that with less than 64 mbs, some
@@ -116,7 +129,9 @@ func NewModule(modCfg *ModuleConfig, binary []byte) (*Module, error) {
 
 	cfg := wasmtime.NewConfig()
 	cfg.SetEpochInterruption(true)
-	cfg.SetConsumeFuel(true)
+	if modCfg.InitialFuel > 0 {
+		cfg.SetConsumeFuel(true)
+	}
 
 	engine := wasmtime.NewEngineWithConfig(cfg)
 
@@ -125,56 +140,39 @@ func NewModule(modCfg *ModuleConfig, binary []byte) (*Module, error) {
 		return nil, fmt.Errorf("error creating wasmtime module: %w", err)
 	}
 
-	linker := wasmtime.NewLinker(engine)
-	linker.AllowShadowing(true)
-
-	err = linker.DefineWasi()
+	linker, err := newWasiLinker(engine)
 	if err != nil {
-		return nil, err
+		return nil, fmt.Errorf("error creating wasi linker: %w", err)
 	}
 
 	r := &respStore{
 		m: map[string]*wasmpb.Response{},
 	}
 
-	// TODO: Stub out poll_oneoff correctly -- it's unclear what
-	// the effect of this naive stub is as this syscall powers
-	// notifications for time.Sleep, but will also effect other system notifications.
-	// We need this stub to prevent binaries from calling time.Sleep and
-	// starving our worker pool as a result.
-	err = linker.FuncWrap(
-		"wasi_snapshot_preview1",
-		"poll_oneoff",
-		func(caller *wasmtime.Caller, a int32, b int32, c int32, d int32) int32 {
-			return 0
-		},
-	)
-	if err != nil {
-		return nil, fmt.Errorf("could not wrap poll_oneoff: %w", err)
-	}
-
 	err = linker.FuncWrap(
 		"env",
 		"sendResponse",
-		func(caller *wasmtime.Caller, ptr int32, ptrlen int32) {
-			b, innerErr := safeMem(caller, unsafe.Pointer(uintptr(ptr)), ptrlen)
+		func(caller *wasmtime.Caller, ptr int32, ptrlen int32) int32 {
+			b, innerErr := safeMem(caller, ptr, ptrlen)
 			if innerErr != nil {
-				logger.Errorf("error calling sendResponse: %s", err)
-				return
+				logger.Errorf("error calling sendResponse: %s", innerErr)
+				return ErrnoFault
 			}
 
 			var resp wasmpb.Response
 			innerErr = proto.Unmarshal(b, &resp)
 			if innerErr != nil {
-				logger.Errorf("error calling sendResponse: %s", err)
-				return
+				logger.Errorf("error calling sendResponse: %s", innerErr)
+				return ErrnoFault
 			}
 
 			innerErr = r.add(resp.Id, &resp)
 			if innerErr != nil {
-				logger.Errorf("error calling sendResponse: %s", err)
-				return
+				logger.Errorf("error calling sendResponse: %s", innerErr)
+				return ErrnoFault
 			}
+
+			return ErrnoSuccess
 		},
 	)
 	if err != nil {
@@ -237,9 +235,12 @@ func (m *Module) Run(request *wasmpb.Request) (*wasmpb.Response, error) {
 	wasi.SetArgv([]string{"wasi", reqstr})
 
 	store.SetWasi(wasi)
-	err = store.SetFuel(m.cfg.InitialFuel)
-	if err != nil {
-		return nil, fmt.Errorf("error setting fuel: %w", err)
+
+	if m.cfg.InitialFuel > 0 {
+		err = store.SetFuel(m.cfg.InitialFuel)
+		if err != nil {
+			return nil, fmt.Errorf("error setting fuel: %w", err)
+		}
 	}
 
 	// Limit memory to max memory megabytes per instance.
@@ -283,6 +276,8 @@ func (m *Module) Run(request *wasmpb.Request) (*wasmpb.Response, error) {
 			}
 
 			return nil, fmt.Errorf("error executing runner: %s: %w", resp.ErrMsg, innerErr)
+		case containsCode(err, wasm.CodeHostErr):
+			return nil, errors.New("invariant violation: host errored during sendResponse")
 		default:
 			return nil, err
 		}
diff --git a/pkg/workflows/wasm/host/test/sleep/cmd/main.go b/pkg/workflows/wasm/host/test/sleep/cmd/main.go
index db91fce493..df34fb120d 100644
--- a/pkg/workflows/wasm/host/test/sleep/cmd/main.go
+++ b/pkg/workflows/wasm/host/test/sleep/cmd/main.go
@@ -2,8 +2,12 @@
 
 package main
 
-import "time"
+import (
+	"fmt"
+	"time"
+)
 
 func main() {
-	time.Sleep(10 * time.Second)
+	time.Sleep(1 * time.Hour)
+	fmt.Printf("hello")
 }
diff --git a/pkg/workflows/wasm/host/wasip1.go b/pkg/workflows/wasm/host/wasip1.go
new file mode 100644
index 0000000000..881fe7e542
--- /dev/null
+++ b/pkg/workflows/wasm/host/wasip1.go
@@ -0,0 +1,203 @@
+package host
+
+import (
+	"encoding/binary"
+	"time"
+
+	"github.com/bytecodealliance/wasmtime-go/v23"
+	"github.com/jonboulle/clockwork"
+)
+
+// nanoBase and clock back the stubbed WASI clock: the host functions below
+// read and advance this fake clock instead of consulting real time.
+var (
+	nanoBase = time.Now()
+	clock    = clockwork.NewFakeClockAt(nanoBase)
+)
+
+// newWasiLinker returns a linker with the default WASI imports defined and
+// poll_oneoff and clock_time_get shadowed by the host-side stubs in this file.
+func newWasiLinker(engine *wasmtime.Engine) (*wasmtime.Linker, error) {
+	linker := wasmtime.NewLinker(engine)
+	linker.AllowShadowing(true)
+
+	err := linker.DefineWasi()
+	if err != nil {
+		return nil, err
+	}
+
+	err = linker.FuncWrap(
+		"wasi_snapshot_preview1",
+		"poll_oneoff",
+		pollOneoff,
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	err = linker.FuncWrap(
+		"wasi_snapshot_preview1",
+		"clock_time_get",
+		clockTimeGet,
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	return linker, nil
+}
+
+// WASI clockid values accepted by clockTimeGet.
+const (
+	clockIDRealtime = iota
+	clockIDMonotonic
+)
+
+// clockTimeGet implements clock_time_get on top of the fake clock. Every call
+// advances the clock by 100ms and writes the resulting time, in nanoseconds,
+// as a little-endian uint64 at resultTimestamp in guest memory.
+func clockTimeGet(caller *wasmtime.Caller, id int32, precision int64, resultTimestamp int32) int32 {
+	var val int64
+	switch id {
+	case clockIDMonotonic:
+		clock.Advance(100 * time.Millisecond)
+		val = clock.Since(nanoBase).Nanoseconds()
+	case clockIDRealtime:
+		clock.Advance(100 * time.Millisecond)
+		val = clock.Now().UnixNano()
+	default:
+		return ErrnoInval
+	}
+
+	uint64Size := int32(8)
+	trg := make([]byte, uint64Size)
+	binary.LittleEndian.PutUint64(trg, uint64(val))
+	// A bad result pointer must surface as an error, not a silent success.
+	if copyBuffer(caller, trg, resultTimestamp, uint64Size) == -1 {
+		return ErrnoFault
+	}
+	return ErrnoSuccess
+}
+
+const (
+	subscriptionLen = 48
+	eventsLen       = 32
+
+	// maxSubscriptions keeps nsubscriptions*subscriptionLen within int32.
+	maxSubscriptions = (1<<31 - 1) / subscriptionLen
+)
+
+// WASI eventtype tags. They live in their own const block so that iota starts
+// at 0 and the values match the ABI (clock=0, fd_read=1, fd_write=2).
+const (
+	eventTypeClock = iota
+	eventTypeFDRead
+	eventTypeFDWrite
+)
+
+// pollOneoff implements poll_oneoff without ever blocking: clock subscriptions
+// complete immediately and the fake clock is advanced by the longest requested
+// relative timeout, while fd subscriptions are answered with ErrnoBadf. One
+// event is written per subscription.
+//
+// Loosely based off the implementation here:
+// https://github.com/tetratelabs/wazero/blob/main/imports/wasi_snapshot_preview1/poll.go#L52
+func pollOneoff(caller *wasmtime.Caller, subscriptionptr int32, eventsptr int32, nsubscriptions int32, resultNevents int32) int32 {
+	// The count comes from the guest: reject values that are non-positive or
+	// large enough to overflow the int32 size computations below.
+	if nsubscriptions <= 0 || nsubscriptions > maxSubscriptions {
+		return ErrnoInval
+	}
+
+	subs, err := safeMem(caller, subscriptionptr, nsubscriptions*subscriptionLen)
+	if err != nil {
+		return ErrnoFault
+	}
+
+	// Each subscription should have an event
+	events, err := safeMem(caller, eventsptr, nsubscriptions*eventsLen)
+	if err != nil {
+		return ErrnoFault
+	}
+
+	// Zero out the buffer to be safe.
+	clear(events)
+
+	nevents := 0
+	timeout := time.Duration(0)
+	for i := int32(0); i < nsubscriptions; i++ {
+		// Subscription layout: userdata u64 @0, eventtype u8 @8, and the
+		// type-specific payload from @16 up to the end of the record.
+		inOffset := i * subscriptionLen
+
+		userData := subs[inOffset : inOffset+8]
+		eventType := subs[inOffset+8]
+		argBuf := subs[inOffset+8+8 : inOffset+subscriptionLen]
+
+		// Each event gets its own eventsLen-sized slot in the output buffer.
+		outOffset := nevents * eventsLen
+		slot := events[outOffset : outOffset+eventsLen]
+
+		switch eventType {
+		case eventTypeClock:
+			// We want to stub out clock events,
+			// so let's just return success, and
+			// we'll advance the clock by the timeout duration
+			// below.
+			// Clock payload: timeout u64 @8, flags u16 @24.
+			newTimeout := binary.LittleEndian.Uint64(argBuf[8:16])
+			flag := binary.LittleEndian.Uint16(argBuf[24:26])
+
+			var errno Errno
+			switch flag {
+			case 0: // relative time
+				errno = ErrnoSuccess
+				if timeout < time.Duration(newTimeout) {
+					timeout = time.Duration(newTimeout)
+				}
+			default:
+				errno = ErrnoNotsup
+			}
+			writeEvent(slot, userData, errno, eventTypeClock)
+		case eventTypeFDRead:
+			writeEvent(slot, userData, ErrnoBadf, eventTypeFDRead)
+		case eventTypeFDWrite:
+			writeEvent(slot, userData, ErrnoBadf, eventTypeFDWrite)
+		default:
+			writeEvent(slot, userData, ErrnoInval, int(eventType))
+		}
+
+		nevents++
+	}
+
+	// Advance the clock by timeout.
+	// This will make it seem like we've slept by timeout.
+	if timeout > 0 {
+		clock.Advance(timeout)
+	}
+
+	uint32Size := int32(4)
+	rne := make([]byte, uint32Size)
+	binary.LittleEndian.PutUint32(rne, uint32(nevents))
+
+	size := copyBuffer(caller, rne, resultNevents, uint32Size)
+	if size == -1 {
+		return ErrnoFault
+	}
+
+	size = copyBuffer(caller, events, eventsptr, nsubscriptions*eventsLen)
+	if size == -1 {
+		return ErrnoFault
+	}
+
+	return ErrnoSuccess
+}
+
+// writeEvent fills one event record in slot (at least 14 bytes): userData at
+// offset 0, errno in the low byte at offset 8, and eventType from offset 10.
+func writeEvent(slot []byte, userData []byte, errno Errno, eventType int) {
+	copy(slot, userData)
+	slot[8] = byte(errno)
+	slot[9] = 0
+	binary.LittleEndian.PutUint32(slot[10:], uint32(eventType))
+}
diff --git a/pkg/workflows/wasm/host/wasm_test.go b/pkg/workflows/wasm/host/wasm_test.go
index 09847f9487..11c2195fbb 100644
--- a/pkg/workflows/wasm/host/wasm_test.go
+++ b/pkg/workflows/wasm/host/wasm_test.go
@@ -137,7 +137,7 @@ func TestModule_Errors(t *testing.T) {
 	assert.ErrorContains(t, err, "invalid compute request: could not find compute function for id doesnt-exist")
 }
 
-func TestModule_Sandboxes_Memory(t *testing.T) {
+func TestModule_Sandbox_Memory(t *testing.T) {
 	binary, err := os.ReadFile(createTestBinary(oomBinaryCmd, oomBinaryLocation, t))
 	require.NoError(t, err)
 
@@ -154,7 +154,7 @@ func TestModule_Sandboxes_Memory(t *testing.T) {
assert.ErrorContains(t, err, "exit status 2") } -func TestModule_Sandboxes_Timeout(t *testing.T) { +func TestModule_Sandbox_SleepIsStubbedOut(t *testing.T) { binary, err := os.ReadFile(createTestBinary(sleepBinaryCmd, sleepBinaryLocation, t)) require.NoError(t, err) @@ -167,11 +167,39 @@ func TestModule_Sandboxes_Timeout(t *testing.T) { Id: uuid.New().String(), Message: &wasmpb.Request_SpecRequest{}, } + + start := time.Now() _, err = m.Run(req) - assert.ErrorContains(t, err, "all fuel consumed by WebAssembly") + end := time.Now() + + // The binary sleeps for 1 hour, + // but with our stubbed out functions, + // it should execute and return almost immediately. + assert.WithinDuration(t, start, end, 10*time.Second) + assert.NotNil(t, err) +} + +func TestModule_Sandbox_Timeout(t *testing.T) { + binary, err := os.ReadFile(createTestBinary(sleepBinaryCmd, sleepBinaryLocation, t)) + require.NoError(t, err) + + tmt := 10 * time.Millisecond + m, err := NewModule(&ModuleConfig{Logger: logger.Test(t), Timeout: &tmt}, binary) + require.NoError(t, err) + + m.Start() + + req := &wasmpb.Request{ + Id: uuid.New().String(), + Message: &wasmpb.Request_SpecRequest{}, + } + + _, err = m.Run(req) + + assert.ErrorContains(t, err, "interrupt") } -func TestModule_Sandboxes_CantReadFiles(t *testing.T) { +func TestModule_Sandbox_CantReadFiles(t *testing.T) { binary, err := os.ReadFile(createTestBinary(filesBinaryCmd, filesBinaryLocation, t)) require.NoError(t, err) @@ -198,7 +226,7 @@ func TestModule_Sandboxes_CantReadFiles(t *testing.T) { assert.ErrorContains(t, err, "open /tmp/file") } -func TestModule_Sandboxes_CantCreateDir(t *testing.T) { +func TestModule_Sandbox_CantCreateDir(t *testing.T) { binary, err := os.ReadFile(createTestBinary(dirsBinaryCmd, dirsBinaryLocation, t)) require.NoError(t, err) @@ -225,7 +253,7 @@ func TestModule_Sandboxes_CantCreateDir(t *testing.T) { assert.ErrorContains(t, err, "mkdir") } -func TestModule_Sandboxes_HTTPRequest(t *testing.T) { +func 
TestModule_Sandbox_HTTPRequest(t *testing.T) { binary, err := os.ReadFile(createTestBinary(httpBinaryCmd, httpBinaryLocation, t)) require.NoError(t, err) @@ -252,7 +280,7 @@ func TestModule_Sandboxes_HTTPRequest(t *testing.T) { assert.NotNil(t, err) } -func TestModule_Sandboxes_ReadEnv(t *testing.T) { +func TestModule_Sandbox_ReadEnv(t *testing.T) { binary, err := os.ReadFile(createTestBinary(envBinaryCmd, envBinaryLocation, t)) require.NoError(t, err) diff --git a/pkg/workflows/wasm/runner.go b/pkg/workflows/wasm/runner.go index 22bda15cc7..eb80442fc9 100644 --- a/pkg/workflows/wasm/runner.go +++ b/pkg/workflows/wasm/runner.go @@ -18,6 +18,7 @@ const ( CodeInvalidResponse = 110 CodeInvalidRequest = 111 CodeRunnerErr = 112 + CodeHostErr = 113 CodeSuccess = 0 ) diff --git a/pkg/workflows/wasm/runner_wasip1.go b/pkg/workflows/wasm/runner_wasip1.go index d3ed6fa3c8..905af69aa0 100644 --- a/pkg/workflows/wasm/runner_wasip1.go +++ b/pkg/workflows/wasm/runner_wasip1.go @@ -10,7 +10,7 @@ import ( ) //go:wasmimport env sendResponse -func sendResponse(respptr unsafe.Pointer, respptrlen int32) +func sendResponse(respptr unsafe.Pointer, respptrlen int32) (errno int32) func bufferToPointerLen(buf []byte) (unsafe.Pointer, int32) { return unsafe.Pointer(&buf[0]), int32(len(buf)) @@ -34,7 +34,10 @@ func NewRunner() *Runner { } ptr, ptrlen := bufferToPointerLen(pb) - sendResponse(ptr, ptrlen) + errno := sendResponse(ptr, ptrlen) + if errno != 0 { + os.Exit(CodeHostErr) + } code := CodeSuccess if response.ErrMsg != "" {