Skip to content

Commit

Permalink
use zscan instead of zrangebyscore
Browse files Browse the repository at this point in the history
Signed-off-by: Jimmie Han <[email protected]>
  • Loading branch information
hanjm committed Jul 28, 2023
1 parent 464bba7 commit fab4c83
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 16 deletions.
36 changes: 26 additions & 10 deletions pkg/objmeta/backend_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,28 +130,44 @@ func (m *redisBackend) DelBlockAllMeta(ctx context.Context, blockID string) erro
// ListBlocks list block id list and invoke function f.
func (m *redisBackend) ListBlocks(ctx context.Context, f func(s []string) error) error {
key := genBlocksKey()
const pageSize = 512
var offset int64
const pageSize = 128
var (
cursor uint64
keys []string
)
for {
cmd := m.redisClient.B().Zrangebyscore().Key(key).Min("-inf").Max("+inf").
Limit(offset, pageSize).Build()
results, err := m.redisClient.Do(ctx, cmd).AsStrSlice()
cmd := m.redisClient.B().Zscan().Key(key).Cursor(cursor).Count(pageSize).Build()
results, err := m.redisClient.Do(ctx, cmd).ToArray()
if err != nil {
if rueidis.IsRedisNil(err) {
return nil
}
return errors.Wrapf(err, "redisClient.ZRangeByScore")
return errors.Wrapf(err, "redisClient.Zscan")
}
if len(results) == 0 {
return nil
if len(results) != 2 {
return errors.Wrapf(err, "parse zscan results length")
}
cursor, err = results[0].AsUint64()
if err != nil {
return errors.Wrapf(err, "parse zscan results[0]")
}
memberScores, err := results[1].AsZScores()
if err != nil {
return errors.Wrapf(err, "parse zscan results[1]")
}
keys = keys[:0]
for _, v := range memberScores {
keys = append(keys, v.Member)
}
offset += int64(len(results))
if err := f(results); err != nil {
if err := f(keys); err != nil {
if err == io.EOF {
return nil
}
return err
}
if cursor == 0 {
return nil
}
}
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/objmeta/backend_redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func Benchmark_redisBackend_GetBlockMeta(b *testing.B) {
redisConf := []byte(`addr: 127.0.0.1:6379`)
backend, err := NewRedisBackend(log.NewNopLogger(), prometheus.NewRegistry(), redisConf)
assert.NoError(b, err)
blockMeta := buildTestBlockMeta(b)
blockMeta := buildTestBlockMeta(b, 1)
err = backend.SetBlockMeta(ctx, blockMeta)
assert.NoError(b, err)
b.ReportAllocs()
Expand All @@ -40,7 +40,7 @@ func Benchmark_redisBackend_ExistsBlockMeta(b *testing.B) {
redisConf := []byte(`addr: 127.0.0.1:6379`)
backend, err := NewRedisBackend(log.NewNopLogger(), prometheus.NewRegistry(), redisConf)
assert.NoError(b, err)
blockMeta := buildTestBlockMeta(b)
blockMeta := buildTestBlockMeta(b, 1)
err = backend.SetBlockMeta(ctx, blockMeta)
assert.NoError(b, err)
b.ReportAllocs()
Expand All @@ -53,10 +53,10 @@ func Benchmark_redisBackend_ExistsBlockMeta(b *testing.B) {
})
}

func buildTestBlockMeta(t testing.TB) *objmetapb.BlockMeta {
func buildTestBlockMeta(t testing.TB, ms uint64) *objmetapb.BlockMeta {
metaJSON := metadata.Meta{
BlockMeta: tsdb.BlockMeta{
ULID: ulid.MustNew(5, nil),
ULID: ulid.MustNew(ms, nil),
MinTime: 2424,
MaxTime: 134,
Version: 1,
Expand Down
18 changes: 16 additions & 2 deletions pkg/objmeta/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package objmeta
import (
"context"
"fmt"
"sort"
"testing"

"github.com/alicebob/miniredis/v2"
Expand All @@ -32,7 +33,7 @@ config:
`, miniRedis.Addr()))
s, err := NewServer(log.NewNopLogger(), prometheus.NewRegistry(), objStoreConfContentYaml, objMetaConfContentYaml, 32)
assert.NoError(t, err)
m1 := buildTestBlockMeta(t)
m1 := buildTestBlockMeta(t, 1)
t.Run("set", func(t *testing.T) {
_, err = s.SetBlockMeta(ctx, &objmetapb.SetBlockMetaRequest{BlockMeta: m1})
assert.NoError(t, err)
Expand All @@ -48,13 +49,26 @@ config:
assert.Equal(t, true, rsp.Exist)
})
t.Run("list", func(t *testing.T) {
var setBlockIDs []string
for i := 0; i < 200; i++ {
m := buildTestBlockMeta(t, uint64(i))
setBlockIDs = append(setBlockIDs, m.BlockId)
_, err = s.SetBlockMeta(ctx, &objmetapb.SetBlockMetaRequest{BlockMeta: m})
}
sort.Slice(setBlockIDs, func(i, j int) bool {
return setBlockIDs[i] < setBlockIDs[j]
})
assert.NoError(t, err)
var result []string
err := s.backend.ListBlocks(ctx, func(s []string) error {
result = append(result, s...)
return nil
})
sort.Slice(result, func(i, j int) bool {
return result[i] < result[j]
})
assert.NoError(t, err)
assert.Equal(t, []string{m1.BlockId}, result)
assert.Equal(t, setBlockIDs, result)
})
t.Run("del", func(t *testing.T) {
rsp, err := s.DelBlockMeta(ctx, &objmetapb.DelBlockMetaRequest{BlockId: m1.BlockId, Type: objmetapb.Type_TYPE_META})
Expand Down

0 comments on commit fab4c83

Please sign in to comment.