-
Notifications
You must be signed in to change notification settings - Fork 44
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement Queues Consumer #137
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
package queues | ||
|
||
import ( | ||
"fmt" | ||
"syscall/js" | ||
|
||
"github.com/syumai/workers/internal/jsutil" | ||
) | ||
|
||
// Consumer is a function that received a batch of messages from Cloudflare Queues. | ||
// The function should be set using Consume or ConsumeNonBlocking. | ||
// A returned error will cause the batch to be retried (unless the batch or individual messages are acked). | ||
// NOTE: to do long-running message processing task within the Consumer, use cloudflare.WaitUntil, this will postpone the message | ||
// acknowledgment until the task is completed witout blocking the queue consumption. | ||
type Consumer func(batch *ConsumerMessageBatch) error | ||
|
||
var consumer Consumer | ||
|
||
func init() { | ||
handleBatchCallback := js.FuncOf(func(this js.Value, args []js.Value) any { | ||
batch := args[0] | ||
var cb js.Func | ||
cb = js.FuncOf(func(_ js.Value, pArgs []js.Value) any { | ||
defer cb.Release() | ||
resolve := pArgs[0] | ||
reject := pArgs[1] | ||
go func() { | ||
if len(args) > 1 { | ||
reject.Invoke(jsutil.Errorf("too many args given to handleQueueMessageBatch: %d", len(args))) | ||
return | ||
} | ||
err := consumeBatch(batch) | ||
if err != nil { | ||
reject.Invoke(jsutil.Error(err.Error())) | ||
return | ||
} | ||
resolve.Invoke(js.Undefined()) | ||
}() | ||
return js.Undefined() | ||
}) | ||
return jsutil.NewPromise(cb) | ||
}) | ||
jsutil.Binding.Set("handleQueueMessageBatch", handleBatchCallback) | ||
} | ||
|
||
func consumeBatch(batch js.Value) error { | ||
b, err := newConsumerMessageBatch(batch) | ||
if err != nil { | ||
return fmt.Errorf("failed to parse message batch: %v", err) | ||
} | ||
|
||
if err := consumer(b); err != nil { | ||
return err | ||
} | ||
return nil | ||
} | ||
|
||
//go:wasmimport workers ready | ||
func ready() | ||
|
||
// Consume sets the Consumer function to receive batches of messages from Cloudflare Queues | ||
// NOTE: This function will block the current goroutine and is intented to be used as long as the | ||
// only worker's purpose is to be the consumer of a Cloudflare Queue. | ||
// In case the worker has other purposes (e.g. handling HTTP requests), use ConsumeNonBlocking instead. | ||
func Consume(f Consumer) { | ||
consumer = f | ||
ready() | ||
select {} | ||
} | ||
|
||
// ConsumeNonBlocking sets the Consumer function to receive batches of messages from Cloudflare Queues. | ||
// This function is intented to be used when the worker has other purposes (e.g. handling HTTP requests). | ||
// The worker will not block receiving messages and will continue to execute other tasks. | ||
// ConsumeNonBlocking should be called before setting other blocking handlers (e.g. workers.Serve). | ||
func ConsumeNonBlocking(f Consumer) { | ||
consumer = f | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
package queues | ||
|
||
import ( | ||
"fmt" | ||
"syscall/js" | ||
"time" | ||
|
||
"github.com/syumai/workers/internal/jsutil" | ||
) | ||
|
||
// ConsumerMessage represents a message of the batch received by the consumer. | ||
// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#message | ||
type ConsumerMessage struct { | ||
// instance - The underlying instance of the JS message object passed by the cloudflare | ||
instance js.Value | ||
|
||
// Id - The unique Cloudflare-generated identifier of the message | ||
Id string | ||
// Timestamp - The time when the message was enqueued | ||
Timestamp time.Time | ||
// Body - The message body. Could be accessed directly or using converting helpers as StringBody, BytesBody, IntBody, FloatBody. | ||
Body js.Value | ||
// Attempts - The number of times the message delivery has been retried. | ||
Attempts int | ||
} | ||
|
||
func newConsumerMessage(obj js.Value) (*ConsumerMessage, error) { | ||
timestamp, err := jsutil.DateToTime(obj.Get("timestamp")) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to parse message timestamp: %v", err) | ||
} | ||
|
||
return &ConsumerMessage{ | ||
instance: obj, | ||
Id: obj.Get("id").String(), | ||
Body: obj.Get("body"), | ||
Attempts: obj.Get("attempts").Int(), | ||
Timestamp: timestamp, | ||
}, nil | ||
} | ||
|
||
// Ack acknowledges the message as successfully delivered despite the result returned from the consuming function. | ||
// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#message | ||
func (m *ConsumerMessage) Ack() { | ||
m.instance.Call("ack") | ||
} | ||
|
||
// Retry marks the message to be re-delivered. | ||
// The message will be retried after the optional delay configured with RetryOption. | ||
func (m *ConsumerMessage) Retry(opts ...RetryOption) { | ||
var o *retryOptions | ||
if len(opts) > 0 { | ||
o = &retryOptions{} | ||
for _, opt := range opts { | ||
opt(o) | ||
} | ||
} | ||
|
||
m.instance.Call("retry", o.toJS()) | ||
} | ||
|
||
func (m *ConsumerMessage) StringBody() (string, error) { | ||
if m.Body.Type() != js.TypeString { | ||
return "", fmt.Errorf("message body is not a string: %v", m.Body) | ||
} | ||
return m.Body.String(), nil | ||
} | ||
|
||
func (m *ConsumerMessage) BytesBody() ([]byte, error) { | ||
switch m.Body.Type() { | ||
case js.TypeString: | ||
return []byte(m.Body.String()), nil | ||
case js.TypeObject: | ||
if m.Body.InstanceOf(jsutil.Uint8ArrayClass) || m.Body.InstanceOf(jsutil.Uint8ClampedArrayClass) { | ||
b := make([]byte, m.Body.Get("byteLength").Int()) | ||
js.CopyBytesToGo(b, m.Body) | ||
return b, nil | ||
} | ||
} | ||
|
||
return nil, fmt.Errorf("message body is not a byte array: %v", m.Body) | ||
} | ||
|
||
func (m *ConsumerMessage) IntBody() (int, error) { | ||
if m.Body.Type() == js.TypeNumber { | ||
return m.Body.Int(), nil | ||
} | ||
|
||
return 0, fmt.Errorf("message body is not a number: %v", m.Body) | ||
} | ||
|
||
func (m *ConsumerMessage) FloatBody() (float64, error) { | ||
if m.Body.Type() == js.TypeNumber { | ||
return m.Body.Float(), nil | ||
} | ||
|
||
return 0, fmt.Errorf("message body is not a number: %v", m.Body) | ||
} | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These methods seem to be just wrappers around js.Value.
Since we can also use Body directly, I'll omit these methods and leave only StringBody and BytesBody. (StringBody is also just a wrapper, but I'll keep it because I think it will be used often.)