diff --git a/js/modules/k6/experimental/streams/module.go b/js/modules/k6/experimental/streams/module.go index caf52341fb6..b6ab17a4253 100644 --- a/js/modules/k6/experimental/streams/module.go +++ b/js/modules/k6/experimental/streams/module.go @@ -2,6 +2,9 @@ package streams import ( + "errors" + "io" + "github.com/grafana/sobek" "go.k6.io/k6/js/common" "go.k6.io/k6/js/modules" @@ -46,13 +49,17 @@ func (mi *ModuleInstance) Exports() modules.Exports { // NewReadableStream is the constructor for the ReadableStream object. func (mi *ModuleInstance) NewReadableStream(call sobek.ConstructorCall) *sobek.Object { - rt := mi.vu.Runtime() - var err error - - // 1. If underlyingSource is missing, set it to null. - var underlyingSource *sobek.Object + return newReadableStream(mi.vu, call) +} +func newReadableStream(vu modules.VU, call sobek.ConstructorCall) *sobek.Object { var ( + // 1. If underlyingSource is missing, set it to null. + underlyingSource *sobek.Object + + rt = vu.Runtime() + + err error strategy *sobek.Object underlyingSourceDict UnderlyingSource ) @@ -60,7 +67,7 @@ func (mi *ModuleInstance) NewReadableStream(call sobek.ConstructorCall) *sobek.O // We look for the queuing strategy first, and validate it before // the underlying source, in order to pass the Web Platform Tests // constructor tests. - strategy = mi.initializeStrategy(call) + strategy = initializeStrategy(rt, call) // 2. Let underlyingSourceDict be underlyingSource, converted to an IDL value of type UnderlyingSource. if len(call.Arguments) > 0 && !sobek.IsUndefined(call.Arguments[0]) { @@ -79,8 +86,8 @@ func (mi *ModuleInstance) NewReadableStream(call sobek.ConstructorCall) *sobek.O // 3. Perform ! InitializeReadableStream(this). stream := &ReadableStream{ - runtime: mi.vu.Runtime(), - vu: mi.vu, + runtime: rt, + vu: vu, } stream.initialize() @@ -129,27 +136,25 @@ func (mi *ModuleInstance) NewReadableStream(call sobek.ConstructorCall) *sobek.O } func defaultSizeFunc(_ sobek.Value) (float64, error) { return 1.0, nil } -func (mi *ModuleInstance) initializeStrategy(call sobek.ConstructorCall) *sobek.Object { - runtime := mi.vu.Runtime() - +func initializeStrategy(rt *sobek.Runtime, call sobek.ConstructorCall) *sobek.Object { // Either if the strategy is not provided or if it doesn't have a 'highWaterMark', // we need to set its default value (highWaterMark=1). // https://streams.spec.whatwg.org/#rs-prototype - strArg := runtime.NewObject() + strArg := rt.NewObject() if len(call.Arguments) > 1 && !common.IsNullish(call.Arguments[1]) { - strArg = call.Arguments[1].ToObject(runtime) + strArg = call.Arguments[1].ToObject(rt) } if common.IsNullish(strArg.Get("highWaterMark")) { - if err := strArg.Set("highWaterMark", runtime.ToValue(1)); err != nil { - common.Throw(runtime, newError(RuntimeError, err.Error())) + if err := strArg.Set("highWaterMark", rt.ToValue(1)); err != nil { + common.Throw(rt, newError(RuntimeError, err.Error())) } } // If the stream type is 'bytes', we don't want the size function. // Except, when it is manually specified. - size := runtime.ToValue(defaultSizeFunc) + size := rt.ToValue(defaultSizeFunc) if len(call.Arguments) > 0 && !common.IsNullish(call.Arguments[0]) { - srcArg := call.Arguments[0].ToObject(runtime) + srcArg := call.Arguments[0].ToObject(rt) srcTypeArg := srcArg.Get("type") if !common.IsNullish(srcTypeArg) && srcTypeArg.String() == ReadableStreamTypeBytes { size = nil @@ -160,7 +165,7 @@ func (mi *ModuleInstance) initializeStrategy(call sobek.ConstructorCall) *sobek. } strCall := sobek.ConstructorCall{Arguments: []sobek.Value{strArg}} - return mi.newCountQueuingStrategy(runtime, strCall, size) + return newCountQueuingStrategy(rt, strCall, size) } // NewCountQueuingStrategy is the constructor for the [CountQueuingStrategy] object. @@ -170,14 +175,14 @@ func (mi *ModuleInstance) NewCountQueuingStrategy(call sobek.ConstructorCall) *s rt := mi.vu.Runtime() // By default, the CountQueuingStrategy has a pre-defined 'size' property. // It cannot be overwritten by the user. - return mi.newCountQueuingStrategy(rt, call, rt.ToValue(defaultSizeFunc)) + return newCountQueuingStrategy(rt, call, rt.ToValue(defaultSizeFunc)) } // newCountQueuingStrategy is the underlying constructor for the [CountQueuingStrategy] object. // // It allows to create a CountQueuingStrategy with or without the 'size' property, // depending on how the containing ReadableStream is initialized. -func (mi *ModuleInstance) newCountQueuingStrategy( +func newCountQueuingStrategy( rt *sobek.Runtime, call sobek.ConstructorCall, size sobek.Value, @@ -285,3 +290,49 @@ func (mi *ModuleInstance) NewReadableStreamDefaultReader(call sobek.ConstructorC return object } + +// NewReadableStreamFromReader is the equivalent of [NewReadableStreamDefaultReader] but to initialize +// a new [ReadableStream] from a given [io.Reader] in Go code. +// It is useful for those situations when a [io.Reader] needs to be surfaced up to the JS runtime. +func NewReadableStreamFromReader(vu modules.VU, reader io.Reader) *sobek.Object { + rt := vu.Runtime() + return newReadableStream(vu, sobek.ConstructorCall{ + Arguments: []sobek.Value{rt.ToValue(underlyingSourceForReader(vu, reader))}, + This: rt.NewObject(), + }) +} + +func underlyingSourceForReader(vu modules.VU, reader io.Reader) *sobek.Object { + rt := vu.Runtime() + + underlyingSource := vu.Runtime().NewObject() + if err := underlyingSource.Set("pull", rt.ToValue(func(controller *sobek.Object) *sobek.Promise { + // Prepare methods + cClose, _ := sobek.AssertFunction(controller.Get("close")) + cEnqueue, _ := sobek.AssertFunction(controller.Get("enqueue")) + + buf := make([]byte, 1024) + n, err := reader.Read(buf) + if err != nil && !errors.Is(err, io.EOF) { + panic(err) + } + + _, enqueueErr := cEnqueue(nil, rt.ToValue(string(buf[:n]))) + if enqueueErr != nil { + panic(enqueueErr) + } + + if err == io.EOF { + _, closeErr := cClose(nil) + if closeErr != nil { + panic(closeErr) + } + } + + return newResolvedPromise(vu, sobek.Undefined()) + })); err != nil { + throw(rt, err) + } + + return underlyingSource +} diff --git a/js/modules/k6/experimental/streams/module_test.go b/js/modules/k6/experimental/streams/module_test.go new file mode 100644 index 00000000000..cfe859fd85f --- /dev/null +++ b/js/modules/k6/experimental/streams/module_test.go @@ -0,0 +1,41 @@ +package streams + +import ( + "bytes" + "testing" + + "github.com/grafana/sobek" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.k6.io/k6/js/modulestest" +) + +func TestNewReadableStreamForReader(t *testing.T) { + t.Parallel() + + // The value to be streamed. + exp := "Hello, World!" + + // We initialize the runtime, with the ReadableStream(rs) accessible in JS. + r := modulestest.NewRuntime(t) + rs := NewReadableStreamFromReader(r.VU, bytes.NewReader([]byte(exp))) + require.NoError(t, r.VU.Runtime().Set("rs", rs)) + + // Then, we run some JS code that reads from the ReadableStream(rs). + var ret sobek.Value + err := r.EventLoop.Start(func() (err error) { + ret, err = r.VU.Runtime().RunString(`(async () => { + const reader = rs.getReader(); + const {value} = await reader.read(); + return value; +})()`) + return err + }) + assert.NoError(t, err) + + // Finally, we expect the returned promise to resolve + // to the expected value (the one we streamed). + p, ok := ret.Export().(*sobek.Promise) + require.True(t, ok) + assert.Equal(t, exp, p.Result().String()) +}