From d3a747173e0245dba87bbbe2d5fefef327f273ac Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Wed, 15 Jan 2025 14:17:41 -0800 Subject: [PATCH 1/4] Go: `XAUTOCLAIM`. Signed-off-by: Yury-Fridlyand --- go/api/base_client.go | 251 +++++++++++++++++++++++++++ go/api/options/stream_options.go | 13 ++ go/api/response_handlers.go | 197 +++++++++++++++++++++ go/api/response_types.go | 14 ++ go/api/stream_commands.go | 28 +++ go/integTest/shared_commands_test.go | 137 +++++++++++++++ 6 files changed, 640 insertions(+) diff --git a/go/api/base_client.go b/go/api/base_client.go index 4c3ce8e307..50875e1383 100644 --- a/go/api/base_client.go +++ b/go/api/base_client.go @@ -1622,3 +1622,254 @@ func (client *baseClient) XLen(key string) (int64, error) { } return handleIntResponse(result) } + +// Transfers ownership of pending stream entries that match the specified criteria. +// +// Since: +// +// Valkey 6.2.0 and above. +// +// See [valkey.io] for more details. +// +// Parameters: +// +// key - The key of the stream. +// group - The consumer group name. +// consumer - The group consumer. +// minIdleTime - The minimum idle time for the message to be claimed. +// start - Filters the claimed entries to those that have an ID equal or greater than the specified value. +// +// Return value: +// +// An object containing the following elements: +// - A stream ID to be used as the start argument for the next call to `XAUTOCLAIM`. This ID is +// equivalent to the next ID in the stream after the entries that were scanned, or "0-0" if +// the entire stream was scanned. +// - A map of the claimed entries. +// - If you are using Valkey 7.0.0 or above, the response will also include an array containing +// the message IDs that were in the Pending Entries List but no longer exist in the stream. +// These IDs are deleted from the Pending Entries List. +// +// Example: +// +// result, err := client.XAutoClaim("myStream", "myGroup", "myConsumer", 42, "0-0") +// result: +// // &{ +// // "1609338788321-0" // value to be used as `start` argument for the next `xautoclaim` call +// // map[ +// // "1609338752495-0": [ // claimed entries +// // ["field 1", "value 1"] +// // ["field 2", "value 2"] +// // ] +// // ] +// // [ +// // "1594324506465-0", // array of IDs of deleted messages, +// // "1594568784150-0" // included in the response only on valkey 7.0.0 and above +// // ] +// // } +// +// [valkey.io]: https://valkey.io/commands/xautoclaim/ +func (client *baseClient) XAutoClaim( + key string, + group string, + consumer string, + minIdleTime int64, + start string, +) (*XAutoClaimResponse, error) { + return client.XAutoClaimWithOptions(key, group, consumer, minIdleTime, start, nil) +} + +// Transfers ownership of pending stream entries that match the specified criteria. +// +// Since: +// +// Valkey 6.2.0 and above. +// +// See [valkey.io] for more details. +// +// Parameters: +// +// key - The key of the stream. +// group - The consumer group name. +// consumer - The group consumer. +// minIdleTime - The minimum idle time for the message to be claimed. +// start - Filters the claimed entries to those that have an ID equal or greater than the specified value. +// options - Options detailing how to read the stream. +// +// Return value: +// +// An object containing the following elements: +// - A stream ID to be used as the start argument for the next call to `XAUTOCLAIM`. This ID is +// equivalent to the next ID in the stream after the entries that were scanned, or "0-0" if +// the entire stream was scanned. +// - A map of the claimed entries. +// - If you are using Valkey 7.0.0 or above, the response will also include an array containing +// the message IDs that were in the Pending Entries List but no longer exist in the stream. +// These IDs are deleted from the Pending Entries List. +// +// Example: +// +// opts := options.NewXAutoClaimOptionsWithCount(1) +// result, err := client.XAutoClaimWithOptions("myStream", "myGroup", "myConsumer", 42, "0-0", opts) +// result: +// // &{ +// // "1609338788321-0" // value to be used as `start` argument for the next `xautoclaim` call +// // map[ +// // "1609338752495-0": [ // claimed entries +// // ["field 1", "value 1"] +// // ["field 2", "value 2"] +// // ] +// // ] +// // [ +// // "1594324506465-0", // array of IDs of deleted messages, +// // "1594568784150-0" // included in the response only on valkey 7.0.0 and above +// // ] +// // } +// +// [valkey.io]: https://valkey.io/commands/xautoclaim/ +func (client *baseClient) XAutoClaimWithOptions( + key string, + group string, + consumer string, + minIdleTime int64, + start string, + options *options.XAutoClaimOptions, +) (*XAutoClaimResponse, error) { + args := []string{key, group, consumer, utils.IntToString(minIdleTime), start} + if options != nil { + optArgs, err := options.ToArgs() + if err != nil { + return nil, err + } + args = append(args, optArgs...) + } + result, err := client.executeCommand(C.XAutoClaim, args) + if err != nil { + return nil, err + } + return handleXAutoClaimResponse(result) +} + +// Transfers ownership of pending stream entries that match the specified criteria. +// +// Since: +// +// Valkey 6.2.0 and above. +// +// See [valkey.io] for more details. +// +// Parameters: +// +// key - The key of the stream. +// group - The consumer group name. +// consumer - The group consumer. +// minIdleTime - The minimum idle time for the message to be claimed. +// start - Filters the claimed entries to those that have an ID equal or greater than the specified value. +// +// Return value: +// +// An object containing the following elements: +// - A stream ID to be used as the start argument for the next call to `XAUTOCLAIM`. This ID is +// equivalent to the next ID in the stream after the entries that were scanned, or "0-0" if +// the entire stream was scanned. +// - An array of IDs for the claimed entries. +// - If you are using Valkey 7.0.0 or above, the response will also include an array containing +// the message IDs that were in the Pending Entries List but no longer exist in the stream. +// These IDs are deleted from the Pending Entries List. +// +// Example: +// +// result, err := client.XAutoClaimJustId("myStream", "myGroup", "myConsumer", 42, "0-0") +// result: +// // &{ +// // "1609338788321-0" // value to be used as `start` argument for the next `xautoclaim` call +// // [ +// // "1609338752495-0", // claimed entries +// // "1609338752495-1" +// // ] +// // [ +// // "1594324506465-0", // array of IDs of deleted messages, +// // "1594568784150-0" // included in the response only on valkey 7.0.0 and above +// // ] +// // } +// +// [valkey.io]: https://valkey.io/commands/xautoclaim/ +func (client *baseClient) XAutoClaimJustId( + key string, + group string, + consumer string, + minIdleTime int64, + start string, +) (*XAutoClaimJustIdResponse, error) { + return client.XAutoClaimJustIdWithOptions(key, group, consumer, minIdleTime, start, nil) +} + +// Transfers ownership of pending stream entries that match the specified criteria. +// +// Since: +// +// Valkey 6.2.0 and above. +// +// See [valkey.io] for more details. +// +// Parameters: +// +// key - The key of the stream. +// group - The consumer group name. +// consumer - The group consumer. +// minIdleTime - The minimum idle time for the message to be claimed. +// start - Filters the claimed entries to those that have an ID equal or greater than the specified value. +// options - Options detailing how to read the stream. +// +// Return value: +// +// An object containing the following elements: +// - A stream ID to be used as the start argument for the next call to `XAUTOCLAIM`. This ID is +// equivalent to the next ID in the stream after the entries that were scanned, or "0-0" if +// the entire stream was scanned. +// - An array of IDs for the claimed entries. +// - If you are using Valkey 7.0.0 or above, the response will also include an array containing +// the message IDs that were in the Pending Entries List but no longer exist in the stream. +// These IDs are deleted from the Pending Entries List. +// +// Example: +// +// opts := options.NewXAutoClaimOptionsWithCount(1) +// result, err := client.XAutoClaimJustIdWithOptions("myStream", "myGroup", "myConsumer", 42, "0-0", opts) +// result: +// // &{ +// // "1609338788321-0" // value to be used as `start` argument for the next `xautoclaim` call +// // [ +// // "1609338752495-0", // claimed entries +// // "1609338752495-1" +// // ] +// // [ +// // "1594324506465-0", // array of IDs of deleted messages, +// // "1594568784150-0" // included in the response only on valkey 7.0.0 and above +// // ] +// // } +// +// [valkey.io]: https://valkey.io/commands/xautoclaim/ +func (client *baseClient) XAutoClaimJustIdWithOptions( + key string, + group string, + consumer string, + minIdleTime int64, + start string, + options *options.XAutoClaimOptions, +) (*XAutoClaimJustIdResponse, error) { + args := []string{key, group, consumer, utils.IntToString(minIdleTime), start} + if options != nil { + optArgs, err := options.ToArgs() + if err != nil { + return nil, err + } + args = append(args, optArgs...) + } + args = append(args, "JUSTID") + result, err := client.executeCommand(C.XAutoClaim, args) + if err != nil { + return nil, err + } + return handleXAutoClaimJustIdResponse(result) +} diff --git a/go/api/options/stream_options.go b/go/api/options/stream_options.go index 95a8c69d33..35bf9e5fa3 100644 --- a/go/api/options/stream_options.go +++ b/go/api/options/stream_options.go @@ -116,3 +116,16 @@ func (xTrimOptions *XTrimOptions) ToArgs() ([]string, error) { } return args, nil } + +type XAutoClaimOptions struct { + count int64 +} + +// Option to trim the stream according to minimum ID. +func NewXAutoClaimOptionsWithCount(count int64) *XAutoClaimOptions { + return &XAutoClaimOptions{count} +} + +func (xacp *XAutoClaimOptions) ToArgs() ([]string, error) { + return []string{"COUNT", utils.IntToString(xacp.count)}, nil +} diff --git a/go/api/response_handlers.go b/go/api/response_handlers.go index 79d49c92eb..5cf83d6819 100644 --- a/go/api/response_handlers.go +++ b/go/api/response_handlers.go @@ -8,6 +8,7 @@ import "C" import ( "fmt" + "reflect" "unsafe" ) @@ -496,3 +497,199 @@ func convertToResultStringArray(input []interface{}) ([]Result[string], error) { } return result, nil } + +// get type of T +func getType[T any]() reflect.Type { + var zero [0]T + return reflect.TypeOf(zero).Elem() +} + +// convert (typecast) untyped response into a typed value +// for example, an arbitrary array `[]interface{}` into `[]string` +type responseConverter interface { + convert(data interface{}) (interface{}, error) +} + +// convert maps, T - type of the value, key is string +type mapConverter[T any] struct { + next responseConverter + canBeNil bool +} + +func (node mapConverter[T]) convert(data interface{}) (interface{}, error) { + if data == nil { + if node.canBeNil { + return nil, nil + } else { + return nil, &RequestError{fmt.Sprintf("Unexpected type received: nil, expected: map[string]%v", getType[T]())} + } + } + result := make(map[string]T) + + for key, value := range data.(map[string]interface{}) { + if node.next == nil { + valueT, ok := value.(T) + if !ok { + return nil, &RequestError{fmt.Sprintf("Unexpected type of map element: %T, expected: %v", value, getType[T]())} + } + result[key] = valueT + } else { + val, err := node.next.convert(value) + if err != nil { + return nil, err + } + if val == nil { + var null T + result[key] = null + continue + } + valueT, ok := val.(T) + if !ok { + return nil, &RequestError{fmt.Sprintf("Unexpected type of map element: %T, expected: %v", val, getType[T]())} + } + result[key] = valueT + } + } + + return result, nil +} + +// convert arrays, T - type of the value +type arrayConverter[T any] struct { + next responseConverter + canBeNil bool +} + +func (node arrayConverter[T]) convert(data interface{}) (interface{}, error) { + if data == nil { + if node.canBeNil { + return nil, nil + } else { + return nil, &RequestError{fmt.Sprintf("Unexpected type received: nil, expected: []%v", getType[T]())} + } + } + arrData := data.([]interface{}) + result := make([]T, 0, len(arrData)) + for _, value := range arrData { + if node.next == nil { + valueT, ok := value.(T) + if !ok { + return nil, &RequestError{ + fmt.Sprintf("Unexpected type of array element: %T, expected: %v", value, getType[T]()), + } + } + result = append(result, valueT) + } else { + val, err := node.next.convert(value) + if err != nil { + return nil, err + } + if val == nil { + var null T + result = append(result, null) + continue + } + valueT, ok := val.(T) + if !ok { + return nil, &RequestError{fmt.Sprintf("Unexpected type of array element: %T, expected: %v", val, getType[T]())} + } + result = append(result, valueT) + } + } + + return result, nil +} + +func handleXAutoClaimResponse(response *C.struct_CommandResponse) (*XAutoClaimResponse, error) { + defer C.free_command_response(response) + typeErr := checkResponseType(response, C.Array, false) + if typeErr != nil { + return nil, typeErr + } + slice, err := parseArray(response) + if err != nil { + return nil, err + } + arr := slice.([]interface{}) + len := len(arr) + if len < 2 || len > 3 { + return nil, &RequestError{fmt.Sprintf("Unexpected response array length: %d", len)} + } + converted, err := mapConverter[[][]string]{ + arrayConverter[[]string]{ + arrayConverter[string]{ + nil, + false, + }, + false, + }, + false, + }.convert(arr[1]) + if err != nil { + return nil, err + } + claimedEntries, ok := converted.(map[string][][]string) + if !ok { + return nil, &RequestError{fmt.Sprintf("unexpected type of second element: %T", converted)} + } + var deletedMessages []string + deletedMessages = nil + if len == 3 { + converted, err = arrayConverter[string]{ + nil, + false, + }.convert(arr[2]) + if err != nil { + return nil, err + } + deletedMessages, ok = converted.([]string) + if !ok { + return nil, &RequestError{fmt.Sprintf("unexpected type of third element: %T", converted)} + } + } + return &XAutoClaimResponse{arr[0].(string), claimedEntries, deletedMessages}, nil +} + +func handleXAutoClaimJustIdResponse(response *C.struct_CommandResponse) (*XAutoClaimJustIdResponse, error) { + defer C.free_command_response(response) + typeErr := checkResponseType(response, C.Array, false) + if typeErr != nil { + return nil, typeErr + } + slice, err := parseArray(response) + if err != nil { + return nil, err + } + arr := slice.([]interface{}) + len := len(arr) + if len < 2 || len > 3 { + return nil, &RequestError{fmt.Sprintf("Unexpected response array length: %d", len)} + } + converted, err := arrayConverter[string]{ + nil, + false, + }.convert(arr[1]) + if err != nil { + return nil, err + } + claimedEntries, ok := converted.([]string) + if !ok { + return nil, &RequestError{fmt.Sprintf("unexpected type of second element: %T", converted)} + } + var deletedMessages []string + deletedMessages = nil + if len == 3 { + converted, err = arrayConverter[string]{ + nil, + false, + }.convert(arr[2]) + if err != nil { + return nil, err + } + deletedMessages, ok = converted.([]string) + if !ok { + return nil, &RequestError{fmt.Sprintf("unexpected type of third element: %T", converted)} + } + } + return &XAutoClaimJustIdResponse{arr[0].(string), claimedEntries, deletedMessages}, nil +} diff --git a/go/api/response_types.go b/go/api/response_types.go index 2c7f3244b8..1b695c26a4 100644 --- a/go/api/response_types.go +++ b/go/api/response_types.go @@ -23,6 +23,20 @@ type KeyWithMemberAndScore struct { Score float64 } +// Response type of [XAutoClaim] command. +type XAutoClaimResponse struct { + NextEntry string + ClaimedEntries map[string][][]string + DeletedMessages []string +} + +// Response type of [XAutoClaimJustId] command. +type XAutoClaimJustIdResponse struct { + NextEntry string + ClaimedEntries []string + DeletedMessages []string +} + func (result Result[T]) IsNil() bool { return result.isNil } diff --git a/go/api/stream_commands.go b/go/api/stream_commands.go index 4aa295a753..b17d4b9c82 100644 --- a/go/api/stream_commands.go +++ b/go/api/stream_commands.go @@ -101,4 +101,32 @@ type StreamCommands interface { // // [valkey.io]: https://valkey.io/commands/xlen/ XLen(key string) (int64, error) + + XAutoClaim(key string, group string, consumer string, minIdleTime int64, start string) (*XAutoClaimResponse, error) + + XAutoClaimWithOptions( + key string, + group string, + consumer string, + minIdleTime int64, + start string, + options *options.XAutoClaimOptions, + ) (*XAutoClaimResponse, error) + + XAutoClaimJustId( + key string, + group string, + consumer string, + minIdleTime int64, + start string, + ) (*XAutoClaimJustIdResponse, error) + + XAutoClaimJustIdWithOptions( + key string, + group string, + consumer string, + minIdleTime int64, + start string, + options *options.XAutoClaimOptions, + ) (*XAutoClaimJustIdResponse, error) } diff --git a/go/integTest/shared_commands_test.go b/go/integTest/shared_commands_test.go index 3d58d48094..2443f64d39 100644 --- a/go/integTest/shared_commands_test.go +++ b/go/integTest/shared_commands_test.go @@ -4105,6 +4105,143 @@ func (suite *GlideTestSuite) TestXAddWithOptions() { }) } +// submit args with custom command API, check that no error returned. +// returns a response or raises `errMsg` if failed to submit the command. +func sendWithCustomCommand(suite *GlideTestSuite, client api.BaseClient, args []string, errMsg string) any { + var res any + var err error + switch c := client.(type) { + case api.GlideClient: + res, err = c.CustomCommand(args) + case api.GlideClusterClient: + res, err = c.CustomCommand(args) + default: + suite.FailNow(errMsg) + } + assert.NoError(suite.T(), err) + return res +} + +func (suite *GlideTestSuite) TestXAutoClaim() { + suite.runWithDefaultClients(func(client api.BaseClient) { + key := uuid.NewString() + group := uuid.NewString() + consumer := uuid.NewString() + + sendWithCustomCommand( + suite, + client, + []string{"xgroup", "create", key, group, "0", "MKSTREAM"}, + "Can't send XGROUP CREATE as a custom command", + ) + sendWithCustomCommand( + suite, + client, + []string{"xgroup", "createconsumer", key, group, consumer}, + "Can't send XGROUP CREATECONSUMER as a custom command", + ) + + xadd, err := client.XAddWithOptions( + key, + [][]string{{"entry1_field1", "entry1_value1"}, {"entry1_field2", "entry1_value2"}}, + options.NewXAddOptions().SetId("0-1"), + ) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), "0-1", xadd.Value()) + xadd, err = client.XAddWithOptions( + key, + [][]string{{"entry2_field1", "entry2_value1"}}, + options.NewXAddOptions().SetId("0-2"), + ) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), "0-2", xadd.Value()) + + sendWithCustomCommand( + suite, + client, + []string{"XREADGROUP", "GROUP", group, consumer, "STREAMS", key, ">"}, + "Can't send XREADGROUP as a custom command", + ) + // assert.Equal(suite.T(), map[string]map[string][][]string{ + // key: { + // "0-1": {{"entry1_field1", "entry1_value1"}, {"entry1_field2", "entry1_value2"}}, + // "0-2": {{"entry2_field1", "entry2_value1"}}, + // }, + // }, xreadgroup) + + opts := options.NewXAutoClaimOptionsWithCount(1) // struct{count int64}{count: 1} + xautoclaim, err := client.XAutoClaimWithOptions(key, group, consumer, 0, "0-0", opts) //&struct{count int64}{count: 1}) + assert.NoError(suite.T(), err) + var deletedEntries []string + if suite.serverVersion >= "7.0.0" { + deletedEntries = []string{} + } + assert.Equal( + suite.T(), + &api.XAutoClaimResponse{ + NextEntry: "0-2", + ClaimedEntries: map[string][][]string{ + "0-1": {{"entry1_field1", "entry1_value1"}, {"entry1_field2", "entry1_value2"}}, + }, + DeletedMessages: deletedEntries, + }, + xautoclaim, + ) + + justId, err := client.XAutoClaimJustId(key, group, consumer, 0, "0-0") + assert.NoError(suite.T(), err) + assert.Equal( + suite.T(), + &api.XAutoClaimJustIdResponse{ + NextEntry: "0-0", + ClaimedEntries: []string{"0-1", "0-2"}, + DeletedMessages: deletedEntries, + }, + justId, + ) + + // add one more entry + xadd, err = client.XAddWithOptions( + key, + [][]string{{"entry3_field1", "entry3_value1"}}, + options.NewXAddOptions().SetId("0-3"), + ) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), "0-3", xadd.Value()) + + // incorrect IDs - response is empty + xautoclaim, err = client.XAutoClaim(key, group, consumer, 0, "5-0") + assert.NoError(suite.T(), err) + assert.Equal( + suite.T(), + &api.XAutoClaimResponse{ + NextEntry: "0-0", + ClaimedEntries: map[string][][]string{}, + DeletedMessages: deletedEntries, + }, + xautoclaim, + ) + + justId, err = client.XAutoClaimJustId(key, group, consumer, 0, "5-0") + assert.NoError(suite.T(), err) + assert.Equal( + suite.T(), + &api.XAutoClaimJustIdResponse{ + NextEntry: "0-0", + ClaimedEntries: []string{}, + DeletedMessages: deletedEntries, + }, + justId, + ) + + // key exists, but it is not a stream + key2 := uuid.New().String() + suite.verifyOK(client.Set(key2, key2)) + _, err = client.XAutoClaim(key2, "_", "_", 0, "_") + assert.IsType(suite.T(), &api.RequestError{}, err) + }) +} + func (suite *GlideTestSuite) TestZAddAndZAddIncr() { suite.runWithDefaultClients(func(client api.BaseClient) { key := uuid.New().String() From d3eaf554a6f85785e3da0eec3e9f549814339364 Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Wed, 15 Jan 2025 14:43:27 -0800 Subject: [PATCH 2/4] cleanup Signed-off-by: Yury-Fridlyand --- go/api/base_client.go | 1 - go/integTest/shared_commands_test.go | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/go/api/base_client.go b/go/api/base_client.go index 1331b323aa..2ca6f39545 100644 --- a/go/api/base_client.go +++ b/go/api/base_client.go @@ -2101,7 +2101,6 @@ func (client *baseClient) ZScore(key string, member string) (Result[float64], er // } // // [valkey.io]: https://valkey.io/commands/zscan/ - func (client *baseClient) ZScan(key string, cursor string) (Result[string], []Result[string], error) { result, err := client.executeCommand(C.ZScan, []string{key, cursor}) if err != nil { diff --git a/go/integTest/shared_commands_test.go b/go/integTest/shared_commands_test.go index 22c707c625..d9fa8f3f70 100644 --- a/go/integTest/shared_commands_test.go +++ b/go/integTest/shared_commands_test.go @@ -4169,8 +4169,8 @@ func (suite *GlideTestSuite) TestXAutoClaim() { // }, // }, xreadgroup) - opts := options.NewXAutoClaimOptionsWithCount(1) // struct{count int64}{count: 1} - xautoclaim, err := client.XAutoClaimWithOptions(key, group, consumer, 0, "0-0", opts) //&struct{count int64}{count: 1}) + opts := options.NewXAutoClaimOptionsWithCount(1) + xautoclaim, err := client.XAutoClaimWithOptions(key, group, consumer, 0, "0-0", opts) assert.NoError(suite.T(), err) var deletedEntries []string if suite.serverVersion >= "7.0.0" { From 0d06f6755706622d2d82a4a93e55223bb8662c28 Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Thu, 16 Jan 2025 13:52:16 -0800 Subject: [PATCH 3/4] Fix return type Signed-off-by: Yury-Fridlyand --- go/api/base_client.go | 16 ++++++------ go/api/response_handlers.go | 38 +++++++++++++++------------- go/api/stream_commands.go | 8 +++--- go/integTest/shared_commands_test.go | 8 +++--- 4 files changed, 36 insertions(+), 34 deletions(-) diff --git a/go/api/base_client.go b/go/api/base_client.go index 2ca6f39545..efa50f0678 100644 --- a/go/api/base_client.go +++ b/go/api/base_client.go @@ -1803,7 +1803,7 @@ func (client *baseClient) XAutoClaim( consumer string, minIdleTime int64, start string, -) (*XAutoClaimResponse, error) { +) (XAutoClaimResponse, error) { return client.XAutoClaimWithOptions(key, group, consumer, minIdleTime, start, nil) } @@ -1862,18 +1862,18 @@ func (client *baseClient) XAutoClaimWithOptions( minIdleTime int64, start string, options *options.XAutoClaimOptions, -) (*XAutoClaimResponse, error) { +) (XAutoClaimResponse, error) { args := []string{key, group, consumer, utils.IntToString(minIdleTime), start} if options != nil { optArgs, err := options.ToArgs() if err != nil { - return nil, err + return XAutoClaimResponse{}, err } args = append(args, optArgs...) } result, err := client.executeCommand(C.XAutoClaim, args) if err != nil { - return nil, err + return XAutoClaimResponse{}, err } return handleXAutoClaimResponse(result) } @@ -1928,7 +1928,7 @@ func (client *baseClient) XAutoClaimJustId( consumer string, minIdleTime int64, start string, -) (*XAutoClaimJustIdResponse, error) { +) (XAutoClaimJustIdResponse, error) { return client.XAutoClaimJustIdWithOptions(key, group, consumer, minIdleTime, start, nil) } @@ -1985,19 +1985,19 @@ func (client *baseClient) XAutoClaimJustIdWithOptions( minIdleTime int64, start string, options *options.XAutoClaimOptions, -) (*XAutoClaimJustIdResponse, error) { +) (XAutoClaimJustIdResponse, error) { args := []string{key, group, consumer, utils.IntToString(minIdleTime), start} if options != nil { optArgs, err := options.ToArgs() if err != nil { - return nil, err + return XAutoClaimJustIdResponse{}, err } args = append(args, optArgs...) } args = append(args, "JUSTID") result, err := client.executeCommand(C.XAutoClaim, args) if err != nil { - return nil, err + return XAutoClaimJustIdResponse{}, err } return handleXAutoClaimJustIdResponse(result) } diff --git a/go/api/response_handlers.go b/go/api/response_handlers.go index c0194d33db..463e132e51 100644 --- a/go/api/response_handlers.go +++ b/go/api/response_handlers.go @@ -602,20 +602,21 @@ func (node arrayConverter[T]) convert(data interface{}) (interface{}, error) { // TODO: convert sets -func handleXAutoClaimResponse(response *C.struct_CommandResponse) (*XAutoClaimResponse, error) { +func handleXAutoClaimResponse(response *C.struct_CommandResponse) (XAutoClaimResponse, error) { defer C.free_command_response(response) + var null XAutoClaimResponse // default response typeErr := checkResponseType(response, C.Array, false) if typeErr != nil { - return nil, typeErr + return null, typeErr } slice, err := parseArray(response) if err != nil { - return nil, err + return null, err } arr := slice.([]interface{}) len := len(arr) if len < 2 || len > 3 { - return nil, &RequestError{fmt.Sprintf("Unexpected response array length: %d", len)} + return null, &RequestError{fmt.Sprintf("Unexpected response array length: %d", len)} } converted, err := mapConverter[[][]string]{ arrayConverter[[]string]{ @@ -628,11 +629,11 @@ func handleXAutoClaimResponse(response *C.struct_CommandResponse) (*XAutoClaimRe false, }.convert(arr[1]) if err != nil { - return nil, err + return null, err } claimedEntries, ok := converted.(map[string][][]string) if !ok { - return nil, &RequestError{fmt.Sprintf("unexpected type of second element: %T", converted)} + return null, &RequestError{fmt.Sprintf("unexpected type of second element: %T", converted)} } var deletedMessages []string deletedMessages = nil @@ -642,41 +643,42 @@ func handleXAutoClaimResponse(response *C.struct_CommandResponse) (*XAutoClaimRe false, }.convert(arr[2]) if err != nil { - return nil, err + return null, err } deletedMessages, ok = converted.([]string) if !ok { - return nil, &RequestError{fmt.Sprintf("unexpected type of third element: %T", converted)} + return null, &RequestError{fmt.Sprintf("unexpected type of third element: %T", converted)} } } - return &XAutoClaimResponse{arr[0].(string), claimedEntries, deletedMessages}, nil + return XAutoClaimResponse{arr[0].(string), claimedEntries, deletedMessages}, nil } -func handleXAutoClaimJustIdResponse(response *C.struct_CommandResponse) (*XAutoClaimJustIdResponse, error) { +func handleXAutoClaimJustIdResponse(response *C.struct_CommandResponse) (XAutoClaimJustIdResponse, error) { defer C.free_command_response(response) + var null XAutoClaimJustIdResponse // default response typeErr := checkResponseType(response, C.Array, false) if typeErr != nil { - return nil, typeErr + return null, typeErr } slice, err := parseArray(response) if err != nil { - return nil, err + return null, err } arr := slice.([]interface{}) len := len(arr) if len < 2 || len > 3 { - return nil, &RequestError{fmt.Sprintf("Unexpected response array length: %d", len)} + return null, &RequestError{fmt.Sprintf("Unexpected response array length: %d", len)} } converted, err := arrayConverter[string]{ nil, false, }.convert(arr[1]) if err != nil { - return nil, err + return null, err } claimedEntries, ok := converted.([]string) if !ok { - return nil, &RequestError{fmt.Sprintf("unexpected type of second element: %T", converted)} + return null, &RequestError{fmt.Sprintf("unexpected type of second element: %T", converted)} } var deletedMessages []string deletedMessages = nil @@ -686,14 +688,14 @@ func handleXAutoClaimJustIdResponse(response *C.struct_CommandResponse) (*XAutoC false, }.convert(arr[2]) if err != nil { - return nil, err + return null, err } deletedMessages, ok = converted.([]string) if !ok { - return nil, &RequestError{fmt.Sprintf("unexpected type of third element: %T", converted)} + return null, &RequestError{fmt.Sprintf("unexpected type of third element: %T", converted)} } } - return &XAutoClaimJustIdResponse{arr[0].(string), claimedEntries, deletedMessages}, nil + return XAutoClaimJustIdResponse{arr[0].(string), claimedEntries, deletedMessages}, nil } func handleXReadResponse(response *C.struct_CommandResponse) (map[string]map[string][][]string, error) { diff --git a/go/api/stream_commands.go b/go/api/stream_commands.go index 77b8738cc4..0b338f218f 100644 --- a/go/api/stream_commands.go +++ b/go/api/stream_commands.go @@ -102,7 +102,7 @@ type StreamCommands interface { // [valkey.io]: https://valkey.io/commands/xlen/ XLen(key string) (int64, error) - XAutoClaim(key string, group string, consumer string, minIdleTime int64, start string) (*XAutoClaimResponse, error) + XAutoClaim(key string, group string, consumer string, minIdleTime int64, start string) (XAutoClaimResponse, error) XAutoClaimWithOptions( key string, @@ -111,7 +111,7 @@ type StreamCommands interface { minIdleTime int64, start string, options *options.XAutoClaimOptions, - ) (*XAutoClaimResponse, error) + ) (XAutoClaimResponse, error) XAutoClaimJustId( key string, @@ -119,7 +119,7 @@ type StreamCommands interface { consumer string, minIdleTime int64, start string, - ) (*XAutoClaimJustIdResponse, error) + ) (XAutoClaimJustIdResponse, error) XAutoClaimJustIdWithOptions( key string, @@ -128,7 +128,7 @@ type StreamCommands interface { minIdleTime int64, start string, options *options.XAutoClaimOptions, - ) (*XAutoClaimJustIdResponse, error) + ) (XAutoClaimJustIdResponse, error) XRead(keysAndIds map[string]string) (map[string]map[string][][]string, error) diff --git a/go/integTest/shared_commands_test.go b/go/integTest/shared_commands_test.go index d9fa8f3f70..831c69208d 100644 --- a/go/integTest/shared_commands_test.go +++ b/go/integTest/shared_commands_test.go @@ -4178,7 +4178,7 @@ func (suite *GlideTestSuite) TestXAutoClaim() { } assert.Equal( suite.T(), - &api.XAutoClaimResponse{ + api.XAutoClaimResponse{ NextEntry: "0-2", ClaimedEntries: map[string][][]string{ "0-1": {{"entry1_field1", "entry1_value1"}, {"entry1_field2", "entry1_value2"}}, @@ -4192,7 +4192,7 @@ func (suite *GlideTestSuite) TestXAutoClaim() { assert.NoError(suite.T(), err) assert.Equal( suite.T(), - &api.XAutoClaimJustIdResponse{ + api.XAutoClaimJustIdResponse{ NextEntry: "0-0", ClaimedEntries: []string{"0-1", "0-2"}, DeletedMessages: deletedEntries, @@ -4214,7 +4214,7 @@ func (suite *GlideTestSuite) TestXAutoClaim() { assert.NoError(suite.T(), err) assert.Equal( suite.T(), - &api.XAutoClaimResponse{ + api.XAutoClaimResponse{ NextEntry: "0-0", ClaimedEntries: map[string][][]string{}, DeletedMessages: deletedEntries, @@ -4226,7 +4226,7 @@ func (suite *GlideTestSuite) TestXAutoClaim() { assert.NoError(suite.T(), err) assert.Equal( suite.T(), - &api.XAutoClaimJustIdResponse{ + api.XAutoClaimJustIdResponse{ NextEntry: "0-0", ClaimedEntries: []string{}, DeletedMessages: deletedEntries, From db8be02e89341f600f3410462add2e9cc645b47b Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Fri, 17 Jan 2025 12:11:23 -0800 Subject: [PATCH 4/4] fix tests Signed-off-by: Yury-Fridlyand --- go/api/base_client.go | 8 +++--- go/integTest/shared_commands_test.go | 38 ++++++++++------------------ 2 files changed, 18 insertions(+), 28 deletions(-) diff --git a/go/api/base_client.go b/go/api/base_client.go index 97f23a0ded..50a86dd851 100644 --- a/go/api/base_client.go +++ b/go/api/base_client.go @@ -1892,7 +1892,7 @@ func (client *baseClient) XLen(key string) (int64, error) { // // Return value: // -// An object containing the following elements: +// An object containing the following elements: // - A stream ID to be used as the start argument for the next call to `XAUTOCLAIM`. This ID is // equivalent to the next ID in the stream after the entries that were scanned, or "0-0" if // the entire stream was scanned. @@ -1949,7 +1949,7 @@ func (client *baseClient) XAutoClaim( // // Return value: // -// An object containing the following elements: +// An object containing the following elements: // - A stream ID to be used as the start argument for the next call to `XAUTOCLAIM`. This ID is // equivalent to the next ID in the stream after the entries that were scanned, or "0-0" if // the entire stream was scanned. @@ -2019,7 +2019,7 @@ func (client *baseClient) XAutoClaimWithOptions( // // Return value: // -// An object containing the following elements: +// An object containing the following elements: // - A stream ID to be used as the start argument for the next call to `XAUTOCLAIM`. This ID is // equivalent to the next ID in the stream after the entries that were scanned, or "0-0" if // the entire stream was scanned. @@ -2074,7 +2074,7 @@ func (client *baseClient) XAutoClaimJustId( // // Return value: // -// An object containing the following elements: +// An object containing the following elements: // - A stream ID to be used as the start argument for the next call to `XAUTOCLAIM`. This ID is // equivalent to the next ID in the stream after the entries that were scanned, or "0-0" if // the entire stream was scanned. diff --git a/go/integTest/shared_commands_test.go b/go/integTest/shared_commands_test.go index 4c01ef970c..eef3bf0e82 100644 --- a/go/integTest/shared_commands_test.go +++ b/go/integTest/shared_commands_test.go @@ -4156,18 +4156,14 @@ func (suite *GlideTestSuite) TestXAutoClaim() { assert.NoError(suite.T(), err) assert.Equal(suite.T(), "0-2", xadd.Value()) - sendWithCustomCommand( - suite, - client, - []string{"XREADGROUP", "GROUP", group, consumer, "STREAMS", key, ">"}, - "Can't send XREADGROUP as a custom command", - ) - // assert.Equal(suite.T(), map[string]map[string][][]string{ - // key: { - // "0-1": {{"entry1_field1", "entry1_value1"}, {"entry1_field2", "entry1_value2"}}, - // "0-2": {{"entry2_field1", "entry2_value1"}}, - // }, - // }, xreadgroup) + xreadgroup, err := client.XReadGroup(group, consumer, map[string]string{key: ">"}) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), map[string]map[string][][]string{ + key: { + "0-1": {{"entry1_field1", "entry1_value1"}, {"entry1_field2", "entry1_value2"}}, + "0-2": {{"entry2_field1", "entry2_value1"}}, + }, + }, xreadgroup) opts := options.NewXAutoClaimOptionsWithCount(1) xautoclaim, err := client.XAutoClaimWithOptions(key, group, consumer, 0, "0-0", opts) @@ -5509,8 +5505,7 @@ func (suite *GlideTestSuite) TestXPending() { streamid_2, err := client.XAdd(key, [][]string{{"field2", "value2"}}) assert.NoError(suite.T(), err) - command = []string{"XReadGroup", "GROUP", groupName, consumer1, "STREAMS", key, ">"} - _, err = client.CustomCommand(command) + _, err = client.XReadGroup(groupName, consumer1, map[string]string{key: ">"}) assert.NoError(suite.T(), err) _, err = client.XAdd(key, [][]string{{"field3", "value3"}}) @@ -5520,8 +5515,7 @@ func (suite *GlideTestSuite) TestXPending() { streamid_5, err := client.XAdd(key, [][]string{{"field5", "value5"}}) assert.NoError(suite.T(), err) - command = []string{"XReadGroup", "GROUP", groupName, consumer2, "STREAMS", key, ">"} - _, err = client.CustomCommand(command) + _, err = client.XReadGroup(groupName, consumer2, map[string]string{key: ">"}) assert.NoError(suite.T(), err) expectedSummary := api.XPendingSummary{ @@ -5585,8 +5579,7 @@ func (suite *GlideTestSuite) TestXPending() { streamid_2, err := client.XAdd(key, [][]string{{"field2", "value2"}}) assert.NoError(suite.T(), err) - command = []string{"XReadGroup", "GROUP", groupName, consumer1, "STREAMS", key, ">"} - _, err = client.CustomCommand(command) + _, err = client.XReadGroup(groupName, consumer1, map[string]string{key: ">"}) assert.NoError(suite.T(), err) _, err = client.XAdd(key, [][]string{{"field3", "value3"}}) @@ -5596,8 +5589,7 @@ func (suite *GlideTestSuite) TestXPending() { streamid_5, err := client.XAdd(key, [][]string{{"field5", "value5"}}) assert.NoError(suite.T(), err) - command = []string{"XReadGroup", "GROUP", groupName, consumer2, "STREAMS", key, ">"} - _, err = client.CustomCommand(command) + _, err = client.XReadGroup(groupName, consumer2, map[string]string{key: ">"}) assert.NoError(suite.T(), err) expectedSummary := api.XPendingSummary{ @@ -5694,8 +5686,7 @@ func (suite *GlideTestSuite) TestXPendingFailures() { assert.Equal(suite.T(), 0, len(detailResult)) // read the entire stream for the consumer and mark messages as pending - command = []string{"XReadGroup", "GROUP", groupName, consumer1, "STREAMS", key, ">"} - _, err = client.CustomCommand(command) + _, err = client.XReadGroup(groupName, consumer1, map[string]string{key: ">"}) assert.NoError(suite.T(), err) // sanity check - expect some results: @@ -5847,8 +5838,7 @@ func (suite *GlideTestSuite) TestXPendingFailures() { assert.Equal(suite.T(), 0, len(detailResult)) // read the entire stream for the consumer and mark messages as pending - command = []string{"XReadGroup", "GROUP", groupName, consumer1, "STREAMS", key, ">"} - _, err = client.CustomCommand(command) + _, err = client.XReadGroup(groupName, consumer1, map[string]string{key: ">"}) assert.NoError(suite.T(), err) // sanity check - expect some results: