Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Queues Consumer #137

Merged
merged 1 commit into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions _examples/queues/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ make deploy # deploy worker

### Interacting with the local queue


NOTE: Wrangler does not support running multiple workers interacting with the same _local_ queue. Therefore, for the demostrational purposes,
we use the same worker to both produce and consume messages from the queue. For a real-world scenario, please consider the differences
between [queues.Consume](https://github.com/syumai/workers/blob/main/cloudflare/queues/consumer.go#L65) and
(queues.ConsumeNonBlocking)(https://github.com/syumai/workers/blob/main/cloudflare/queues/consumer.go#L75) functions.

1. Start the dev server.
```sh
make dev
Expand Down
14 changes: 14 additions & 0 deletions _examples/queues/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ func handleErr(w http.ResponseWriter, msg string, err error) {
}

func main() {
// start Qeueue consumer.
// If we would not have an HTTP handler in this worker, we would use queues.Consume instead
queues.ConsumeNonBlocking(consumeBatch)

// start HTTP server
http.HandleFunc("/", handleProduce)
workers.Serve(nil)
}
Expand Down Expand Up @@ -99,3 +104,12 @@ func produceBytes(q *queues.Producer, req *http.Request) error {
}
return nil
}

func consumeBatch(batch *queues.ConsumerMessageBatch) error {
for _, msg := range batch.Messages {
log.Printf("Received message: %v\n", msg.Body.Get("name").String())
}

batch.AckAll()
return nil
}
7 changes: 7 additions & 0 deletions _examples/queues/wrangler.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,12 @@ compatibility_flags = [
queue = "my-queue"
binding = "QUEUE"

[[queues.consumers]]
queue = "my-queue"
max_batch_size = 1
max_batch_timeout = 30
max_retries = 10
dead_letter_queue = "my-queue-dlq"

[build]
command = "make build"
77 changes: 77 additions & 0 deletions cloudflare/queues/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package queues

import (
"fmt"
"syscall/js"

"github.com/syumai/workers/internal/jsutil"
)

// Consumer is a function that received a batch of messages from Cloudflare Queues.
// The function should be set using Consume or ConsumeNonBlocking.
// A returned error will cause the batch to be retried (unless the batch or individual messages are acked).
// NOTE: to do long-running message processing task within the Consumer, use cloudflare.WaitUntil, this will postpone the message
// acknowledgment until the task is completed witout blocking the queue consumption.
type Consumer func(batch *ConsumerMessageBatch) error

var consumer Consumer

func init() {
handleBatchCallback := js.FuncOf(func(this js.Value, args []js.Value) any {
batch := args[0]
var cb js.Func
cb = js.FuncOf(func(_ js.Value, pArgs []js.Value) any {
defer cb.Release()
resolve := pArgs[0]
reject := pArgs[1]
go func() {
if len(args) > 1 {
reject.Invoke(jsutil.Errorf("too many args given to handleQueueMessageBatch: %d", len(args)))
return
}
err := consumeBatch(batch)
if err != nil {
reject.Invoke(jsutil.Error(err.Error()))
return
}
resolve.Invoke(js.Undefined())
}()
return js.Undefined()
})
return jsutil.NewPromise(cb)
})
jsutil.Binding.Set("handleQueueMessageBatch", handleBatchCallback)
}

func consumeBatch(batch js.Value) error {
b, err := newConsumerMessageBatch(batch)
if err != nil {
return fmt.Errorf("failed to parse message batch: %v", err)
}

if err := consumer(b); err != nil {
return err
}
return nil
}

//go:wasmimport workers ready
func ready()

// Consume sets the Consumer function to receive batches of messages from Cloudflare Queues
// NOTE: This function will block the current goroutine and is intented to be used as long as the
// only worker's purpose is to be the consumer of a Cloudflare Queue.
// In case the worker has other purposes (e.g. handling HTTP requests), use ConsumeNonBlocking instead.
func Consume(f Consumer) {
consumer = f
ready()
select {}
}

// ConsumeNonBlocking sets the Consumer function to receive batches of messages from Cloudflare Queues.
// This function is intented to be used when the worker has other purposes (e.g. handling HTTP requests).
// The worker will not block receiving messages and will continue to execute other tasks.
// ConsumeNonBlocking should be called before setting other blocking handlers (e.g. workers.Serve).
func ConsumeNonBlocking(f Consumer) {
consumer = f
}
98 changes: 98 additions & 0 deletions cloudflare/queues/consumermessage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package queues

import (
"fmt"
"syscall/js"
"time"

"github.com/syumai/workers/internal/jsutil"
)

// ConsumerMessage represents a message of the batch received by the consumer.
// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#message
type ConsumerMessage struct {
// instance - The underlying instance of the JS message object passed by the cloudflare
instance js.Value

// Id - The unique Cloudflare-generated identifier of the message
Id string
// Timestamp - The time when the message was enqueued
Timestamp time.Time
// Body - The message body. Could be accessed directly or using converting helpers as StringBody, BytesBody, IntBody, FloatBody.
Body js.Value
// Attempts - The number of times the message delivery has been retried.
Attempts int
}

func newConsumerMessage(obj js.Value) (*ConsumerMessage, error) {
timestamp, err := jsutil.DateToTime(obj.Get("timestamp"))
if err != nil {
return nil, fmt.Errorf("failed to parse message timestamp: %v", err)
}

return &ConsumerMessage{
instance: obj,
Id: obj.Get("id").String(),
Body: obj.Get("body"),
Attempts: obj.Get("attempts").Int(),
Timestamp: timestamp,
}, nil
}

// Ack acknowledges the message as successfully delivered despite the result returned from the consuming function.
// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#message
func (m *ConsumerMessage) Ack() {
m.instance.Call("ack")
}

// Retry marks the message to be re-delivered.
// The message will be retried after the optional delay configured with RetryOption.
func (m *ConsumerMessage) Retry(opts ...RetryOption) {
var o *retryOptions
if len(opts) > 0 {
o = &retryOptions{}
for _, opt := range opts {
opt(o)
}
}

m.instance.Call("retry", o.toJS())
}

func (m *ConsumerMessage) StringBody() (string, error) {
if m.Body.Type() != js.TypeString {
return "", fmt.Errorf("message body is not a string: %v", m.Body)
}
return m.Body.String(), nil
}

func (m *ConsumerMessage) BytesBody() ([]byte, error) {
switch m.Body.Type() {
case js.TypeString:
return []byte(m.Body.String()), nil
case js.TypeObject:
if m.Body.InstanceOf(jsutil.Uint8ArrayClass) || m.Body.InstanceOf(jsutil.Uint8ClampedArrayClass) {
b := make([]byte, m.Body.Get("byteLength").Int())
js.CopyBytesToGo(b, m.Body)
return b, nil
}
}

return nil, fmt.Errorf("message body is not a byte array: %v", m.Body)
}

func (m *ConsumerMessage) IntBody() (int, error) {
if m.Body.Type() == js.TypeNumber {
return m.Body.Int(), nil
}

return 0, fmt.Errorf("message body is not a number: %v", m.Body)
}

func (m *ConsumerMessage) FloatBody() (float64, error) {
if m.Body.Type() == js.TypeNumber {
return m.Body.Float(), nil
}

return 0, fmt.Errorf("message body is not a number: %v", m.Body)
}
Comment on lines +84 to +98
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These methods seem to be just wrappers around js.Value.
Since we can also use Body directly, I'll omit these methods and leave only StringBody and BytesBody. (StringBody is also just a wrapper, but I'll keep it because I think it will be used often.)

Loading
Loading