Skip to content

Commit

Permalink
Go: XAUTOCLAIM. (valkey-io#2955)
Browse files Browse the repository at this point in the history
* Go: `XAUTOCLAIM`.

Signed-off-by: Yury-Fridlyand <[email protected]>
  • Loading branch information
Yury-Fridlyand authored and barshaul committed Jan 28, 2025
1 parent e4565c6 commit 76347b6
Show file tree
Hide file tree
Showing 6 changed files with 525 additions and 12 deletions.
251 changes: 251 additions & 0 deletions go/api/base_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1874,6 +1874,257 @@ func (client *baseClient) XLen(key string) (int64, error) {
return handleIntResponse(result)
}

// Transfers ownership of pending stream entries that match the specified criteria.
//
// Since:
//
// Valkey 6.2.0 and above.
//
// See [valkey.io] for more details.
//
// Parameters:
//
// key - The key of the stream.
// group - The consumer group name.
// consumer - The group consumer.
// minIdleTime - The minimum idle time for the message to be claimed.
// start - Filters the claimed entries to those that have an ID equal or greater than the specified value.
//
// Return value:
//
// An object containing the following elements:
// - A stream ID to be used as the start argument for the next call to `XAUTOCLAIM`. This ID is
// equivalent to the next ID in the stream after the entries that were scanned, or "0-0" if
// the entire stream was scanned.
// - A map of the claimed entries.
// - If you are using Valkey 7.0.0 or above, the response will also include an array containing
// the message IDs that were in the Pending Entries List but no longer exist in the stream.
// These IDs are deleted from the Pending Entries List.
//
// Example:
//
// result, err := client.XAutoClaim("myStream", "myGroup", "myConsumer", 42, "0-0")
// result:
// // &{
// // "1609338788321-0" // value to be used as `start` argument for the next `xautoclaim` call
// // map[
// // "1609338752495-0": [ // claimed entries
// // ["field 1", "value 1"]
// // ["field 2", "value 2"]
// // ]
// // ]
// // [
// // "1594324506465-0", // array of IDs of deleted messages,
// // "1594568784150-0" // included in the response only on valkey 7.0.0 and above
// // ]
// // }
//
// [valkey.io]: https://valkey.io/commands/xautoclaim/
func (client *baseClient) XAutoClaim(
key string,
group string,
consumer string,
minIdleTime int64,
start string,
) (XAutoClaimResponse, error) {
return client.XAutoClaimWithOptions(key, group, consumer, minIdleTime, start, nil)
}

// Transfers ownership of pending stream entries that match the specified criteria.
//
// Since:
//
// Valkey 6.2.0 and above.
//
// See [valkey.io] for more details.
//
// Parameters:
//
// key - The key of the stream.
// group - The consumer group name.
// consumer - The group consumer.
// minIdleTime - The minimum idle time for the message to be claimed.
// start - Filters the claimed entries to those that have an ID equal or greater than the specified value.
// options - Options detailing how to read the stream.
//
// Return value:
//
// An object containing the following elements:
// - A stream ID to be used as the start argument for the next call to `XAUTOCLAIM`. This ID is
// equivalent to the next ID in the stream after the entries that were scanned, or "0-0" if
// the entire stream was scanned.
// - A map of the claimed entries.
// - If you are using Valkey 7.0.0 or above, the response will also include an array containing
// the message IDs that were in the Pending Entries List but no longer exist in the stream.
// These IDs are deleted from the Pending Entries List.
//
// Example:
//
// opts := options.NewXAutoClaimOptionsWithCount(1)
// result, err := client.XAutoClaimWithOptions("myStream", "myGroup", "myConsumer", 42, "0-0", opts)
// result:
// // &{
// // "1609338788321-0" // value to be used as `start` argument for the next `xautoclaim` call
// // map[
// // "1609338752495-0": [ // claimed entries
// // ["field 1", "value 1"]
// // ["field 2", "value 2"]
// // ]
// // ]
// // [
// // "1594324506465-0", // array of IDs of deleted messages,
// // "1594568784150-0" // included in the response only on valkey 7.0.0 and above
// // ]
// // }
//
// [valkey.io]: https://valkey.io/commands/xautoclaim/
func (client *baseClient) XAutoClaimWithOptions(
key string,
group string,
consumer string,
minIdleTime int64,
start string,
options *options.XAutoClaimOptions,
) (XAutoClaimResponse, error) {
args := []string{key, group, consumer, utils.IntToString(minIdleTime), start}
if options != nil {
optArgs, err := options.ToArgs()
if err != nil {
return XAutoClaimResponse{}, err
}
args = append(args, optArgs...)
}
result, err := client.executeCommand(C.XAutoClaim, args)
if err != nil {
return XAutoClaimResponse{}, err
}
return handleXAutoClaimResponse(result)
}

// Transfers ownership of pending stream entries that match the specified criteria.
//
// Since:
//
// Valkey 6.2.0 and above.
//
// See [valkey.io] for more details.
//
// Parameters:
//
// key - The key of the stream.
// group - The consumer group name.
// consumer - The group consumer.
// minIdleTime - The minimum idle time for the message to be claimed.
// start - Filters the claimed entries to those that have an ID equal or greater than the specified value.
//
// Return value:
//
// An object containing the following elements:
// - A stream ID to be used as the start argument for the next call to `XAUTOCLAIM`. This ID is
// equivalent to the next ID in the stream after the entries that were scanned, or "0-0" if
// the entire stream was scanned.
// - An array of IDs for the claimed entries.
// - If you are using Valkey 7.0.0 or above, the response will also include an array containing
// the message IDs that were in the Pending Entries List but no longer exist in the stream.
// These IDs are deleted from the Pending Entries List.
//
// Example:
//
// result, err := client.XAutoClaimJustId("myStream", "myGroup", "myConsumer", 42, "0-0")
// result:
// // &{
// // "1609338788321-0" // value to be used as `start` argument for the next `xautoclaim` call
// // [
// // "1609338752495-0", // claimed entries
// // "1609338752495-1"
// // ]
// // [
// // "1594324506465-0", // array of IDs of deleted messages,
// // "1594568784150-0" // included in the response only on valkey 7.0.0 and above
// // ]
// // }
//
// [valkey.io]: https://valkey.io/commands/xautoclaim/
func (client *baseClient) XAutoClaimJustId(
key string,
group string,
consumer string,
minIdleTime int64,
start string,
) (XAutoClaimJustIdResponse, error) {
return client.XAutoClaimJustIdWithOptions(key, group, consumer, minIdleTime, start, nil)
}

// Transfers ownership of pending stream entries that match the specified criteria.
//
// Since:
//
// Valkey 6.2.0 and above.
//
// See [valkey.io] for more details.
//
// Parameters:
//
// key - The key of the stream.
// group - The consumer group name.
// consumer - The group consumer.
// minIdleTime - The minimum idle time for the message to be claimed.
// start - Filters the claimed entries to those that have an ID equal or greater than the specified value.
// options - Options detailing how to read the stream.
//
// Return value:
//
// An object containing the following elements:
// - A stream ID to be used as the start argument for the next call to `XAUTOCLAIM`. This ID is
// equivalent to the next ID in the stream after the entries that were scanned, or "0-0" if
// the entire stream was scanned.
// - An array of IDs for the claimed entries.
// - If you are using Valkey 7.0.0 or above, the response will also include an array containing
// the message IDs that were in the Pending Entries List but no longer exist in the stream.
// These IDs are deleted from the Pending Entries List.
//
// Example:
//
// opts := options.NewXAutoClaimOptionsWithCount(1)
// result, err := client.XAutoClaimJustIdWithOptions("myStream", "myGroup", "myConsumer", 42, "0-0", opts)
// result:
// // &{
// // "1609338788321-0" // value to be used as `start` argument for the next `xautoclaim` call
// // [
// // "1609338752495-0", // claimed entries
// // "1609338752495-1"
// // ]
// // [
// // "1594324506465-0", // array of IDs of deleted messages,
// // "1594568784150-0" // included in the response only on valkey 7.0.0 and above
// // ]
// // }
//
// [valkey.io]: https://valkey.io/commands/xautoclaim/
func (client *baseClient) XAutoClaimJustIdWithOptions(
key string,
group string,
consumer string,
minIdleTime int64,
start string,
options *options.XAutoClaimOptions,
) (XAutoClaimJustIdResponse, error) {
args := []string{key, group, consumer, utils.IntToString(minIdleTime), start}
if options != nil {
optArgs, err := options.ToArgs()
if err != nil {
return XAutoClaimJustIdResponse{}, err
}
args = append(args, optArgs...)
}
args = append(args, "JUSTID")
result, err := client.executeCommand(C.XAutoClaim, args)
if err != nil {
return XAutoClaimJustIdResponse{}, err
}
return handleXAutoClaimJustIdResponse(result)
}

// Removes the specified entries by id from a stream, and returns the number of entries deleted.
//
// See [valkey.io] for details.
Expand Down
14 changes: 14 additions & 0 deletions go/api/options/stream_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,20 @@ func (xTrimOptions *XTrimOptions) ToArgs() ([]string, error) {
return args, nil
}

// Optional arguments for `XAutoClaim` in [StreamCommands]
type XAutoClaimOptions struct {
count int64
}

// Option to trim the stream according to minimum ID.
func NewXAutoClaimOptionsWithCount(count int64) *XAutoClaimOptions {
return &XAutoClaimOptions{count}
}

func (xacp *XAutoClaimOptions) ToArgs() ([]string, error) {
return []string{"COUNT", utils.IntToString(xacp.count)}, nil
}

// Optional arguments for `XRead` in [StreamCommands]
type XReadOptions struct {
count, block int64
Expand Down
96 changes: 96 additions & 0 deletions go/api/response_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,102 @@ func (node arrayConverter[T]) convert(data interface{}) (interface{}, error) {

// TODO: convert sets

func handleXAutoClaimResponse(response *C.struct_CommandResponse) (XAutoClaimResponse, error) {
defer C.free_command_response(response)
var null XAutoClaimResponse // default response
typeErr := checkResponseType(response, C.Array, false)
if typeErr != nil {
return null, typeErr
}
slice, err := parseArray(response)
if err != nil {
return null, err
}
arr := slice.([]interface{})
len := len(arr)
if len < 2 || len > 3 {
return null, &RequestError{fmt.Sprintf("Unexpected response array length: %d", len)}
}
converted, err := mapConverter[[][]string]{
arrayConverter[[]string]{
arrayConverter[string]{
nil,
false,
},
false,
},
false,
}.convert(arr[1])
if err != nil {
return null, err
}
claimedEntries, ok := converted.(map[string][][]string)
if !ok {
return null, &RequestError{fmt.Sprintf("unexpected type of second element: %T", converted)}
}
var deletedMessages []string
deletedMessages = nil
if len == 3 {
converted, err = arrayConverter[string]{
nil,
false,
}.convert(arr[2])
if err != nil {
return null, err
}
deletedMessages, ok = converted.([]string)
if !ok {
return null, &RequestError{fmt.Sprintf("unexpected type of third element: %T", converted)}
}
}
return XAutoClaimResponse{arr[0].(string), claimedEntries, deletedMessages}, nil
}

func handleXAutoClaimJustIdResponse(response *C.struct_CommandResponse) (XAutoClaimJustIdResponse, error) {
defer C.free_command_response(response)
var null XAutoClaimJustIdResponse // default response
typeErr := checkResponseType(response, C.Array, false)
if typeErr != nil {
return null, typeErr
}
slice, err := parseArray(response)
if err != nil {
return null, err
}
arr := slice.([]interface{})
len := len(arr)
if len < 2 || len > 3 {
return null, &RequestError{fmt.Sprintf("Unexpected response array length: %d", len)}
}
converted, err := arrayConverter[string]{
nil,
false,
}.convert(arr[1])
if err != nil {
return null, err
}
claimedEntries, ok := converted.([]string)
if !ok {
return null, &RequestError{fmt.Sprintf("unexpected type of second element: %T", converted)}
}
var deletedMessages []string
deletedMessages = nil
if len == 3 {
converted, err = arrayConverter[string]{
nil,
false,
}.convert(arr[2])
if err != nil {
return null, err
}
deletedMessages, ok = converted.([]string)
if !ok {
return null, &RequestError{fmt.Sprintf("unexpected type of third element: %T", converted)}
}
}
return XAutoClaimJustIdResponse{arr[0].(string), claimedEntries, deletedMessages}, nil
}

func handleXReadResponse(response *C.struct_CommandResponse) (map[string]map[string][][]string, error) {
defer C.free_command_response(response)
data, err := parseMap(response)
Expand Down
Loading

0 comments on commit 76347b6

Please sign in to comment.