Read streaming chunks from storegateway in queriers
Signed-off-by: Ganesh Vernekar <[email protected]>
codesome committed Jun 8, 2023
1 parent d8a29b7 commit 8488231
Showing 7 changed files with 632 additions and 81 deletions.
28 changes: 16 additions & 12 deletions pkg/querier/block.go
@@ -46,8 +46,7 @@ func convertMatchersToLabelMatcher(matchers []*labels.Matcher) []storepb.LabelMa
 
 // Implementation of storage.SeriesSet, based on individual responses from store client.
 type blockQuerierSeriesSet struct {
-	series   []*storepb.Series
-	warnings storage.Warnings
+	series []*storepb.Series
 
 	// next response to process
 	next int
@@ -88,7 +87,7 @@ func (bqss *blockQuerierSeriesSet) Err() error {
 }
 
 func (bqss *blockQuerierSeriesSet) Warnings() storage.Warnings {
-	return bqss.warnings
+	return nil
 }
 
 // newBlockQuerierSeries makes a new blockQuerierSeries. Input labels must be already sorted by name.
@@ -115,9 +114,18 @@ func (bqs *blockQuerierSeries) Iterator(_ chunkenc.Iterator) chunkenc.Iterator {
 		return series.NewErrIterator(errors.New("no chunks"))
 	}
 
-	its := make([]iteratorWithMaxTime, 0, len(bqs.chunks))
+	it, err := newBlockQuerierSeriesIterator(bqs.Labels(), bqs.chunks)
+	if err != nil {
+		return series.NewErrIterator(err)
+	}
+
+	return it
+}
+
+func newBlockQuerierSeriesIterator(labels labels.Labels, chunks []storepb.AggrChunk) (*blockQuerierSeriesIterator, error) {
+	its := make([]iteratorWithMaxTime, 0, len(chunks))
 
-	for _, c := range bqs.chunks {
+	for _, c := range chunks {
 		var (
 			ch  chunkenc.Chunk
 			err error
@@ -130,22 +138,18 @@ func (bqs *blockQuerierSeries) Iterator(_ chunkenc.Iterator) chunkenc.Iterator {
 		case storepb.Chunk_FloatHistogram:
 			ch, err = chunkenc.FromData(chunkenc.EncFloatHistogram, c.Raw.Data)
 		default:
-			return series.NewErrIterator(errors.Wrapf(err, "failed to initialize chunk from unknown type (%v) encoded raw data (series: %v min time: %d max time: %d)", c.Raw.Type, bqs.Labels(), c.MinTime, c.MaxTime))
+			return nil, errors.Wrapf(err, "failed to initialize chunk from unknown type (%v) encoded raw data (series: %v min time: %d max time: %d)", c.Raw.Type, labels, c.MinTime, c.MaxTime)
 		}
 
 		if err != nil {
-			return series.NewErrIterator(errors.Wrapf(err, "failed to initialize chunk from %v type encoded raw data (series: %v min time: %d max time: %d)", c.Raw.Type, bqs.Labels(), c.MinTime, c.MaxTime))
+			return nil, errors.Wrapf(err, "failed to initialize chunk from %v type encoded raw data (series: %v min time: %d max time: %d)", c.Raw.Type, labels, c.MinTime, c.MaxTime)
 		}
 
 		it := ch.Iterator(nil)
 		its = append(its, iteratorWithMaxTime{it, c.MaxTime})
 	}
 
-	return newBlockQuerierSeriesIterator(bqs.Labels(), its)
-}
-
-func newBlockQuerierSeriesIterator(labels labels.Labels, its []iteratorWithMaxTime) *blockQuerierSeriesIterator {
-	return &blockQuerierSeriesIterator{labels: labels, iterators: its, lastT: math.MinInt64}
+	return &blockQuerierSeriesIterator{labels: labels, iterators: its, lastT: math.MinInt64}, nil
 }
 
 // iteratorWithMaxTime is an iterator which is aware of the maxT of its embedded iterator.
109 changes: 109 additions & 0 deletions pkg/querier/block_streaming.go
@@ -0,0 +1,109 @@
// SPDX-License-Identifier: AGPL-3.0-only

package querier

import (
"sort"

"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"

"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/storage/series"
"github.com/grafana/mimir/pkg/storegateway/storepb"
)

// Implementation of storage.SeriesSet, based on individual responses from store client.
type blockStreamingQuerierSeriesSet struct {
series []*storepb.StreamingSeries
streamReader *SeriesChunksStreamReader

// next response to process
next int

currSeries storage.Series
}

func (bqss *blockStreamingQuerierSeriesSet) Next() bool {
bqss.currSeries = nil

if bqss.next >= len(bqss.series) {
return false
}

currLabels := mimirpb.FromLabelAdaptersToLabels(bqss.series[bqss.next].Labels)
seriesIdxStart, seriesIdxEnd := bqss.next, bqss.next
bqss.next++

// Merge chunks for current series. Chunks may come in multiple responses, but as soon
// as the response has chunks for a new series, we can stop searching. Series are sorted.
// See documentation for StoreClient.Series call for details.
for bqss.next < len(bqss.series) && labels.Compare(currLabels, mimirpb.FromLabelAdaptersToLabels(bqss.series[bqss.next].Labels)) == 0 {
bqss.next++
}
seriesIdxEnd = bqss.next - 1

bqss.currSeries = newBlockStreamingQuerierSeries(currLabels, seriesIdxStart, seriesIdxEnd, bqss.streamReader)
return true
}

func (bqss *blockStreamingQuerierSeriesSet) At() storage.Series {
return bqss.currSeries
}

func (bqss *blockStreamingQuerierSeriesSet) Err() error {
return nil
}

func (bqss *blockStreamingQuerierSeriesSet) Warnings() storage.Warnings {
return nil
}

// newBlockStreamingQuerierSeries makes a new blockStreamingQuerierSeries. Input labels must be already sorted by name.
func newBlockStreamingQuerierSeries(lbls labels.Labels, seriesIdxStart, seriesIdxEnd int, streamReader *SeriesChunksStreamReader) *blockStreamingQuerierSeries {
return &blockStreamingQuerierSeries{
labels: lbls,
seriesIdxStart: seriesIdxStart,
seriesIdxEnd: seriesIdxEnd,
streamReader: streamReader,
}
}

type blockStreamingQuerierSeries struct {
labels labels.Labels
seriesIdxStart, seriesIdxEnd int
streamReader *SeriesChunksStreamReader
}

func (bqs *blockStreamingQuerierSeries) Labels() labels.Labels {
return bqs.labels
}

func (bqs *blockStreamingQuerierSeries) Iterator(_ chunkenc.Iterator) chunkenc.Iterator {
// Fetch the chunks from the stream.
var allChunks []storepb.AggrChunk
for i := bqs.seriesIdxStart; i <= bqs.seriesIdxEnd; i++ {
chks, err := bqs.streamReader.GetChunks(uint64(i))
if err != nil {
return series.NewErrIterator(err)
}
allChunks = append(allChunks, chks...)
}
if len(allChunks) == 0 {
// should not happen in practice, but we have a unit test for it
return series.NewErrIterator(errors.New("no chunks"))
}

sort.Slice(allChunks, func(i, j int) bool {
return allChunks[i].MinTime < allChunks[j].MinTime
})

it, err := newBlockQuerierSeriesIterator(bqs.Labels(), allChunks)
if err != nil {
return series.NewErrIterator(err)
}

return it
}
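
To make the consumption pattern concrete, here is a minimal sketch of how a querier could drain the new series set. The helper is not part of this commit; its name and parameters are illustrative only, and histogram samples are skipped for brevity.

// drainStreamingSeriesSet is a hypothetical helper illustrating how the streaming
// series set is consumed; it is not code from this commit.
func drainStreamingSeriesSet(streamingSeries []*storepb.StreamingSeries, streamReader *SeriesChunksStreamReader) error {
	set := &blockStreamingQuerierSeriesSet{series: streamingSeries, streamReader: streamReader}
	for set.Next() {
		it := set.At().Iterator(nil)
		for vt := it.Next(); vt != chunkenc.ValNone; vt = it.Next() {
			if vt == chunkenc.ValFloat {
				t, v := it.At()
				_, _ = t, v // hand the float sample to the caller; histogram samples omitted here
			}
		}
		// Iterator errors surface stream failures, e.g. a broken or exhausted chunk stream.
		if err := it.Err(); err != nil {
			return err
		}
	}
	return set.Err()
}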
171 changes: 171 additions & 0 deletions pkg/querier/blocks_store_querable_streaming.go
@@ -0,0 +1,171 @@
// SPDX-License-Identifier: AGPL-3.0-only

package querier

import (
"errors"
"fmt"
"io"

"github.com/go-kit/log"
"github.com/go-kit/log/level"

"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/querier/stats"
"github.com/grafana/mimir/pkg/storegateway/storegatewaypb"
"github.com/grafana/mimir/pkg/storegateway/storepb"
"github.com/grafana/mimir/pkg/util/limiter"
)

type StreamingSeries struct {
Labels []mimirpb.LabelAdapter
Source StreamingSeriesSource
}

// StreamingSeriesSource holds the relationship between a stream of chunks from a SeriesChunksStreamReader
// and the expected position of a series' chunks in that stream.
type StreamingSeriesSource struct {
StreamReader *SeriesChunksStreamReader
SeriesIndex uint64
}

// SeriesChunksStreamReader is responsible for managing the streaming of chunks from a store gateway and buffering
// chunks in memory until they are consumed by the PromQL engine.
type SeriesChunksStreamReader struct {
client storegatewaypb.StoreGateway_SeriesClient
expectedSeriesCount int
queryLimiter *limiter.QueryLimiter
stats *stats.Stats
log log.Logger

seriesCunksChan chan *storepb.StreamSeriesChunks
errorChan chan error
}

func NewSeriesChunksStreamReader(client storegatewaypb.StoreGateway_SeriesClient, expectedSeriesCount int, queryLimiter *limiter.QueryLimiter, stats *stats.Stats, log log.Logger) *SeriesChunksStreamReader {
return &SeriesChunksStreamReader{
client: client,
expectedSeriesCount: expectedSeriesCount,
queryLimiter: queryLimiter,
stats: stats,
log: log,
}
}

// Close cleans up all resources associated with this SeriesChunksStreamReader.
// This method should only be called if StartBuffering is not called.
func (s *SeriesChunksStreamReader) Close() {
if err := s.client.CloseSend(); err != nil {
level.Warn(s.log).Log("msg", "closing store-gateway client stream failed", "err", err)
}
}

// StartBuffering begins streaming series' chunks from the store gateway associated with
// this SeriesChunksStreamReader. Once all series have been consumed with GetChunks, all resources
// associated with this SeriesChunksStreamReader are cleaned up.
// If an error occurs while streaming, a subsequent call to GetChunks will return an error.
// To cancel buffering, cancel the context associated with this SeriesChunksStreamReader's storegatewaypb.StoreGateway_SeriesClient.
func (s *SeriesChunksStreamReader) StartBuffering() {
s.seriesCunksChan = make(chan *storepb.StreamSeriesChunks, 30) // TODO: increase or reduce the channel size.

// Important: to ensure that the goroutine does not become blocked and leak, the goroutine must only ever write to errorChan at most once.
s.errorChan = make(chan error, 1)
ctxDone := s.client.Context().Done()

go func() {
defer func() {
if err := s.client.CloseSend(); err != nil {
level.Warn(s.log).Log("msg", "closing store-gateway client stream failed", "err", err)
}

close(s.seriesCunksChan)
close(s.errorChan)
}()

totalSeries := 0

for {
msg, err := s.client.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
if totalSeries < s.expectedSeriesCount {
s.errorChan <- fmt.Errorf("expected to receive %v series, but got EOF after receiving %v series", s.expectedSeriesCount, totalSeries)
}
} else {
s.errorChan <- err
}

return
}

c := msg.GetStreamingSeriesChunks()
if c == nil {
s.errorChan <- fmt.Errorf("expected to receive StreamingSeriesChunks, but got something else")
return
}

totalSeries++
if totalSeries > s.expectedSeriesCount {
s.errorChan <- fmt.Errorf("expected to receive only %v series, but received at least %v series", s.expectedSeriesCount, totalSeries)
return
}

if err := s.queryLimiter.AddChunks(len(c.Chunks)); err != nil {
s.errorChan <- err
return
}

chunkBytes := 0
for _, ch := range c.Chunks {
chunkBytes += ch.Size()
}
if err := s.queryLimiter.AddChunkBytes(chunkBytes); err != nil {
s.errorChan <- err
return
}

s.stats.AddFetchedChunks(uint64(len(c.Chunks)))
s.stats.AddFetchedChunkBytes(uint64(chunkBytes))

select {
case <-ctxDone:
// Why do we abort if the context is done?
// We want to make sure that this goroutine is never leaked.
// This goroutine could be leaked if nothing is reading from the buffer, but this method is still trying to send
// more series to a full buffer: it would block forever.
// So, here, we try to send the series to the buffer if we can, but if the context is cancelled, then we give up.
// This only works correctly if the context is cancelled when the query request is complete or cancelled,
// which is true at the time of writing.
s.errorChan <- s.client.Context().Err()
return
case s.seriesCunksChan <- c:
// Batch enqueued successfully, nothing else to do for this batch.
}
}
}()
}

// GetChunks returns the chunks for the series with index seriesIndex.
// This method must be called with monotonically increasing values of seriesIndex.
func (s *SeriesChunksStreamReader) GetChunks(seriesIndex uint64) ([]storepb.AggrChunk, error) {
chks, haveChunks := <-s.seriesCunksChan

if !haveChunks {
// If there's an error, report it.
select {
case err, haveError := <-s.errorChan:
if haveError {
return nil, fmt.Errorf("attempted to read series at index %v from stream, but the stream has failed: %w", seriesIndex, err)
}
default:
}

return nil, fmt.Errorf("attempted to read series at index %v from stream, but the stream has already been exhausted", seriesIndex)
}

if chks.SeriesIndex != seriesIndex {
return nil, fmt.Errorf("attempted to read series at index %v from stream, but the stream has series with index %v", seriesIndex, chks.SeriesIndex)
}

return chks.Chunks, nil
}
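
The reader's contract, as documented above, is: start buffering once, then fetch each series' chunks with monotonically increasing series indexes, in the order the store gateway streams them. The sketch below is a hypothetical illustration of that call pattern and is not part of this commit; the helper name and its parameters are placeholders.

// consumeSeriesStream is a hypothetical helper illustrating the intended use of
// SeriesChunksStreamReader; it is not code from this commit.
func consumeSeriesStream(client storegatewaypb.StoreGateway_SeriesClient, expectedSeriesCount int, queryLimiter *limiter.QueryLimiter, queryStats *stats.Stats, logger log.Logger) error {
	streamReader := NewSeriesChunksStreamReader(client, expectedSeriesCount, queryLimiter, queryStats, logger)
	streamReader.StartBuffering()

	for i := 0; i < expectedSeriesCount; i++ {
		// GetChunks must be called with monotonically increasing indexes.
		chks, err := streamReader.GetChunks(uint64(i))
		if err != nil {
			return err
		}
		_ = chks // a real caller wraps these storepb.AggrChunks in a series iterator
	}
	return nil
}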