Skip to content

Commit

Permalink
GO: xpending command (valkey-io#2957)
Browse files Browse the repository at this point in the history
* GO: add xpending command

Signed-off-by: jbrinkman <[email protected]>
  • Loading branch information
jbrinkman authored and eifrah-aws committed Jan 23, 2025
1 parent 28c64e4 commit a5be4a3
Show file tree
Hide file tree
Showing 7 changed files with 798 additions and 3 deletions.
6 changes: 3 additions & 3 deletions go/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,15 @@ unit-test:
mkdir -p reports
set -o pipefail; \
LD_LIBRARY_PATH=$(shell find . -name libglide_rs.so|grep -w release|tail -1|xargs dirname|xargs readlink -f):${LD_LIBRARY_PATH} \
go test -v -race ./... -skip TestGlideTestSuite $(if $(test-filter), -run $(test-filter)) \
go test -v -race ./... -skip TestGlideTestSuite $(if $(test-filter), -testify.m $(test-filter)) \
| tee >(go tool test2json -t -p github.com/valkey-io/valkey-glide/go/glide/utils | go-test-report -o reports/unit-tests.html -t unit-test > /dev/null)

# integration tests - run subtask with skipping modules tests
integ-test: export TEST_FILTER = -skip TestGlideTestSuite/TestModule $(if $(test-filter), -run $(test-filter))
integ-test: export TEST_FILTER = -skip TestGlideTestSuite/TestModule $(if $(test-filter), -testify.m $(test-filter))
integ-test: __it

# modules tests - run substask with default filter
modules-test: export TEST_FILTER = $(if $(test-filter), -run $(test-filter), -run TestGlideTestSuite/TestModule)
modules-test: export TEST_FILTER = $(if $(test-filter), -run $(test-filter), -testify.m TestGlideTestSuite/TestModule)
modules-test: __it

__it:
Expand Down
90 changes: 90 additions & 0 deletions go/api/base_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1905,3 +1905,93 @@ func (client *baseClient) ZScanWithOptions(
}
return handleScanResponse(result)
}

// Returns stream message summary information for pending messages matching a stream and group.
//
// See [valkey.io] for details.
//
// Parameters:
//
// key - The key of the stream.
// group - The consumer group name.
//
// Return value:
// An XPendingSummary struct that includes a summary with the following fields:
//
// NumOfMessages: The total number of pending messages for this consumer group.
// StartId: The smallest ID among the pending messages or nil if no pending messages exist.
// EndId: The greatest ID among the pending messages or nil if no pending messages exists.
// GroupConsumers: An array of ConsumerPendingMessages with the following fields:
// ConsumerName: The name of the consumer.
// MessageCount: The number of pending messages for this consumer.
//
// Example
//
// result, err := client.XPending("myStream", "myGroup")
// if err != nil {
// return err
// }
// fmt.Println("Number of pending messages: ", result.NumOfMessages)
// fmt.Println("Start and End ID of messages: ", result.StartId, result.EndId)
// for _, consumer := range result.ConsumerMessages {
// fmt.Printf("Consumer messages: %s: $v\n", consumer.ConsumerName, consumer.MessageCount)
// }
//
// [valkey.io]: https://valkey.io/commands/xpending/
func (client *baseClient) XPending(key string, group string) (XPendingSummary, error) {
result, err := client.executeCommand(C.XPending, []string{key, group})
if err != nil {
return XPendingSummary{}, err
}

return handleXPendingSummaryResponse(result)
}

// Returns stream message summary information for pending messages matching a given range of IDs.
//
// See [valkey.io] for details.
//
// Parameters:
//
// key - The key of the stream.
// group - The consumer group name.
// opts - The options for the command. See [options.XPendingOptions] for details.
//
// Return value:
// A slice of XPendingDetail structs, where each detail struct includes the following fields:
//
// Id - The ID of the pending message.
// ConsumerName - The name of the consumer that fetched the message and has still to acknowledge it.
// IdleTime - The time in milliseconds since the last time the message was delivered to the consumer.
// DeliveryCount - The number of times this message was delivered.
//
// Example
//
// detailResult, err := client.XPendingWithOptions(key, groupName, options.NewXPendingOptions("-", "+", 10))
// if err != nil {
// return err
// }
// fmt.Println("=========================")
// for _, detail := range detailResult {
// fmt.Println(detail.Id)
// fmt.Println(detail.ConsumerName)
// fmt.Println(detail.IdleTime)
// fmt.Println(detail.DeliveryCount)
// fmt.Println("=========================")
// }
//
// [valkey.io]: https://valkey.io/commands/xpending/
func (client *baseClient) XPendingWithOptions(
key string,
group string,
opts *options.XPendingOptions,
) ([]XPendingDetail, error) {
optionArgs, _ := opts.ToArgs()
args := append([]string{key, group}, optionArgs...)

result, err := client.executeCommand(C.XPending, args)
if err != nil {
return nil, err
}
return handleXPendingDetailResponse(result)
}
54 changes: 54 additions & 0 deletions go/api/options/stream_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,57 @@ func (xro *XReadOptions) ToArgs() ([]string, error) {
}
return args, nil
}

// Optional arguments for `XPending` in [StreamCommands]
type XPendingOptions struct {
minIdleTime int64
start string
end string
count int64
consumer string
}

// Create new empty `XPendingOptions`. The `start`, `end` and `count` arguments are required.
func NewXPendingOptions(start string, end string, count int64) *XPendingOptions {
options := &XPendingOptions{}
options.start = start
options.end = end
options.count = count
return options
}

// SetMinIdleTime sets the minimum idle time for the XPendingOptions.
// minIdleTime is the amount of time (in milliseconds) that a message must be idle to be considered.
// It returns the updated XPendingOptions.
func (xpo *XPendingOptions) SetMinIdleTime(minIdleTime int64) *XPendingOptions {
xpo.minIdleTime = minIdleTime
return xpo
}

// SetConsumer sets the consumer for the XPendingOptions.
// consumer is the name of the consumer to filter the pending messages.
// It returns the updated XPendingOptions.
func (xpo *XPendingOptions) SetConsumer(consumer string) *XPendingOptions {
xpo.consumer = consumer
return xpo
}

func (xpo *XPendingOptions) ToArgs() ([]string, error) {
args := []string{}

// if minIdleTime is set, we need to add an `IDLE` argument along with the minIdleTime
if xpo.minIdleTime > 0 {
args = append(args, "IDLE")
args = append(args, utils.IntToString(xpo.minIdleTime))
}

args = append(args, xpo.start)
args = append(args, xpo.end)
args = append(args, utils.IntToString(xpo.count))

if xpo.consumer != "" {
args = append(args, xpo.consumer)
}

return args, nil
}
92 changes: 92 additions & 0 deletions go/api/response_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import "C"
import (
"fmt"
"reflect"
"strconv"
"unsafe"
)

Expand Down Expand Up @@ -601,3 +602,94 @@ func handleXReadResponse(response *C.struct_CommandResponse) (map[string]map[str
}
return nil, &RequestError{fmt.Sprintf("unexpected type received: %T", res)}
}

func handleXPendingSummaryResponse(response *C.struct_CommandResponse) (XPendingSummary, error) {
defer C.free_command_response(response)

typeErr := checkResponseType(response, C.Array, true)
if typeErr != nil {
return CreateNilXPendingSummary(), typeErr
}

slice, err := parseArray(response)
if err != nil {
return CreateNilXPendingSummary(), err
}

arr := slice.([]interface{})
NumOfMessages := arr[0].(int64)
var StartId, EndId Result[string]
if arr[1] == nil {
StartId = CreateNilStringResult()
} else {
StartId = CreateStringResult(arr[1].(string))
}
if arr[2] == nil {
EndId = CreateNilStringResult()
} else {
EndId = CreateStringResult(arr[2].(string))
}

if pendingMessages, ok := arr[3].([]interface{}); ok {
var ConsumerPendingMessages []ConsumerPendingMessage
for _, msg := range pendingMessages {
consumerMessage := msg.([]interface{})
count, err := strconv.ParseInt(consumerMessage[1].(string), 10, 64)
if err == nil {
ConsumerPendingMessages = append(ConsumerPendingMessages, ConsumerPendingMessage{
ConsumerName: consumerMessage[0].(string),
MessageCount: count,
})
}
}
return XPendingSummary{NumOfMessages, StartId, EndId, ConsumerPendingMessages}, nil
} else {
return XPendingSummary{NumOfMessages, StartId, EndId, make([]ConsumerPendingMessage, 0)}, nil
}
}

func handleXPendingDetailResponse(response *C.struct_CommandResponse) ([]XPendingDetail, error) {
// response should be [][]interface{}

defer C.free_command_response(response)

// TODO: Not sure if this is correct for a nill response
if response == nil || response.response_type == uint32(C.Null) {
return make([]XPendingDetail, 0), nil
}

typeErr := checkResponseType(response, C.Array, true)
if typeErr != nil {
return make([]XPendingDetail, 0), typeErr
}

// parse first level of array
slice, err := parseArray(response)
arr := slice.([]interface{})

if err != nil {
return make([]XPendingDetail, 0), err
}

pendingDetails := make([]XPendingDetail, 0, len(arr))

for _, message := range arr {
switch detail := message.(type) {
case []interface{}:
pDetail := XPendingDetail{
Id: detail[0].(string),
ConsumerName: detail[1].(string),
IdleTime: detail[2].(int64),
DeliveryCount: detail[3].(int64),
}
pendingDetails = append(pendingDetails, pDetail)

case XPendingDetail:
pendingDetails = append(pendingDetails, detail)
default:
fmt.Printf("handleXPendingDetailResponse - unhandled type: %s\n", reflect.TypeOf(detail))
}
}

return pendingDetails, nil
}
47 changes: 47 additions & 0 deletions go/api/response_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,50 @@ func CreateEmptyClusterValue() ClusterValue[interface{}] {
value: Result[interface{}]{val: empty, isNil: true},
}
}

// XPendingSummary represents a summary of pending messages in a stream group.
// It includes the total number of pending messages, the ID of the first and last pending messages,
// and a list of consumer pending messages.
type XPendingSummary struct {
// NumOfMessages is the total number of pending messages in the stream group.
NumOfMessages int64

// StartId is the ID of the first pending message in the stream group.
StartId Result[string]

// EndId is the ID of the last pending message in the stream group.
EndId Result[string]

// ConsumerMessages is a list of pending messages for each consumer in the stream group.
ConsumerMessages []ConsumerPendingMessage
}

// ConsumerPendingMessage represents a pending message for a consumer in a Redis stream group.
// It includes the consumer's name and the count of pending messages for that consumer.
type ConsumerPendingMessage struct {
// ConsumerName is the name of the consumer.
ConsumerName string

// MessageCount is the number of pending messages for the consumer.
MessageCount int64
}

// XPendingDetail represents the details of a pending message in a stream group.
// It includes the message ID, the consumer's name, the idle time, and the delivery count.
type XPendingDetail struct {
// Id is the ID of the pending message.
Id string

// ConsumerName is the name of the consumer who has the pending message.
ConsumerName string

// IdleTime is the amount of time (in milliseconds) that the message has been idle.
IdleTime int64

// DeliveryCount is the number of times the message has been delivered.
DeliveryCount int64
}

func CreateNilXPendingSummary() XPendingSummary {
return XPendingSummary{0, CreateNilStringResult(), CreateNilStringResult(), make([]ConsumerPendingMessage, 0)}
}
4 changes: 4 additions & 0 deletions go/api/stream_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,8 @@ type StreamCommands interface {
XReadWithOptions(keysAndIds map[string]string, options *options.XReadOptions) (map[string]map[string][][]string, error)

XDel(key string, ids []string) (int64, error)

XPending(key string, group string) (XPendingSummary, error)

XPendingWithOptions(key string, group string, options *options.XPendingOptions) ([]XPendingDetail, error)
}
Loading

0 comments on commit a5be4a3

Please sign in to comment.