Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Go: XAUTOCLAIM. #2955

Merged
merged 6 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
251 changes: 251 additions & 0 deletions go/api/base_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1751,6 +1751,257 @@ func (client *baseClient) XLen(key string) (int64, error) {
return handleIntResponse(result)
}

// Transfers ownership of pending stream entries that match the specified criteria.
//
// Since:
//
// Valkey 6.2.0 and above.
//
// See [valkey.io] for more details.
//
// Parameters:
//
// key - The key of the stream.
// group - The consumer group name.
// consumer - The group consumer.
// minIdleTime - The minimum idle time for the message to be claimed.
// start - Filters the claimed entries to those that have an ID equal or greater than the specified value.
//
// Return value:
//
// An object containing the following elements:
// - A stream ID to be used as the start argument for the next call to `XAUTOCLAIM`. This ID is
// equivalent to the next ID in the stream after the entries that were scanned, or "0-0" if
// the entire stream was scanned.
// - A map of the claimed entries.
// - If you are using Valkey 7.0.0 or above, the response will also include an array containing
// the message IDs that were in the Pending Entries List but no longer exist in the stream.
// These IDs are deleted from the Pending Entries List.
//
// Example:
//
// result, err := client.XAutoClaim("myStream", "myGroup", "myConsumer", 42, "0-0")
// result:
// // &{
// // "1609338788321-0" // value to be used as `start` argument for the next `xautoclaim` call
// // map[
// // "1609338752495-0": [ // claimed entries
// // ["field 1", "value 1"]
// // ["field 2", "value 2"]
// // ]
// // ]
// // [
// // "1594324506465-0", // array of IDs of deleted messages,
// // "1594568784150-0" // included in the response only on valkey 7.0.0 and above
// // ]
// // }
//
// [valkey.io]: https://valkey.io/commands/xautoclaim/
func (client *baseClient) XAutoClaim(
key string,
group string,
consumer string,
minIdleTime int64,
start string,
) (*XAutoClaimResponse, error) {
return client.XAutoClaimWithOptions(key, group, consumer, minIdleTime, start, nil)
}

// Transfers ownership of pending stream entries that match the specified criteria.
//
// Since:
//
// Valkey 6.2.0 and above.
//
// See [valkey.io] for more details.
//
// Parameters:
//
// key - The key of the stream.
// group - The consumer group name.
// consumer - The group consumer.
// minIdleTime - The minimum idle time for the message to be claimed.
// start - Filters the claimed entries to those that have an ID equal or greater than the specified value.
// options - Options detailing how to read the stream.
//
// Return value:
//
// An object containing the following elements:
// - A stream ID to be used as the start argument for the next call to `XAUTOCLAIM`. This ID is
// equivalent to the next ID in the stream after the entries that were scanned, or "0-0" if
// the entire stream was scanned.
// - A map of the claimed entries.
// - If you are using Valkey 7.0.0 or above, the response will also include an array containing
// the message IDs that were in the Pending Entries List but no longer exist in the stream.
// These IDs are deleted from the Pending Entries List.
//
// Example:
//
// opts := options.NewXAutoClaimOptionsWithCount(1)
// result, err := client.XAutoClaimWithOptions("myStream", "myGroup", "myConsumer", 42, "0-0", opts)
// result:
// // &{
// // "1609338788321-0" // value to be used as `start` argument for the next `xautoclaim` call
// // map[
// // "1609338752495-0": [ // claimed entries
// // ["field 1", "value 1"]
// // ["field 2", "value 2"]
// // ]
// // ]
// // [
// // "1594324506465-0", // array of IDs of deleted messages,
// // "1594568784150-0" // included in the response only on valkey 7.0.0 and above
// // ]
// // }
//
// [valkey.io]: https://valkey.io/commands/xautoclaim/
func (client *baseClient) XAutoClaimWithOptions(
key string,
group string,
consumer string,
minIdleTime int64,
start string,
options *options.XAutoClaimOptions,
) (*XAutoClaimResponse, error) {
args := []string{key, group, consumer, utils.IntToString(minIdleTime), start}
if options != nil {
optArgs, err := options.ToArgs()
if err != nil {
return nil, err
}
args = append(args, optArgs...)
}
result, err := client.executeCommand(C.XAutoClaim, args)
if err != nil {
return nil, err
}
return handleXAutoClaimResponse(result)
}

// Transfers ownership of pending stream entries that match the specified criteria.
//
// Since:
//
// Valkey 6.2.0 and above.
//
// See [valkey.io] for more details.
//
// Parameters:
//
// key - The key of the stream.
// group - The consumer group name.
// consumer - The group consumer.
// minIdleTime - The minimum idle time for the message to be claimed.
// start - Filters the claimed entries to those that have an ID equal or greater than the specified value.
//
// Return value:
//
// An object containing the following elements:
// - A stream ID to be used as the start argument for the next call to `XAUTOCLAIM`. This ID is
// equivalent to the next ID in the stream after the entries that were scanned, or "0-0" if
// the entire stream was scanned.
// - An array of IDs for the claimed entries.
// - If you are using Valkey 7.0.0 or above, the response will also include an array containing
// the message IDs that were in the Pending Entries List but no longer exist in the stream.
// These IDs are deleted from the Pending Entries List.
//
// Example:
//
// result, err := client.XAutoClaimJustId("myStream", "myGroup", "myConsumer", 42, "0-0")
// result:
// // &{
// // "1609338788321-0" // value to be used as `start` argument for the next `xautoclaim` call
// // [
// // "1609338752495-0", // claimed entries
// // "1609338752495-1"
// // ]
// // [
// // "1594324506465-0", // array of IDs of deleted messages,
// // "1594568784150-0" // included in the response only on valkey 7.0.0 and above
// // ]
// // }
//
// [valkey.io]: https://valkey.io/commands/xautoclaim/
func (client *baseClient) XAutoClaimJustId(
key string,
group string,
consumer string,
minIdleTime int64,
start string,
) (*XAutoClaimJustIdResponse, error) {
return client.XAutoClaimJustIdWithOptions(key, group, consumer, minIdleTime, start, nil)
}

// Transfers ownership of pending stream entries that match the specified criteria.
//
// Since:
//
// Valkey 6.2.0 and above.
//
// See [valkey.io] for more details.
//
// Parameters:
//
// key - The key of the stream.
// group - The consumer group name.
// consumer - The group consumer.
// minIdleTime - The minimum idle time for the message to be claimed.
// start - Filters the claimed entries to those that have an ID equal or greater than the specified value.
// options - Options detailing how to read the stream.
//
// Return value:
//
// An object containing the following elements:
// - A stream ID to be used as the start argument for the next call to `XAUTOCLAIM`. This ID is
// equivalent to the next ID in the stream after the entries that were scanned, or "0-0" if
// the entire stream was scanned.
// - An array of IDs for the claimed entries.
// - If you are using Valkey 7.0.0 or above, the response will also include an array containing
// the message IDs that were in the Pending Entries List but no longer exist in the stream.
// These IDs are deleted from the Pending Entries List.
//
// Example:
//
// opts := options.NewXAutoClaimOptionsWithCount(1)
// result, err := client.XAutoClaimJustIdWithOptions("myStream", "myGroup", "myConsumer", 42, "0-0", opts)
// result:
// // &{
// // "1609338788321-0" // value to be used as `start` argument for the next `xautoclaim` call
// // [
// // "1609338752495-0", // claimed entries
// // "1609338752495-1"
// // ]
// // [
// // "1594324506465-0", // array of IDs of deleted messages,
// // "1594568784150-0" // included in the response only on valkey 7.0.0 and above
// // ]
// // }
//
// [valkey.io]: https://valkey.io/commands/xautoclaim/
func (client *baseClient) XAutoClaimJustIdWithOptions(
key string,
group string,
consumer string,
minIdleTime int64,
start string,
options *options.XAutoClaimOptions,
) (*XAutoClaimJustIdResponse, error) {
args := []string{key, group, consumer, utils.IntToString(minIdleTime), start}
if options != nil {
optArgs, err := options.ToArgs()
if err != nil {
return nil, err
}
args = append(args, optArgs...)
}
args = append(args, "JUSTID")
result, err := client.executeCommand(C.XAutoClaim, args)
if err != nil {
return nil, err
}
return handleXAutoClaimJustIdResponse(result)
}

// Removes the specified entries by id from a stream, and returns the number of entries deleted.
//
// See [valkey.io] for details.
Expand Down
14 changes: 14 additions & 0 deletions go/api/options/stream_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,20 @@ func (xTrimOptions *XTrimOptions) ToArgs() ([]string, error) {
return args, nil
}

// Optional arguments for `XAutoClaim` in [StreamCommands]
type XAutoClaimOptions struct {
count int64
}

// Option to trim the stream according to minimum ID.
func NewXAutoClaimOptionsWithCount(count int64) *XAutoClaimOptions {
return &XAutoClaimOptions{count}
}

func (xacp *XAutoClaimOptions) ToArgs() ([]string, error) {
return []string{"COUNT", utils.IntToString(xacp.count)}, nil
}

// Optional arguments for `XRead` in [StreamCommands]
type XReadOptions struct {
count, block int64
Expand Down
Loading
Loading