Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CAPPL-58] Correctly stub out clock_time_get and poll_oneoff #778

Merged
merged 4 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.22.0
toolchain go1.22.7

require (
github.com/andybalholm/brotli v1.1.0
github.com/atombender/go-jsonschema v0.16.1-0.20240916205339-a74cd4e2851c
github.com/bytecodealliance/wasmtime-go/v23 v23.0.0
github.com/confluentinc/confluent-kafka-go/v2 v2.3.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migc
github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM=
github.com/Microsoft/hcsshim v0.9.4 h1:mnUj0ivWy6UzbB1uLFqKR6F+ZyiDc7j4iGgHTpO+5+I=
github.com/Microsoft/hcsshim v0.9.4/go.mod h1:7pLA8lDk46WKDWlVsENo92gC0XFa8rbKfyFRBqxEbCc=
github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M=
github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY=
github.com/atombender/go-jsonschema v0.16.1-0.20240916205339-a74cd4e2851c h1:cxQVoh6kY+c4b0HUchHjGWBI8288VhH50qxKG3hdEg0=
github.com/atombender/go-jsonschema v0.16.1-0.20240916205339-a74cd4e2851c/go.mod h1:3XzxudkrYVUvbduN/uI2fl4lSrMSzU0+3RCu2mpnfx8=
github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk=
Expand Down
165 changes: 165 additions & 0 deletions pkg/workflows/wasm/host/errno.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
// NOTE: loosely based on: https://github.com/tetratelabs/wazero/blob/1353ca24fef0a57a3a342d75f20357a6e9d3be35/internal/wasip1/errno.go#L14
package host

type Errno = int32

// Note: Below prefers POSIX symbol names over WASI ones, even if the docs are from WASI.
// See https://linux.die.net/man/3/errno
// See https://github.com/WebAssembly/WASI/blob/snapshot-01/phases/snapshot/docs.md#variants-1
const (
// ErrnoSuccess No error occurred. System call completed successfully.
ErrnoSuccess Errno = iota
// Errno2big Argument list too long.
Errno2big
// ErrnoAcces Permission denied.
ErrnoAcces
// ErrnoAddrinuse Address in use.
ErrnoAddrinuse
// ErrnoAddrnotavail Address not available.
ErrnoAddrnotavail
// ErrnoAfnosupport Address family not supported.
ErrnoAfnosupport
// ErrnoAgain Resource unavailable, or operation would block.
ErrnoAgain
// ErrnoAlready Connection already in progress.
ErrnoAlready
// ErrnoBadf Bad file descriptor.
ErrnoBadf
// ErrnoBadmsg Bad message.
ErrnoBadmsg
// ErrnoBusy Device or resource busy.
ErrnoBusy
// ErrnoCanceled Operation canceled.
ErrnoCanceled
// ErrnoChild No child processes.
ErrnoChild
// ErrnoConnaborted Connection aborted.
ErrnoConnaborted
// ErrnoConnrefused Connection refused.
ErrnoConnrefused
// ErrnoConnreset Connection reset.
ErrnoConnreset
// ErrnoDeadlk Resource deadlock would occur.
ErrnoDeadlk
// ErrnoDestaddrreq Destination address required.
ErrnoDestaddrreq
// ErrnoDom Mathematics argument out of domain of function.
ErrnoDom
// ErrnoDquot Reserved.
ErrnoDquot
// ErrnoExist File exists.
ErrnoExist
// ErrnoFault Bad address.
ErrnoFault
// ErrnoFbig File too large.
ErrnoFbig
// ErrnoHostunreach Host is unreachable.
ErrnoHostunreach
// ErrnoIdrm Identifier removed.
ErrnoIdrm
// ErrnoIlseq Illegal byte sequence.
ErrnoIlseq
// ErrnoInprogress Operation in progress.
ErrnoInprogress
// ErrnoIntr Interrupted function.
ErrnoIntr
// ErrnoInval Invalid argument.
ErrnoInval
// ErrnoIo I/O error.
ErrnoIo
// ErrnoIsconn Socket is connected.
ErrnoIsconn
// ErrnoIsdir Is a directory.
ErrnoIsdir
// ErrnoLoop Too many levels of symbolic links.
ErrnoLoop
// ErrnoMfile File descriptor value too large.
ErrnoMfile
// ErrnoMlink Too many links.
ErrnoMlink
// ErrnoMsgsize Message too large.
ErrnoMsgsize
// ErrnoMultihop Reserved.
ErrnoMultihop
// ErrnoNametoolong Filename too long.
ErrnoNametoolong
// ErrnoNetdown Network is down.
ErrnoNetdown
// ErrnoNetreset Connection aborted by network.
ErrnoNetreset
// ErrnoNetunreach Network unreachable.
ErrnoNetunreach
// ErrnoNfile Too many files open in system.
ErrnoNfile
// ErrnoNobufs No buffer space available.
ErrnoNobufs
// ErrnoNodev No such device.
ErrnoNodev
// ErrnoNoent No such file or directory.
ErrnoNoent
// ErrnoNoexec Executable file format error.
ErrnoNoexec
// ErrnoNolck No locks available.
ErrnoNolck
// ErrnoNolink Reserved.
ErrnoNolink
// ErrnoNomem Not enough space.
ErrnoNomem
// ErrnoNomsg No message of the desired type.
ErrnoNomsg
// ErrnoNoprotoopt No message of the desired type.
ErrnoNoprotoopt
// ErrnoNospc No space left on device.
ErrnoNospc
// ErrnoNosys function not supported.
ErrnoNosys
// ErrnoNotconn The socket is not connected.
ErrnoNotconn
// ErrnoNotdir Not a directory or a symbolic link to a directory.
ErrnoNotdir
// ErrnoNotempty Directory not empty.
ErrnoNotempty
// ErrnoNotrecoverable State not recoverable.
ErrnoNotrecoverable
// ErrnoNotsock Not a socket.
ErrnoNotsock
// ErrnoNotsup Not supported, or operation not supported on socket.
ErrnoNotsup
// ErrnoNotty Inappropriate I/O control operation.
ErrnoNotty
// ErrnoNxio No such device or address.
ErrnoNxio
// ErrnoOverflow Value too large to be stored in data type.
ErrnoOverflow
// ErrnoOwnerdead Previous owner died.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👻

ErrnoOwnerdead
// ErrnoPerm Operation not permitted.
ErrnoPerm
// ErrnoPipe Broken pipe.
ErrnoPipe
// ErrnoProto Protocol error.
ErrnoProto
// ErrnoProtonosupport Protocol error.
ErrnoProtonosupport
// ErrnoPrototype Protocol wrong type for socket.
ErrnoPrototype
// ErrnoRange Result too large.
ErrnoRange
// ErrnoRofs Read-only file system.
ErrnoRofs
// ErrnoSpipe Invalid seek.
ErrnoSpipe
// ErrnoSrch No such process.
ErrnoSrch
// ErrnoStale Reserved.
ErrnoStale
// ErrnoTimedout Connection timed out.
ErrnoTimedout
// ErrnoTxtbsy Text file busy.
ErrnoTxtbsy
// ErrnoXdev Cross-device link.
ErrnoXdev

// Note: ErrnoNotcapable was removed by WASI maintainers.
// See https://github.com/WebAssembly/wasi-libc/pull/294
)
99 changes: 54 additions & 45 deletions pkg/workflows/wasm/host/module.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
package host

import (
"bytes"
"encoding/base64"
"errors"
"fmt"
"io"
"math"
"strings"
"sync"
"time"
"unsafe"

"github.com/andybalholm/brotli"
"github.com/bytecodealliance/wasmtime-go/v23"
"google.golang.org/protobuf/proto"

Expand All @@ -18,18 +20,28 @@ import (
wasmpb "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/pb"
)

func safeMem(caller *wasmtime.Caller, ptr unsafe.Pointer, size int32) ([]byte, error) {
func safeMem(caller *wasmtime.Caller, ptr int32, size int32) ([]byte, error) {
mem := caller.GetExport("memory").Memory()
data := mem.UnsafeData(caller)
iptr := int32(uintptr(ptr))
if iptr+size > int32(len(data)) {
if ptr+size > int32(len(data)) {
return nil, errors.New("out of bounds memory access")
}

cd := make([]byte, size)
copy(cd, data[iptr:iptr+size])
copy(cd, data[ptr:ptr+size])
return cd, nil
}
func copyBuffer(caller *wasmtime.Caller, src []byte, ptr int32, size int32) int64 {
mem := caller.GetExport("memory").Memory()
rawData := mem.UnsafeData(caller)
if int32(len(rawData)) < ptr+size {
return -1
}
buffer := rawData[ptr : ptr+size]
dataLen := int64(len(src))
copy(buffer, src)
return dataLen
}

type respStore struct {
m map[string]*wasmpb.Response
Expand Down Expand Up @@ -65,15 +77,16 @@ var (
defaultTickInterval = 100 * time.Millisecond
defaultTimeout = 300 * time.Millisecond
defaultMaxMemoryMBs = 64
defaultInitialFuel = uint64(100_000_000)
DefaultInitialFuel = uint64(100_000_000)
)

type ModuleConfig struct {
TickInterval time.Duration
Timeout *time.Duration
MaxMemoryMBs int64
InitialFuel uint64
Logger logger.Logger
TickInterval time.Duration
Timeout *time.Duration
MaxMemoryMBs int64
InitialFuel uint64
Logger logger.Logger
IsUncompressed bool
}

type Module struct {
Expand Down Expand Up @@ -104,10 +117,6 @@ func NewModule(modCfg *ModuleConfig, binary []byte) (*Module, error) {
modCfg.Timeout = &defaultTimeout
}

if modCfg.InitialFuel == 0 {
modCfg.InitialFuel = defaultInitialFuel
}

// Take the max of the default and the configured max memory mbs.
// We do this because Go requires a minimum of 16 megabytes to run,
// and local testing has shown that with less than 64 mbs, some
Expand All @@ -116,65 +125,60 @@ func NewModule(modCfg *ModuleConfig, binary []byte) (*Module, error) {

cfg := wasmtime.NewConfig()
cfg.SetEpochInterruption(true)
cfg.SetConsumeFuel(true)
if modCfg.InitialFuel > 0 {
cfg.SetConsumeFuel(true)
}

engine := wasmtime.NewEngineWithConfig(cfg)

if !modCfg.IsUncompressed {
rdr := brotli.NewReader(bytes.NewBuffer(binary))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how did you choose brotli for the compression/decompression?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We did some benchmarking between gzip + brotli (and zopfli) and brotli offered the best mix of speed + compression performance.

decompedBinary, err := io.ReadAll(rdr)
if err != nil {
return nil, fmt.Errorf("failed to decompress binary: %w", err)
}

binary = decompedBinary
}

mod, err := wasmtime.NewModule(engine, binary)
if err != nil {
return nil, fmt.Errorf("error creating wasmtime module: %w", err)
}

linker := wasmtime.NewLinker(engine)
linker.AllowShadowing(true)

err = linker.DefineWasi()
linker, err := newWasiLinker(engine)
if err != nil {
return nil, err
return nil, fmt.Errorf("error creating wasi linker: %w", err)
}

r := &respStore{
m: map[string]*wasmpb.Response{},
}

// TODO: Stub out poll_oneoff correctly -- it's unclear what
// the effect of this naive stub is as this syscall powers
// notifications for time.Sleep, but will also effect other system notifications.
// We need this stub to prevent binaries from calling time.Sleep and
// starving our worker pool as a result.
err = linker.FuncWrap(
"wasi_snapshot_preview1",
"poll_oneoff",
func(caller *wasmtime.Caller, a int32, b int32, c int32, d int32) int32 {
return 0
},
)
if err != nil {
return nil, fmt.Errorf("could not wrap poll_oneoff: %w", err)
}

err = linker.FuncWrap(
"env",
"sendResponse",
func(caller *wasmtime.Caller, ptr int32, ptrlen int32) {
b, innerErr := safeMem(caller, unsafe.Pointer(uintptr(ptr)), ptrlen)
func(caller *wasmtime.Caller, ptr int32, ptrlen int32) int32 {
b, innerErr := safeMem(caller, ptr, ptrlen)
if innerErr != nil {
logger.Errorf("error calling sendResponse: %s", err)
return
return ErrnoFault
}

var resp wasmpb.Response
innerErr = proto.Unmarshal(b, &resp)
if innerErr != nil {
logger.Errorf("error calling sendResponse: %s", err)
return
return ErrnoFault
}

innerErr = r.add(resp.Id, &resp)
if innerErr != nil {
logger.Errorf("error calling sendResponse: %s", err)
return
return ErrnoFault
}

return ErrnoSuccess
},
)
if err != nil {
Expand Down Expand Up @@ -237,9 +241,12 @@ func (m *Module) Run(request *wasmpb.Request) (*wasmpb.Response, error) {
wasi.SetArgv([]string{"wasi", reqstr})

store.SetWasi(wasi)
err = store.SetFuel(m.cfg.InitialFuel)
if err != nil {
return nil, fmt.Errorf("error setting fuel: %w", err)

if m.cfg.InitialFuel > 0 {
err = store.SetFuel(m.cfg.InitialFuel)
if err != nil {
return nil, fmt.Errorf("error setting fuel: %w", err)
}
}

// Limit memory to max memory megabytes per instance.
Expand Down Expand Up @@ -283,6 +290,8 @@ func (m *Module) Run(request *wasmpb.Request) (*wasmpb.Response, error) {
}

return nil, fmt.Errorf("error executing runner: %s: %w", resp.ErrMsg, innerErr)
case containsCode(err, wasm.CodeHostErr):
return nil, fmt.Errorf("invariant violation: host errored during sendResponse")
default:
return nil, err
}
Expand Down
8 changes: 6 additions & 2 deletions pkg/workflows/wasm/host/test/sleep/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@

package main

import "time"
import (
"fmt"
"time"
)

func main() {
time.Sleep(10 * time.Second)
time.Sleep(1 * time.Hour)
fmt.Printf("hello")
}
Loading
Loading