Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chore: remove unused storepb.StoreServer #6958

Merged
merged 3 commits into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions pkg/storegateway/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"github.com/grafana/mimir/pkg/storegateway/indexcache"
"github.com/grafana/mimir/pkg/storegateway/indexheader"
streamindex "github.com/grafana/mimir/pkg/storegateway/indexheader/index"
"github.com/grafana/mimir/pkg/storegateway/storegatewaypb"
"github.com/grafana/mimir/pkg/storegateway/storepb"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/pool"
Expand Down Expand Up @@ -536,8 +537,8 @@ type seriesChunks struct {
chks []storepb.AggrChunk
}

// Series implements the storepb.StoreServer interface.
func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) (err error) {
// Series implements the storegatewaypb.StoreGatewayServer interface.
func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storegatewaypb.StoreGateway_SeriesServer) (err error) {
if req.SkipChunks {
// We don't do the streaming call if we are not requesting the chunks.
req.StreamingChunksBatchSize = 0
Expand Down Expand Up @@ -710,7 +711,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
// this function also sends the hints and the stats.
func (s *BucketStore) sendStreamingSeriesLabelsAndStats(
req *storepb.SeriesRequest,
srv storepb.Store_SeriesServer,
srv storegatewaypb.StoreGateway_SeriesServer,
stats *safeQueryStats,
seriesSet storepb.SeriesSet,
) (numSeries int, err error) {
Expand Down Expand Up @@ -767,7 +768,7 @@ func (s *BucketStore) sendStreamingSeriesLabelsAndStats(

func (s *BucketStore) sendStreamingChunks(
req *storepb.SeriesRequest,
srv storepb.Store_SeriesServer,
srv storegatewaypb.StoreGateway_SeriesServer,
it seriesChunksSetIterator,
stats *safeQueryStats,
totalSeriesCount int,
Expand Down Expand Up @@ -875,7 +876,7 @@ func (s *BucketStore) sendStreamingChunks(

func (s *BucketStore) sendSeriesChunks(
req *storepb.SeriesRequest,
srv storepb.Store_SeriesServer,
srv storegatewaypb.StoreGateway_SeriesServer,
seriesSet storepb.SeriesSet,
stats *safeQueryStats,
) error {
Expand Down Expand Up @@ -920,7 +921,7 @@ func (s *BucketStore) sendSeriesChunks(
return nil
}

func (s *BucketStore) sendMessage(typ string, srv storepb.Store_SeriesServer, msg interface{}, encodeDuration, sendDuration *time.Duration) error {
func (s *BucketStore) sendMessage(typ string, srv storegatewaypb.StoreGateway_SeriesServer, msg interface{}, encodeDuration, sendDuration *time.Duration) error {
// We encode it ourselves into a PreparedMsg in order to measure the time it takes.
encodeBegin := time.Now()
pmsg := &grpc.PreparedMsg{}
Expand All @@ -940,7 +941,7 @@ func (s *BucketStore) sendMessage(typ string, srv storepb.Store_SeriesServer, ms
return nil
}

func (s *BucketStore) sendHints(srv storepb.Store_SeriesServer, resHints *hintspb.SeriesResponseHints) error {
func (s *BucketStore) sendHints(srv storegatewaypb.StoreGateway_SeriesServer, resHints *hintspb.SeriesResponseHints) error {
var anyHints *types.Any
var err error
if anyHints, err = types.MarshalAny(resHints); err != nil {
Expand All @@ -954,7 +955,7 @@ func (s *BucketStore) sendHints(srv storepb.Store_SeriesServer, resHints *hintsp
return nil
}

func (s *BucketStore) sendStats(srv storepb.Store_SeriesServer, stats *safeQueryStats) error {
func (s *BucketStore) sendStats(srv storegatewaypb.StoreGateway_SeriesServer, stats *safeQueryStats) error {
var encodeDuration, sendDuration time.Duration
defer stats.update(func(stats *queryStats) {
stats.streamingSeriesSendResponseDuration += sendDuration
Expand Down Expand Up @@ -1285,7 +1286,7 @@ func (s *BucketStore) openBlocksForReading(ctx context.Context, skipChunks bool,
return blocks, indexReaders, chunkReaders
}

// LabelNames implements the storepb.StoreServer interface.
// LabelNames implements the storegatewaypb.StoreGatewayServer interface.
func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
reqSeriesMatchers, err := storepb.MatchersToPromMatchers(req.Matchers...)
if err != nil {
Expand Down Expand Up @@ -1476,7 +1477,7 @@ func storeCachedLabelNames(ctx context.Context, indexCache indexcache.IndexCache
indexCache.StoreLabelNames(userID, blockID, entry.MatchersKey, data)
}

// LabelValues implements the storepb.StoreServer interface.
// LabelValues implements the storegatewaypb.StoreGatewayServer interface.
func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) {
reqSeriesMatchers, err := storepb.MatchersToPromMatchers(req.Matchers...)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/storegateway/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func testBucketStore_e2e(t *testing.T, ctx context.Context, s *storeSuite, addit
assert.NoError(t, err)
assert.Equal(t, []string{"1", "2"}, vals.Values)

srv := newBucketStoreTestServer(t, s.store)
srv := newStoreGatewayTestServer(t, s.store)

// TODO(bwplotka): Add those test cases to TSDB querier_test.go as well, there are no tests for matching.
testCases := []testBucketStoreCase{
Expand Down Expand Up @@ -656,7 +656,7 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) {
StreamingChunksBatchSize: uint64(streamingBatchSize),
}

srv := newBucketStoreTestServer(t, s.store)
srv := newStoreGatewayTestServer(t, s.store)
_, _, _, _, err := srv.Series(context.Background(), req)

if testData.expectedErr == "" {
Expand Down Expand Up @@ -911,7 +911,7 @@ func TestBucketStore_ValueTypes_e2e(t *testing.T) {
StreamingChunksBatchSize: uint64(streamingBatchSize),
}

srv := newBucketStoreTestServer(t, s.store)
srv := newStoreGatewayTestServer(t, s.store)
seriesSet, _, _, _, err := srv.Series(ctx, req)
require.NoError(t, err)

Expand Down
34 changes: 3 additions & 31 deletions pkg/storegateway/bucket_store_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,35 +33,7 @@ type storeTestServer struct {
// requestSeries is the function to call the Series() API endpoint
// via gRPC. The actual implementation depends whether we're calling
// the StoreGateway or BucketStore API endpoint.
requestSeries func(ctx context.Context, conn *grpc.ClientConn, req *storepb.SeriesRequest) (storepb.Store_SeriesClient, error)
}

func newBucketStoreTestServer(t testing.TB, store storepb.StoreServer) *storeTestServer {
listener, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)
t.Cleanup(func() {
_ = listener.Close()
})

s := &storeTestServer{
server: grpc.NewServer(),
serverListener: listener,
requestSeries: func(ctx context.Context, conn *grpc.ClientConn, req *storepb.SeriesRequest) (storepb.Store_SeriesClient, error) {
client := storepb.NewStoreClient(conn)
return client.Series(ctx, req)
},
}

storepb.RegisterStoreServer(s.server, store)

go func() {
_ = s.server.Serve(listener)
}()

// Stop the gRPC server once the test has done.
t.Cleanup(s.server.GracefulStop)

return s
requestSeries func(ctx context.Context, conn *grpc.ClientConn, req *storepb.SeriesRequest) (storegatewaypb.StoreGateway_SeriesClient, error)
}

func newStoreGatewayTestServer(t testing.TB, store storegatewaypb.StoreGatewayServer) *storeTestServer {
Expand All @@ -74,7 +46,7 @@ func newStoreGatewayTestServer(t testing.TB, store storegatewaypb.StoreGatewaySe
s := &storeTestServer{
server: grpc.NewServer(),
serverListener: listener,
requestSeries: func(ctx context.Context, conn *grpc.ClientConn, req *storepb.SeriesRequest) (storepb.Store_SeriesClient, error) {
requestSeries: func(ctx context.Context, conn *grpc.ClientConn, req *storepb.SeriesRequest) (storegatewaypb.StoreGateway_SeriesClient, error) {
client := storegatewaypb.NewCustomStoreGatewayClient(conn)
return client.Series(ctx, req)
},
Expand All @@ -97,7 +69,7 @@ func newStoreGatewayTestServer(t testing.TB, store storegatewaypb.StoreGatewaySe
func (s *storeTestServer) Series(ctx context.Context, req *storepb.SeriesRequest) (seriesSet []*storepb.Series, warnings annotations.Annotations, hints hintspb.SeriesResponseHints, estimatedChunks uint64, err error) {
var (
conn *grpc.ClientConn
stream storepb.Store_SeriesClient
stream storegatewaypb.StoreGateway_SeriesClient
res *storepb.SeriesResponse
streamingSeriesSet []*storepb.StreamingSeries
)
Expand Down
15 changes: 8 additions & 7 deletions pkg/storegateway/bucket_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/grafana/mimir/pkg/storage/tsdb"
"github.com/grafana/mimir/pkg/storage/tsdb/block"
"github.com/grafana/mimir/pkg/storegateway/indexcache"
"github.com/grafana/mimir/pkg/storegateway/storegatewaypb"
"github.com/grafana/mimir/pkg/storegateway/storepb"
util_log "github.com/grafana/mimir/pkg/util/log"
"github.com/grafana/mimir/pkg/util/spanlogger"
Expand Down Expand Up @@ -309,8 +310,8 @@ func (u *BucketStores) syncUsersBlocks(ctx context.Context, f func(context.Conte
return errs.Err()
}

// Series implements the storepb.StoreServer interface, making a series request to the underlying user bucket store.
func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
// Series implements the storegatewaypb.StoreGatewayServer interface, making a series request to the underlying user bucket store.
func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storegatewaypb.StoreGateway_SeriesServer) error {
spanLog, spanCtx := spanlogger.NewWithLogger(srv.Context(), u.logger, "BucketStores.Series")
defer spanLog.Span.Finish()

Expand All @@ -325,12 +326,12 @@ func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_Seri
}

return store.Series(req, spanSeriesServer{
Store_SeriesServer: srv,
ctx: spanCtx,
StoreGateway_SeriesServer: srv,
ctx: spanCtx,
})
}

// LabelNames implements the storepb.StoreServer interface.
// LabelNames implements the storegatewaypb.StoreGatewayServer interface.
func (u *BucketStores) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
spanLog, spanCtx := spanlogger.NewWithLogger(ctx, u.logger, "BucketStores.LabelNames")
defer spanLog.Span.Finish()
Expand All @@ -348,7 +349,7 @@ func (u *BucketStores) LabelNames(ctx context.Context, req *storepb.LabelNamesRe
return store.LabelNames(ctx, req)
}

// LabelValues implements the storepb.StoreServer interface.
// LabelValues implements the storegatewaypb.StoreGatewayServer interface.
func (u *BucketStores) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) {
spanLog, spanCtx := spanlogger.NewWithLogger(ctx, u.logger, "BucketStores.LabelValues")
defer spanLog.Span.Finish()
Expand Down Expand Up @@ -596,7 +597,7 @@ func getUserIDFromGRPCContext(ctx context.Context) string {
}

type spanSeriesServer struct {
storepb.Store_SeriesServer
storegatewaypb.StoreGateway_SeriesServer

ctx context.Context
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storegateway/bucket_stores_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ func querySeries(t *testing.T, stores *BucketStores, userID, metricName string,
}},
}

srv := newBucketStoreTestServer(t, stores)
srv := newStoreGatewayTestServer(t, stores)
seriesSet, warnings, _, _, err := srv.Series(setUserIDToGRPCContext(context.Background(), userID), req)

return seriesSet, warnings, err
Expand Down
16 changes: 8 additions & 8 deletions pkg/storegateway/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1580,7 +1580,7 @@ func TestBucketStore_Series_Concurrency(t *testing.T) {
Hints: marshalledHints,
StreamingChunksBatchSize: uint64(streamBatchSize),
}
srv := newBucketStoreTestServer(t, store)
srv := newStoreGatewayTestServer(t, store)
seriesSet, warnings, _, _, err := srv.Series(context.Background(), req)
require.NoError(t, err)
require.Equal(t, 0, len(warnings), "%v", warnings)
Expand Down Expand Up @@ -1789,7 +1789,7 @@ func TestBucketStore_Series_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
maxSeriesPerBatch: 65536,
}

srv := newBucketStoreTestServer(t, store)
srv := newStoreGatewayTestServer(t, store)

t.Run("invoke series for one block. Fill the cache on the way.", func(t *testing.T) {
seriesSet, warnings, _, _, err := srv.Series(context.Background(), &storepb.SeriesRequest{
Expand Down Expand Up @@ -1970,7 +1970,7 @@ func TestBucketStore_Series_ErrorUnmarshallingRequestHints(t *testing.T) {
Hints: mustMarshalAny(&hintspb.SeriesResponseHints{}),
}

srv := newBucketStoreTestServer(t, store)
srv := newStoreGatewayTestServer(t, store)
_, _, _, _, err = srv.Series(context.Background(), req)
assert.Error(t, err)
assert.Equal(t, true, regexp.MustCompile(".*unmarshal series request hints.*").MatchString(err.Error()))
Expand Down Expand Up @@ -2027,7 +2027,7 @@ func TestBucketStore_Series_CanceledRequest(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel()

srv := newBucketStoreTestServer(t, store)
srv := newStoreGatewayTestServer(t, store)
_, _, _, _, err = srv.Series(ctx, req)
assert.Error(t, err)
s, ok := status.FromError(err)
Expand Down Expand Up @@ -2090,7 +2090,7 @@ func TestBucketStore_Series_InvalidRequest(t *testing.T) {
},
}

srv := newBucketStoreTestServer(t, store)
srv := newStoreGatewayTestServer(t, store)
_, _, _, _, err = srv.Series(context.Background(), req)
assert.Error(t, err)
s, ok := status.FromError(err)
Expand Down Expand Up @@ -2209,7 +2209,7 @@ func testBucketStoreSeriesBlockWithMultipleChunks(
assert.NoError(t, err)
assert.NoError(t, store.SyncBlocks(context.Background()))

srv := newBucketStoreTestServer(t, store)
srv := newStoreGatewayTestServer(t, store)

tests := map[string]struct {
reqMinTime int64
Expand Down Expand Up @@ -2373,7 +2373,7 @@ func TestBucketStore_Series_Limits(t *testing.T) {
assert.NoError(t, err)
assert.NoError(t, store.SyncBlocks(ctx))

srv := newBucketStoreTestServer(t, store)
srv := newStoreGatewayTestServer(t, store)
for _, streamingBatchSize := range []int{0, 1, 5} {
t.Run(fmt.Sprintf("streamingBatchSize: %d", streamingBatchSize), func(t *testing.T) {
req := &storepb.SeriesRequest{
Expand Down Expand Up @@ -2951,7 +2951,7 @@ type seriesCase struct {
func runTestServerSeries(t test.TB, store *BucketStore, streamingBatchSize int, cases ...*seriesCase) {
for _, c := range cases {
t.Run(c.Name, func(t test.TB) {
srv := newBucketStoreTestServer(t, store)
srv := newStoreGatewayTestServer(t, store)

c.Req.StreamingChunksBatchSize = uint64(streamingBatchSize)
t.ResetTimer()
Expand Down
Loading