Skip to content

Commit

Permalink
add block lifecycle callback
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 committed Jan 6, 2025
1 parent ca40906 commit 613d0da
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 0 deletions.
24 changes: 24 additions & 0 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,8 @@ type BucketStore struct {
indexHeaderLazyDownloadStrategy indexheader.LazyDownloadIndexHeaderFunc

requestLoggerFunc RequestLoggerFunc

blockLifecycleCallback BlockLifecycleCallback
}

func (s *BucketStore) validate() error {
Expand Down Expand Up @@ -583,6 +585,24 @@ func WithIndexHeaderLazyDownloadStrategy(strategy indexheader.LazyDownloadIndexH
}
}

// BlockLifecycleCallback specifies callbacks that will be called during the lifecycle of a block.
type BlockLifecycleCallback interface {
// PreAdd is called before adding a block to indicate if the block needs to be added.
// A non nil error means the block should not be added.
PreAdd(meta metadata.Meta) error
}

type noopBlockLifecycleCallback struct{}

func (c noopBlockLifecycleCallback) PreAdd(meta metadata.Meta) error { return nil }

// WithBlockLifecycleCallback allows customizing callbacks of block lifecycle.
func WithBlockLifecycleCallback(c BlockLifecycleCallback) BucketStoreOption {
return func(s *BucketStore) {
s.blockLifecycleCallback = c
}
}

// NewBucketStore creates a new bucket backed store that implements the store API against
// an object store bucket. It is optimized to work against high latency backends.
func NewBucketStore(
Expand Down Expand Up @@ -628,6 +648,7 @@ func NewBucketStore(
sortingStrategy: sortingStrategyStore,
indexHeaderLazyDownloadStrategy: indexheader.AlwaysEagerDownloadIndexHeader,
requestLoggerFunc: NoopRequestLoggerFunc,
blockLifecycleCallback: &noopBlockLifecycleCallback{},
}

for _, option := range options {
Expand Down Expand Up @@ -683,6 +704,9 @@ func (s *BucketStore) SyncBlocks(ctx context.Context) error {
wg.Add(1)
go func() {
for meta := range blockc {
if preAddErr := s.blockLifecycleCallback.PreAdd(*meta); preAddErr != nil {
continue
}
if err := s.addBlock(ctx, meta); err != nil {
continue
}
Expand Down
65 changes: 65 additions & 0 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4115,3 +4115,68 @@ func TestBucketStoreMetadataLimit(t *testing.T) {
})
}
}

func TestBucketStoreBlockLifecycleCallback(t *testing.T) {
t.Parallel()

tb := testutil.NewTB(t)

tmpDir := t.TempDir()

bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt"))
testutil.Ok(tb, err)
defer func() { testutil.Ok(tb, bkt.Close()) }()

blockID1 := uploadTestBlock(tb, tmpDir, bkt, 300)
blockID2 := uploadTestBlock(tb, tmpDir, bkt, 300)

instrBkt := objstore.WithNoopInstr(bkt)
logger := log.NewNopLogger()

// Instance a real bucket store we'll use to query the series.
baseBlockIDsFetcher := block.NewConcurrentLister(logger, instrBkt)
fetcher, err := block.NewMetaFetcher(logger, 10, instrBkt, baseBlockIDsFetcher, tmpDir, nil, nil)
testutil.Ok(tb, err)

indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(logger, nil, nil, storecache.InMemoryIndexCacheConfig{})
testutil.Ok(tb, err)

store, err := NewBucketStore(
instrBkt,
fetcher,
tmpDir,
NewChunksLimiterFactory(0),
NewSeriesLimiterFactory(0),
NewBytesLimiterFactory(0),
NewGapBasedPartitioner(PartitionerMaxGapSize),
10,
false,
DefaultPostingOffsetInMemorySampling,
true,
false,
0,
WithLogger(logger),
WithIndexCache(indexCache),
WithBlockLifecycleCallback(&mockBlockLifecycleCallback{allowed: []ulid.ULID{blockID2}}),
)
testutil.Ok(tb, err)
testutil.Ok(tb, store.SyncBlocks(context.Background()))

_, ok := store.blocks[blockID2]
testutil.Equals(t, true, ok)

_, ok = store.blocks[blockID1]
testutil.Equals(t, false, ok)
}

type mockBlockLifecycleCallback struct {
allowed []ulid.ULID
}

func (c *mockBlockLifecycleCallback) PreAdd(meta metadata.Meta) error {
contains := slices.Contains(c.allowed, meta.ULID)
if !contains {
return fmt.Errorf("don't add")
}
return nil
}

0 comments on commit 613d0da

Please sign in to comment.