From 39b6eeae729b5069eee8ab7808631b2808022eea Mon Sep 17 00:00:00 2001 From: "mike.art" Date: Wed, 20 Nov 2024 11:41:31 +0100 Subject: [PATCH] Implement Queues Consumer --- _examples/queues/README.md | 6 + _examples/queues/main.go | 14 + _examples/queues/wrangler.toml | 7 + cloudflare/queues/consumer.go | 77 +++++ cloudflare/queues/consumermessage.go | 98 ++++++ cloudflare/queues/consumermessage_test.go | 321 ++++++++++++++++++ cloudflare/queues/consumermessagebatch.go | 60 ++++ .../queues/consumermessagebatch_test.go | 124 +++++++ cloudflare/queues/retryoptions.go | 35 ++ cmd/workers-assets-gen/assets/common/shim.mjs | 8 +- .../assets/common/worker.mjs | 2 +- internal/jsutil/jsutil.go | 1 + 12 files changed, 751 insertions(+), 2 deletions(-) create mode 100644 cloudflare/queues/consumer.go create mode 100644 cloudflare/queues/consumermessage.go create mode 100644 cloudflare/queues/consumermessage_test.go create mode 100644 cloudflare/queues/consumermessagebatch.go create mode 100644 cloudflare/queues/consumermessagebatch_test.go create mode 100644 cloudflare/queues/retryoptions.go diff --git a/_examples/queues/README.md b/_examples/queues/README.md index cfb6ef9..707ca64 100644 --- a/_examples/queues/README.md +++ b/_examples/queues/README.md @@ -21,6 +21,12 @@ make deploy # deploy worker ### Interacting with the local queue + +NOTE: Wrangler does not support running multiple workers interacting with the same _local_ queue. Therefore, for the demostrational purposes, +we use the same worker to both produce and consume messages from the queue. For a real-world scenario, please consider the differences +between [queues.Consume](https://github.com/syumai/workers/blob/main/cloudflare/queues/consumer.go#L65) and +(queues.ConsumeNonBlocking)(https://github.com/syumai/workers/blob/main/cloudflare/queues/consumer.go#L75) functions. + 1. Start the dev server. ```sh make dev diff --git a/_examples/queues/main.go b/_examples/queues/main.go index 12cd84d..6c6a911 100644 --- a/_examples/queues/main.go +++ b/_examples/queues/main.go @@ -20,6 +20,11 @@ func handleErr(w http.ResponseWriter, msg string, err error) { } func main() { + // start Qeueue consumer. + // If we would not have an HTTP handler in this worker, we would use queues.Consume instead + queues.ConsumeNonBlocking(consumeBatch) + + // start HTTP server http.HandleFunc("/", handleProduce) workers.Serve(nil) } @@ -99,3 +104,12 @@ func produceBytes(q *queues.Producer, req *http.Request) error { } return nil } + +func consumeBatch(batch *queues.ConsumerMessageBatch) error { + for _, msg := range batch.Messages { + log.Printf("Received message: %v\n", msg.Body.Get("name").String()) + } + + batch.AckAll() + return nil +} diff --git a/_examples/queues/wrangler.toml b/_examples/queues/wrangler.toml index 0965832..a694215 100644 --- a/_examples/queues/wrangler.toml +++ b/_examples/queues/wrangler.toml @@ -9,5 +9,12 @@ compatibility_flags = [ queue = "my-queue" binding = "QUEUE" +[[queues.consumers]] +queue = "my-queue" +max_batch_size = 1 +max_batch_timeout = 30 +max_retries = 10 +dead_letter_queue = "my-queue-dlq" + [build] command = "make build" diff --git a/cloudflare/queues/consumer.go b/cloudflare/queues/consumer.go new file mode 100644 index 0000000..c4f332a --- /dev/null +++ b/cloudflare/queues/consumer.go @@ -0,0 +1,77 @@ +package queues + +import ( + "fmt" + "syscall/js" + + "github.com/syumai/workers/internal/jsutil" +) + +// Consumer is a function that received a batch of messages from Cloudflare Queues. +// The function should be set using Consume or ConsumeNonBlocking. +// A returned error will cause the batch to be retried (unless the batch or individual messages are acked). +// NOTE: to do long-running message processing task within the Consumer, use cloudflare.WaitUntil, this will postpone the message +// acknowledgment until the task is completed witout blocking the queue consumption. +type Consumer func(batch *ConsumerMessageBatch) error + +var consumer Consumer + +func init() { + handleBatchCallback := js.FuncOf(func(this js.Value, args []js.Value) any { + batch := args[0] + var cb js.Func + cb = js.FuncOf(func(_ js.Value, pArgs []js.Value) any { + defer cb.Release() + resolve := pArgs[0] + reject := pArgs[1] + go func() { + if len(args) > 1 { + reject.Invoke(jsutil.Errorf("too many args given to handleQueueMessageBatch: %d", len(args))) + return + } + err := consumeBatch(batch) + if err != nil { + reject.Invoke(jsutil.Error(err.Error())) + return + } + resolve.Invoke(js.Undefined()) + }() + return js.Undefined() + }) + return jsutil.NewPromise(cb) + }) + jsutil.Binding.Set("handleQueueMessageBatch", handleBatchCallback) +} + +func consumeBatch(batch js.Value) error { + b, err := newConsumerMessageBatch(batch) + if err != nil { + return fmt.Errorf("failed to parse message batch: %v", err) + } + + if err := consumer(b); err != nil { + return err + } + return nil +} + +//go:wasmimport workers ready +func ready() + +// Consume sets the Consumer function to receive batches of messages from Cloudflare Queues +// NOTE: This function will block the current goroutine and is intented to be used as long as the +// only worker's purpose is to be the consumer of a Cloudflare Queue. +// In case the worker has other purposes (e.g. handling HTTP requests), use ConsumeNonBlocking instead. +func Consume(f Consumer) { + consumer = f + ready() + select {} +} + +// ConsumeNonBlocking sets the Consumer function to receive batches of messages from Cloudflare Queues. +// This function is intented to be used when the worker has other purposes (e.g. handling HTTP requests). +// The worker will not block receiving messages and will continue to execute other tasks. +// ConsumeNonBlocking should be called before setting other blocking handlers (e.g. workers.Serve). +func ConsumeNonBlocking(f Consumer) { + consumer = f +} diff --git a/cloudflare/queues/consumermessage.go b/cloudflare/queues/consumermessage.go new file mode 100644 index 0000000..e1f2bec --- /dev/null +++ b/cloudflare/queues/consumermessage.go @@ -0,0 +1,98 @@ +package queues + +import ( + "fmt" + "syscall/js" + "time" + + "github.com/syumai/workers/internal/jsutil" +) + +// ConsumerMessage represents a message of the batch received by the consumer. +// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#message +type ConsumerMessage struct { + // instance - The underlying instance of the JS message object passed by the cloudflare + instance js.Value + + // Id - The unique Cloudflare-generated identifier of the message + Id string + // Timestamp - The time when the message was enqueued + Timestamp time.Time + // Body - The message body. Could be accessed directly or using converting helpers as StringBody, BytesBody, IntBody, FloatBody. + Body js.Value + // Attempts - The number of times the message delivery has been retried. + Attempts int +} + +func newConsumerMessage(obj js.Value) (*ConsumerMessage, error) { + timestamp, err := jsutil.DateToTime(obj.Get("timestamp")) + if err != nil { + return nil, fmt.Errorf("failed to parse message timestamp: %v", err) + } + + return &ConsumerMessage{ + instance: obj, + Id: obj.Get("id").String(), + Body: obj.Get("body"), + Attempts: obj.Get("attempts").Int(), + Timestamp: timestamp, + }, nil +} + +// Ack acknowledges the message as successfully delivered despite the result returned from the consuming function. +// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#message +func (m *ConsumerMessage) Ack() { + m.instance.Call("ack") +} + +// Retry marks the message to be re-delivered. +// The message will be retried after the optional delay configured with RetryOption. +func (m *ConsumerMessage) Retry(opts ...RetryOption) { + var o *retryOptions + if len(opts) > 0 { + o = &retryOptions{} + for _, opt := range opts { + opt(o) + } + } + + m.instance.Call("retry", o.toJS()) +} + +func (m *ConsumerMessage) StringBody() (string, error) { + if m.Body.Type() != js.TypeString { + return "", fmt.Errorf("message body is not a string: %v", m.Body) + } + return m.Body.String(), nil +} + +func (m *ConsumerMessage) BytesBody() ([]byte, error) { + switch m.Body.Type() { + case js.TypeString: + return []byte(m.Body.String()), nil + case js.TypeObject: + if m.Body.InstanceOf(jsutil.Uint8ArrayClass) || m.Body.InstanceOf(jsutil.Uint8ClampedArrayClass) { + b := make([]byte, m.Body.Get("byteLength").Int()) + js.CopyBytesToGo(b, m.Body) + return b, nil + } + } + + return nil, fmt.Errorf("message body is not a byte array: %v", m.Body) +} + +func (m *ConsumerMessage) IntBody() (int, error) { + if m.Body.Type() == js.TypeNumber { + return m.Body.Int(), nil + } + + return 0, fmt.Errorf("message body is not a number: %v", m.Body) +} + +func (m *ConsumerMessage) FloatBody() (float64, error) { + if m.Body.Type() == js.TypeNumber { + return m.Body.Float(), nil + } + + return 0, fmt.Errorf("message body is not a number: %v", m.Body) +} diff --git a/cloudflare/queues/consumermessage_test.go b/cloudflare/queues/consumermessage_test.go new file mode 100644 index 0000000..c285ee2 --- /dev/null +++ b/cloudflare/queues/consumermessage_test.go @@ -0,0 +1,321 @@ +package queues + +import ( + "bytes" + "syscall/js" + "testing" + "time" + + "github.com/syumai/workers/internal/jsutil" +) + +func TestNewConsumerMessage(t *testing.T) { + ts := time.Now() + jsTs := jsutil.TimeToDate(ts) + id := "some-message-id" + m := map[string]any{ + "body": "hello", + "timestamp": jsTs, + "id": id, + "attempts": 1, + } + + got, err := newConsumerMessage(js.ValueOf(m)) + if err != nil { + t.Fatalf("newConsumerMessage failed: %v", err) + } + + if body := got.Body.String(); body != "hello" { + t.Fatalf("Body() = %v, want %v", body, "hello") + } + + if got.Id != id { + t.Fatalf("Id = %v, want %v", got.Id, id) + } + + if got.Attempts != 1 { + t.Fatalf("Attempts = %v, want %v", got.Attempts, 1) + } + + if got.Timestamp.UnixMilli() != ts.UnixMilli() { + t.Fatalf("Timestamp = %v, want %v", got.Timestamp, ts) + } +} + +func TestConsumerMessage_Ack(t *testing.T) { + ackCalled := false + jsObj := jsutil.NewObject() + jsObj.Set("ack", js.FuncOf(func(this js.Value, args []js.Value) interface{} { + ackCalled = true + return nil + })) + m := &ConsumerMessage{ + instance: jsObj, + } + + m.Ack() + + if !ackCalled { + t.Fatalf("Ack() did not call ack") + } +} + +func TestConsumerMessage_Retry(t *testing.T) { + retryCalled := false + jsObj := jsutil.NewObject() + jsObj.Set("retry", js.FuncOf(func(this js.Value, args []js.Value) interface{} { + retryCalled = true + return nil + })) + m := &ConsumerMessage{ + instance: jsObj, + } + + m.Retry() + + if !retryCalled { + t.Fatalf("Retry() did not call retry") + } +} + +func TestConsumerMessage_RetryWithDelay(t *testing.T) { + retryCalled := false + jsObj := jsutil.NewObject() + jsObj.Set("retry", js.FuncOf(func(this js.Value, args []js.Value) interface{} { + retryCalled = true + if len(args) != 1 { + t.Fatalf("retry() called with %d arguments, want 1", len(args)) + } + + opts := args[0] + if opts.Type() != js.TypeObject { + t.Fatalf("retry() called with argument of type %v, want object", opts.Type()) + } + + if delay := opts.Get("delaySeconds").Int(); delay != 10 { + t.Fatalf("delaySeconds = %v, want %v", delay, 10) + } + + return nil + })) + + m := &ConsumerMessage{ + instance: jsObj, + } + + m.Retry(WithRetryDelay(10 * time.Second)) + + if !retryCalled { + t.Fatalf("RetryAll() did not call retryAll") + } +} + +func TestNewConsumerMessage_StringBody(t *testing.T) { + tests := []struct { + name string + body func() js.Value + want string + wantErr bool + }{ + { + name: "string", + body: func() js.Value { + return js.ValueOf("hello") + }, + want: "hello", + }, + { + name: "uint8 array", + body: func() js.Value { + v := jsutil.Uint8ArrayClass.New(3) + js.CopyBytesToJS(v, []byte("foo")) + return v + }, + wantErr: true, + }, + { + name: "int", + body: func() js.Value { + return js.ValueOf(42) + }, + wantErr: true, + }, + { + name: "undefined", + body: func() js.Value { + return js.Undefined() + }, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m := &ConsumerMessage{ + Body: tt.body(), + } + + got, err := m.StringBody() + if (err != nil) != tt.wantErr { + t.Fatalf("StringBody() error = %v, wantErr %v", err, tt.wantErr) + } + + if got != tt.want { + t.Fatalf("StringBody() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestConsumerMessage_BytesBody(t *testing.T) { + tests := []struct { + name string + body func() js.Value + want []byte + wantErr bool + }{ + { + name: "string", + body: func() js.Value { + return js.ValueOf("hello") + }, + want: []byte("hello"), + }, + { + name: "uint8 array", + body: func() js.Value { + v := jsutil.Uint8ArrayClass.New(3) + js.CopyBytesToJS(v, []byte("foo")) + return v + }, + want: []byte("foo"), + }, + { + name: "uint8 clamped array", + body: func() js.Value { + v := jsutil.Uint8ClampedArrayClass.New(3) + js.CopyBytesToJS(v, []byte("bar")) + return v + }, + want: []byte("bar"), + }, + { + name: "incorrect type", + body: func() js.Value { + return js.ValueOf(42) + }, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m := &ConsumerMessage{ + Body: tt.body(), + } + + got, err := m.BytesBody() + if (err != nil) != tt.wantErr { + t.Fatalf("BytesBody() error = %v, wantErr %v", err, tt.wantErr) + } + + if !bytes.Equal(got, tt.want) { + t.Fatalf("BytesBody() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestConsumerMessage_IntBody(t *testing.T) { + tests := []struct { + name string + body js.Value + want int + wantErr bool + }{ + { + name: "int", + body: js.ValueOf(42), + want: 42, + }, + { + name: "float", + body: js.ValueOf(42.5), + want: 42, + }, + { + name: "string", + body: js.ValueOf("42"), + wantErr: true, + }, + { + name: "undefined", + body: js.Undefined(), + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m := &ConsumerMessage{ + Body: tt.body, + } + + got, err := m.IntBody() + if (err != nil) != tt.wantErr { + t.Fatalf("IntBody() error = %v, wantErr %v", err, tt.wantErr) + } + + if got != tt.want { + t.Fatalf("IntBody() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestConsumerMessage_FloatBody(t *testing.T) { + tests := []struct { + name string + body js.Value + want float64 + wantErr bool + }{ + { + name: "int", + body: js.ValueOf(42), + want: 42.0, + }, + { + name: "float", + body: js.ValueOf(42.5), + want: 42.5, + }, + { + name: "string", + body: js.ValueOf("42"), + wantErr: true, + }, + { + name: "undefined", + body: js.Undefined(), + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m := &ConsumerMessage{ + Body: tt.body, + } + + got, err := m.FloatBody() + if (err != nil) != tt.wantErr { + t.Fatalf("FloatBody() error = %v, wantErr %v", err, tt.wantErr) + } + + if got != tt.want { + t.Fatalf("FloatBody() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/cloudflare/queues/consumermessagebatch.go b/cloudflare/queues/consumermessagebatch.go new file mode 100644 index 0000000..976b0bf --- /dev/null +++ b/cloudflare/queues/consumermessagebatch.go @@ -0,0 +1,60 @@ +package queues + +import ( + "fmt" + "syscall/js" +) + +// ConsumerMessageBatch represents a batch of messages received by the consumer. The size of the batch is determined by the +// worker configuration. +// - https://developers.cloudflare.com/queues/configuration/configure-queues/#consumer +// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#messagebatch +type ConsumerMessageBatch struct { + // instance - The underlying instance of the JS message object passed by the cloudflare + instance js.Value + + // Queue - The name of the queue from which the messages were received + Queue string + + // Messages - The messages in the batch + Messages []*ConsumerMessage +} + +func newConsumerMessageBatch(obj js.Value) (*ConsumerMessageBatch, error) { + msgArr := obj.Get("messages") + messages := make([]*ConsumerMessage, msgArr.Length()) + for i := 0; i < msgArr.Length(); i++ { + m, err := newConsumerMessage(msgArr.Index(i)) + if err != nil { + return nil, fmt.Errorf("failed to parse message %d: %v", i, err) + } + messages[i] = m + } + + return &ConsumerMessageBatch{ + instance: obj, + Queue: obj.Get("queue").String(), + Messages: messages, + }, nil +} + +// AckAll acknowledges all messages in the batch as successfully delivered despite the result returned from the consuming function. +// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#messagebatch +func (b *ConsumerMessageBatch) AckAll() { + b.instance.Call("ackAll") +} + +// RetryAll marks all messages in the batch to be re-delivered. +// The messages will be retried after the optional delay configured with RetryOption. +// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#messagebatch +func (b *ConsumerMessageBatch) RetryAll(opts ...RetryOption) { + var o *retryOptions + if len(opts) > 0 { + o = &retryOptions{} + for _, opt := range opts { + opt(o) + } + } + + b.instance.Call("retryAll", o.toJS()) +} diff --git a/cloudflare/queues/consumermessagebatch_test.go b/cloudflare/queues/consumermessagebatch_test.go new file mode 100644 index 0000000..ec76dc6 --- /dev/null +++ b/cloudflare/queues/consumermessagebatch_test.go @@ -0,0 +1,124 @@ +package queues + +import ( + "syscall/js" + "testing" + "time" + + "github.com/syumai/workers/internal/jsutil" +) + +func TestNewConsumerMessageBatch(t *testing.T) { + ts := time.Now() + jsTs := jsutil.TimeToDate(ts) + id := "some-message-id" + m := map[string]any{ + "queue": "some-queue", + "messages": []any{ + map[string]any{ + "body": "hello", + "timestamp": jsTs, + "id": id, + "attempts": 1, + }, + }, + } + + got, err := newConsumerMessageBatch(js.ValueOf(m)) + if err != nil { + t.Fatalf("newConsumerMessageBatch failed: %v", err) + } + + if got.Queue != "some-queue" { + t.Fatalf("Queue = %v, want %v", got.Queue, "some-queue") + } + + if len(got.Messages) != 1 { + t.Fatalf("Messages = %v, want %v", len(got.Messages), 1) + } + + msg := got.Messages[0] + if body := msg.Body.String(); body != "hello" { + t.Fatalf("Body() = %v, want %v", body, "hello") + } + + if msg.Id != id { + t.Fatalf("Id = %v, want %v", msg.Id, id) + } + + if msg.Attempts != 1 { + t.Fatalf("Attempts = %v, want %v", msg.Attempts, 1) + } + + if msg.Timestamp.UnixMilli() != ts.UnixMilli() { + t.Fatalf("Timestamp = %v, want %v", msg.Timestamp, ts) + } +} + +func TestConsumerMessageBatch_AckAll(t *testing.T) { + ackAllCalled := false + jsObj := jsutil.NewObject() + jsObj.Set("ackAll", js.FuncOf(func(this js.Value, args []js.Value) interface{} { + ackAllCalled = true + return nil + })) + b := &ConsumerMessageBatch{ + instance: jsObj, + } + + b.AckAll() + + if !ackAllCalled { + t.Fatalf("AckAll() did not call ackAll") + } +} + +func TestConsumerMessageBatch_RetryAll(t *testing.T) { + retryAllCalled := false + jsObj := jsutil.NewObject() + jsObj.Set("retryAll", js.FuncOf(func(this js.Value, args []js.Value) interface{} { + retryAllCalled = true + return nil + })) + b := &ConsumerMessageBatch{ + instance: jsObj, + } + + b.RetryAll() + + if !retryAllCalled { + t.Fatalf("RetryAll() did not call retryAll") + } +} + +func TestConsumerMessageBatch_RetryAllWithRetryOption(t *testing.T) { + retryAllCalled := false + jsObj := jsutil.NewObject() + jsObj.Set("retryAll", js.FuncOf(func(this js.Value, args []js.Value) interface{} { + retryAllCalled = true + if len(args) != 1 { + t.Fatalf("retryAll() called with %d arguments, want 1", len(args)) + } + + opts := args[0] + if opts.Type() != js.TypeObject { + t.Fatalf("retryAll() called with argument of type %v, want object", opts.Type()) + } + + if delay := opts.Get("delaySeconds").Int(); delay != 10 { + t.Fatalf("delaySeconds = %v, want %v", delay, 10) + } + + return nil + })) + + b := &ConsumerMessageBatch{ + instance: jsObj, + } + + b.RetryAll(WithRetryDelay(10 * time.Second)) + + if !retryAllCalled { + t.Fatalf("RetryAll() did not call retryAll") + } +} diff --git a/cloudflare/queues/retryoptions.go b/cloudflare/queues/retryoptions.go new file mode 100644 index 0000000..2e12103 --- /dev/null +++ b/cloudflare/queues/retryoptions.go @@ -0,0 +1,35 @@ +package queues + +import ( + "syscall/js" + "time" + + "github.com/syumai/workers/internal/jsutil" +) + +type retryOptions struct { + delaySeconds int +} + +func (o *retryOptions) toJS() js.Value { + if o == nil { + return js.Undefined() + } + + obj := jsutil.NewObject() + if o.delaySeconds != 0 { + obj.Set("delaySeconds", o.delaySeconds) + } + + return obj +} + +type RetryOption func(*retryOptions) + +// WithRetryDelay sets the delay in seconds before the messages delivery is retried. +// Note that the delay should not be less than a second and is not more precise than a second. +func WithRetryDelay(d time.Duration) RetryOption { + return func(o *retryOptions) { + o.delaySeconds = int(d.Seconds()) + } +} diff --git a/cmd/workers-assets-gen/assets/common/shim.mjs b/cmd/workers-assets-gen/assets/common/shim.mjs index f632a07..2d93a85 100644 --- a/cmd/workers-assets-gen/assets/common/shim.mjs +++ b/cmd/workers-assets-gen/assets/common/shim.mjs @@ -63,4 +63,10 @@ export async function onRequest(ctx) { const { request, env } = ctx; await run(createRuntimeContext(env, ctx, binding)); return binding.handleRequest(request); -} \ No newline at end of file +} + +export async function queue(batch, env, ctx) { + const binding = {}; + await run(createRuntimeContext(env, ctx, binding)); + return binding.handleQueueMessageBatch(batch); +} diff --git a/cmd/workers-assets-gen/assets/common/worker.mjs b/cmd/workers-assets-gen/assets/common/worker.mjs index 3472c98..fe3b170 100644 --- a/cmd/workers-assets-gen/assets/common/worker.mjs +++ b/cmd/workers-assets-gen/assets/common/worker.mjs @@ -3,4 +3,4 @@ import mod from "./app.wasm"; imports.init(mod); -export default { fetch: imports.fetch, scheduled: imports.scheduled } +export default { fetch: imports.fetch, scheduled: imports.scheduled, queue: imports.queue }; diff --git a/internal/jsutil/jsutil.go b/internal/jsutil/jsutil.go index 56ad95a..e5f169b 100644 --- a/internal/jsutil/jsutil.go +++ b/internal/jsutil/jsutil.go @@ -16,6 +16,7 @@ var ( HeadersClass = js.Global().Get("Headers") ArrayClass = js.Global().Get("Array") Uint8ArrayClass = js.Global().Get("Uint8Array") + Uint8ClampedArrayClass = js.Global().Get("Uint8ClampedArray") ErrorClass = js.Global().Get("Error") ReadableStreamClass = js.Global().Get("ReadableStream") FixedLengthStreamClass = js.Global().Get("FixedLengthStream")