Skip to content

Commit

Permalink
Go: XREADGROUP. (valkey-io#2949)
Browse files Browse the repository at this point in the history
* Go: `XREADGROUP`.

Signed-off-by: Yury-Fridlyand <[email protected]>
  • Loading branch information
Yury-Fridlyand authored and Maayanshani25 committed Jan 19, 2025
1 parent 8849031 commit 162cc35
Show file tree
Hide file tree
Showing 5 changed files with 393 additions and 7 deletions.
123 changes: 123 additions & 0 deletions go/api/base_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1409,6 +1409,129 @@ func (client *baseClient) XReadWithOptions(
return handleXReadResponse(result)
}

// Reads entries from the given streams owned by a consumer group.
//
// Note:
//
// When in cluster mode, all keys in `keysAndIds` must map to the same hash slot.
//
// See [valkey.io] for details.
//
// Parameters:
//
// group - The consumer group name.
// consumer - The group consumer.
// keysAndIds - A map of keys and entry IDs to read from.
//
// Return value:
// A `map[string]map[string][][]string` of stream keys to a map of stream entry IDs mapped to an array entries or `nil` if
// a key does not exist or does not contain requiested entries.
//
// For example:
//
// result, err := client.XReadGroup({"stream1": "0-0", "stream2": "0-1", "stream3": "0-1"})
// err == nil: true
// result: map[string]map[string][][]string{
// "stream1": {
// "0-1": {{"field1", "value1"}},
// "0-2": {{"field2", "value2"}, {"field2", "value3"}},
// },
// "stream2": {
// "1526985676425-0": {{"name", "Virginia"}, {"surname", "Woolf"}},
// "1526985685298-0": nil, // entry was deleted
// },
// "stream3": {}, // stream is empty
// }
//
// [valkey.io]: https://valkey.io/commands/xreadgroup/
func (client *baseClient) XReadGroup(
group string,
consumer string,
keysAndIds map[string]string,
) (map[string]map[string][][]string, error) {
return client.XReadGroupWithOptions(group, consumer, keysAndIds, options.NewXReadGroupOptions())
}

// Reads entries from the given streams owned by a consumer group.
//
// Note:
//
// When in cluster mode, all keys in `keysAndIds` must map to the same hash slot.
//
// See [valkey.io] for details.
//
// Parameters:
//
// group - The consumer group name.
// consumer - The group consumer.
// keysAndIds - A map of keys and entry IDs to read from.
// options - Options detailing how to read the stream.
//
// Return value:
// A `map[string]map[string][][]string` of stream keys to a map of stream entry IDs mapped to an array entries or `nil` if
// a key does not exist or does not contain requiested entries.
//
// For example:
//
// options := options.NewXReadGroupOptions().SetNoAck()
// result, err := client.XReadGroupWithOptions({"stream1": "0-0", "stream2": "0-1", "stream3": "0-1"}, options)
// err == nil: true
// result: map[string]map[string][][]string{
// "stream1": {
// "0-1": {{"field1", "value1"}},
// "0-2": {{"field2", "value2"}, {"field2", "value3"}},
// },
// "stream2": {
// "1526985676425-0": {{"name", "Virginia"}, {"surname", "Woolf"}},
// "1526985685298-0": nil, // entry was deleted
// },
// "stream3": {}, // stream is empty
// }
//
// [valkey.io]: https://valkey.io/commands/xreadgroup/
func (client *baseClient) XReadGroupWithOptions(
group string,
consumer string,
keysAndIds map[string]string,
options *options.XReadGroupOptions,
) (map[string]map[string][][]string, error) {
args, err := createStreamCommandArgs([]string{"GROUP", group, consumer}, keysAndIds, options)
if err != nil {
return nil, err
}

result, err := client.executeCommand(C.XReadGroup, args)
if err != nil {
return nil, err
}

return handleXReadGroupResponse(result)
}

// Combine `args` with `keysAndIds` and `options` into arguments for a stream command
func createStreamCommandArgs(
args []string,
keysAndIds map[string]string,
options interface{ ToArgs() ([]string, error) },
) ([]string, error) {
optionArgs, err := options.ToArgs()
if err != nil {
return nil, err
}
args = append(args, optionArgs...)
// Note: this loop iterates in an indeterminate order, but it is OK for that case
keys := make([]string, 0, len(keysAndIds))
values := make([]string, 0, len(keysAndIds))
for key := range keysAndIds {
keys = append(keys, key)
values = append(values, keysAndIds[key])
}
args = append(args, "STREAMS")
args = append(args, keys...)
args = append(args, values...)
return args, nil
}

func (client *baseClient) ZAdd(
key string,
membersScoreMap map[string]float64,
Expand Down
45 changes: 45 additions & 0 deletions go/api/options/stream_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,51 @@ func (xro *XReadOptions) ToArgs() ([]string, error) {
return args, nil
}

// Optional arguments for `XReadGroup` in [StreamCommands]
type XReadGroupOptions struct {
count, block int64
noAck bool
}

// Create new empty `XReadOptions`
func NewXReadGroupOptions() *XReadGroupOptions {
return &XReadGroupOptions{-1, -1, false}
}

// The maximal number of elements requested. Equivalent to `COUNT` in the Valkey API.
func (xrgo *XReadGroupOptions) SetCount(count int64) *XReadGroupOptions {
xrgo.count = count
return xrgo
}

// If set, the request will be blocked for the set amount of milliseconds or until the server has
// the required number of entries. A value of `0` will block indefinitely. Equivalent to `BLOCK` in the Valkey API.
func (xrgo *XReadGroupOptions) SetBlock(block int64) *XReadGroupOptions {
xrgo.block = block
return xrgo
}

// If set, messages are not added to the Pending Entries List (PEL). This is equivalent to
// acknowledging the message when it is read.
func (xrgo *XReadGroupOptions) SetNoAck() *XReadGroupOptions {
xrgo.noAck = true
return xrgo
}

func (xrgo *XReadGroupOptions) ToArgs() ([]string, error) {
args := []string{}
if xrgo.count >= 0 {
args = append(args, "COUNT", utils.IntToString(xrgo.count))
}
if xrgo.block >= 0 {
args = append(args, "BLOCK", utils.IntToString(xrgo.block))
}
if xrgo.noAck {
args = append(args, "NOACK")
}
return args, nil
}

// Optional arguments for `XPending` in [StreamCommands]
type XPendingOptions struct {
minIdleTime int64
Expand Down
82 changes: 75 additions & 7 deletions go/api/response_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,27 +513,40 @@ type responseConverter interface {

// convert maps, T - type of the value, key is string
type mapConverter[T any] struct {
next responseConverter
next responseConverter
canBeNil bool
}

func (node mapConverter[T]) convert(data interface{}) (interface{}, error) {
if data == nil {
if node.canBeNil {
return nil, nil
} else {
return nil, &RequestError{fmt.Sprintf("Unexpected type received: nil, expected: map[string]%v", getType[T]())}
}
}
result := make(map[string]T)

for key, value := range data.(map[string]interface{}) {
if node.next == nil {
valueT, ok := value.(T)
if !ok {
return nil, &RequestError{fmt.Sprintf("Unexpected type received: %T, expected: %v", value, getType[T]())}
return nil, &RequestError{fmt.Sprintf("Unexpected type of map element: %T, expected: %v", value, getType[T]())}
}
result[key] = valueT
} else {
val, err := node.next.convert(value)
if err != nil {
return nil, err
}
if val == nil {
var null T
result[key] = null
continue
}
valueT, ok := val.(T)
if !ok {
return nil, &RequestError{fmt.Sprintf("Unexpected type received: %T, expected: %v", valueT, getType[T]())}
return nil, &RequestError{fmt.Sprintf("Unexpected type of map element: %T, expected: %v", val, getType[T]())}
}
result[key] = valueT
}
Expand All @@ -544,27 +557,42 @@ func (node mapConverter[T]) convert(data interface{}) (interface{}, error) {

// convert arrays, T - type of the value
type arrayConverter[T any] struct {
next responseConverter
next responseConverter
canBeNil bool
}

func (node arrayConverter[T]) convert(data interface{}) (interface{}, error) {
if data == nil {
if node.canBeNil {
return nil, nil
} else {
return nil, &RequestError{fmt.Sprintf("Unexpected type received: nil, expected: []%v", getType[T]())}
}
}
arrData := data.([]interface{})
result := make([]T, 0, len(arrData))
for _, value := range arrData {
if node.next == nil {
valueT, ok := value.(T)
if !ok {
return nil, &RequestError{fmt.Sprintf("Unexpected type received: %T, expected: %v", value, getType[T]())}
return nil, &RequestError{
fmt.Sprintf("Unexpected type of array element: %T, expected: %v", value, getType[T]()),
}
}
result = append(result, valueT)
} else {
val, err := node.next.convert(value)
if err != nil {
return nil, err
}
if val == nil {
var null T
result = append(result, null)
continue
}
valueT, ok := val.(T)
if !ok {
return nil, &RequestError{fmt.Sprintf("Unexpected type received: %T, expected: %v", valueT, getType[T]())}
return nil, &RequestError{fmt.Sprintf("Unexpected type of array element: %T, expected: %v", val, getType[T]())}
}
result = append(result, valueT)
}
Expand All @@ -588,9 +616,49 @@ func handleXReadResponse(response *C.struct_CommandResponse) (map[string]map[str
converters := mapConverter[map[string][][]string]{
mapConverter[[][]string]{
arrayConverter[[]string]{
arrayConverter[string]{},
arrayConverter[string]{
nil,
false,
},
false,
},
false,
},
false,
}

res, err := converters.convert(data)
if err != nil {
return nil, err
}
if result, ok := res.(map[string]map[string][][]string); ok {
return result, nil
}
return nil, &RequestError{fmt.Sprintf("unexpected type received: %T", res)}
}

func handleXReadGroupResponse(response *C.struct_CommandResponse) (map[string]map[string][][]string, error) {
defer C.free_command_response(response)
data, err := parseMap(response)
if err != nil {
return nil, err
}
if data == nil {
return nil, nil
}

converters := mapConverter[map[string][][]string]{
mapConverter[[][]string]{
arrayConverter[[]string]{
arrayConverter[string]{
nil,
false,
},
true,
},
false,
},
false,
}

res, err := converters.convert(data)
Expand Down
9 changes: 9 additions & 0 deletions go/api/stream_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,15 @@ type StreamCommands interface {
// [valkey.io]: https://valkey.io/commands/xlen/
XLen(key string) (int64, error)

XReadGroup(group string, consumer string, keysAndIds map[string]string) (map[string]map[string][][]string, error)

XReadGroupWithOptions(
group string,
consumer string,
keysAndIds map[string]string,
options *options.XReadGroupOptions,
) (map[string]map[string][][]string, error)

XRead(keysAndIds map[string]string) (map[string]map[string][][]string, error)

XReadWithOptions(keysAndIds map[string]string, options *options.XReadOptions) (map[string]map[string][][]string, error)
Expand Down
Loading

0 comments on commit 162cc35

Please sign in to comment.