Skip to content

Commit

Permalink
Breaking changes
Browse files Browse the repository at this point in the history
Signed-off-by: Alan Protasio <[email protected]>
  • Loading branch information
alanprot committed Dec 16, 2022
1 parent 7c36bb8 commit b5c8346
Show file tree
Hide file tree
Showing 19 changed files with 148 additions and 55 deletions.
6 changes: 3 additions & 3 deletions pkg/chunk/encoding/prometheus_chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,14 @@ type prometheusChunkIterator struct {
}

func (p *prometheusChunkIterator) Scan() bool {
return p.it.Next()
return p.it.Next() != chunkenc.ValNone
}

func (p *prometheusChunkIterator) FindAtOrAfter(time model.Time) bool {
// FindAtOrAfter must return OLDEST value at given time. That means we need to start with a fresh iterator,
// otherwise we cannot guarantee OLDEST.
p.it = p.c.Iterator(p.it)
return p.it.Seek(int64(time))
return p.it.Seek(int64(time)) != chunkenc.ValNone
}

func (p *prometheusChunkIterator) Value() model.SamplePair {
Expand All @@ -110,7 +110,7 @@ func (p *prometheusChunkIterator) Batch(size int) Batch {
batch.Timestamps[j] = t
batch.Values[j] = v
j++
if j < size && !p.it.Next() {
if j < size && p.it.Next() == chunkenc.ValNone {
break
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -1219,7 +1219,7 @@ func (i *Ingester) Query(ctx context.Context, req *client.QueryRequest) (*client
}

it := series.Iterator()
for it.Next() {
for it.Next() != chunkenc.ValNone {
t, v := it.At()
ts.Samples = append(ts.Samples, cortexpb.Sample{Value: v, TimestampMs: t})
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/querier/batch/batch.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package batch

import (
"github.com/cortexproject/cortex/pkg/querier/iterators"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/tsdb/chunkenc"

Expand Down Expand Up @@ -75,10 +76,10 @@ type iteratorAdapter struct {
}

func newIteratorAdapter(underlying iterator) chunkenc.Iterator {
return &iteratorAdapter{
return iterators.NewCompatibleChunksIterator(&iteratorAdapter{
batchSize: 1,
underlying: underlying,
}
})
}

// Seek implements chunkenc.Iterator.
Expand Down
7 changes: 4 additions & 3 deletions pkg/querier/batch/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/pkg/chunk"
Expand Down Expand Up @@ -41,7 +42,7 @@ func BenchmarkNewChunkMergeIterator_CreateAndIterate(b *testing.B) {

for n := 0; n < b.N; n++ {
it := NewChunkMergeIterator(chunks, 0, 0)
for it.Next() {
for it.Next() != chunkenc.ValNone {
it.At()
}

Expand All @@ -62,8 +63,8 @@ func TestSeekCorrectlyDealWithSinglePointChunks(t *testing.T) {
sut := NewChunkMergeIterator(chunks, 0, 0)

// Following calls mimics Prometheus's query engine behaviour for VectorSelector.
require.True(t, sut.Next())
require.True(t, sut.Seek(0))
require.True(t, sut.Next() != chunkenc.ValNone)
require.True(t, sut.Seek(0) != chunkenc.ValNone)

actual, val := sut.At()
require.Equal(t, float64(1*time.Second/time.Millisecond), val) // since mkChunk use ts as value.
Expand Down
8 changes: 4 additions & 4 deletions pkg/querier/batch/chunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,28 +69,28 @@ func mkGenericChunk(t require.TestingT, from model.Time, points int, enc promchu
func testIter(t require.TestingT, points int, iter chunkenc.Iterator) {
ets := model.TimeFromUnix(0)
for i := 0; i < points; i++ {
require.True(t, iter.Next(), strconv.Itoa(i))
require.NotEqual(t, iter.Next(), chunkenc.ValNone, strconv.Itoa(i))
ts, v := iter.At()
require.EqualValues(t, int64(ets), ts, strconv.Itoa(i))
require.EqualValues(t, float64(ets), v, strconv.Itoa(i))
ets = ets.Add(step)
}
require.False(t, iter.Next())
require.Equal(t, iter.Next(), chunkenc.ValNone)
}

func testSeek(t require.TestingT, points int, iter chunkenc.Iterator) {
for i := 0; i < points; i += points / 10 {
ets := int64(i * int(step/time.Millisecond))

require.True(t, iter.Seek(ets))
require.NotEqual(t, iter.Seek(ets), chunkenc.ValNone)
ts, v := iter.At()
require.EqualValues(t, ets, ts)
require.EqualValues(t, v, float64(ets))
require.NoError(t, iter.Err())

for j := i + 1; j < i+points/10; j++ {
ets := int64(j * int(step/time.Millisecond))
require.True(t, iter.Next())
require.NotEqual(t, iter.Next(), chunkenc.ValNone)
ts, v := iter.At()
require.EqualValues(t, ets, ts)
require.EqualValues(t, float64(ets), v)
Expand Down
7 changes: 4 additions & 3 deletions pkg/querier/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"math"
"sort"

"github.com/cortexproject/cortex/pkg/querier/iterators"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
Expand Down Expand Up @@ -125,7 +126,7 @@ func (bqs *blockQuerierSeries) Iterator() chunkenc.Iterator {
its = append(its, it)
}

return newBlockQuerierSeriesIterator(bqs.Labels(), its)
return iterators.NewCompatibleChunksIterator(newBlockQuerierSeriesIterator(bqs.Labels(), its))
}

func newBlockQuerierSeriesIterator(labels labels.Labels, its []chunkenc.Iterator) *blockQuerierSeriesIterator {
Expand Down Expand Up @@ -173,7 +174,7 @@ func (it *blockQuerierSeriesIterator) Next() bool {
return false
}

if it.iterators[it.i].Next() {
if it.iterators[it.i].Next() != chunkenc.ValNone {
return true
}
if it.iterators[it.i].Err() != nil {
Expand All @@ -189,7 +190,7 @@ func (it *blockQuerierSeriesIterator) Next() bool {

// we must advance iterator first, to see if it has any samples.
// Seek will call At() as its first operation.
if !it.iterators[it.i].Next() {
if it.iterators[it.i].Next() == chunkenc.ValNone {
if it.iterators[it.i].Err() != nil {
return false
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/querier/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func TestBlockQuerierSeries(t *testing.T) {
sampleIx := 0

it := series.Iterator()
for it.Next() {
for it.Next() != chunkenc.ValNone {
ts, val := it.At()
require.True(t, sampleIx < len(testData.expectedSamples))
assert.Equal(t, int64(testData.expectedSamples[sampleIx].Timestamp), ts)
Expand Down Expand Up @@ -208,7 +208,7 @@ func verifyNextSeries(t *testing.T, ss storage.SeriesSet, labels labels.Labels,

prevTS := int64(0)
count := 0
for it := s.Iterator(); it.Next(); {
for it := s.Iterator(); it.Next() != chunkenc.ValNone; {
count++
ts, v := it.At()
require.Equal(t, math.Sin(float64(ts)), v)
Expand Down Expand Up @@ -333,7 +333,7 @@ func Benchmark_blockQuerierSeriesSet_iteration(b *testing.B) {
set := blockQuerierSeriesSet{series: series}

for set.Next() {
for t := set.At().Iterator(); t.Next(); {
for t := set.At().Iterator(); t.Next() != chunkenc.ValNone; {
t.At()
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/blocks_store_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) {
var actualValues []valueResult

it := set.At().Iterator()
for it.Next() {
for it.Next() != chunkenc.ValNone {
t, v := it.At()
actualValues = append(actualValues, valueResult{
t: t,
Expand Down
5 changes: 3 additions & 2 deletions pkg/querier/distributor_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -320,13 +321,13 @@ func verifySeries(t *testing.T, series storage.Series, l labels.Labels, samples

it := series.Iterator()
for _, s := range samples {
require.True(t, it.Next())
require.NotEqual(t, it.Next(), chunkenc.ValNone)
require.Nil(t, it.Err())
ts, v := it.At()
require.Equal(t, s.Value, v)
require.Equal(t, s.TimestampMs, ts)
}
require.False(t, it.Next())
require.Equal(t, it.Next(), chunkenc.ValNone)
require.Nil(t, it.Err())
}
func TestDistributorQuerier_LabelNames(t *testing.T) {
Expand Down
8 changes: 4 additions & 4 deletions pkg/querier/iterators/chunk_merge_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func NewChunkMergeIterator(cs []chunk.Chunk, _, _ model.Time) chunkenc.Iterator

for _, iter := range c.its {
if iter.Next() {
c.h = append(c.h, iter)
c.h = append(c.h, NewCompatibleChunksIterator(iter))
continue
}

Expand All @@ -40,7 +40,7 @@ func NewChunkMergeIterator(cs []chunk.Chunk, _, _ model.Time) chunkenc.Iterator
}

heap.Init(&c.h)
return c
return NewCompatibleChunksIterator(c)
}

// Build a list of lists of non-overlapping chunk iterators.
Expand Down Expand Up @@ -78,7 +78,7 @@ func (c *chunkMergeIterator) Seek(t int64) bool {

for _, iter := range c.its {
if iter.Seek(t) {
c.h = append(c.h, iter)
c.h = append(c.h, NewCompatibleChunksIterator(iter))
continue
}

Expand Down Expand Up @@ -107,7 +107,7 @@ func (c *chunkMergeIterator) Next() bool {
for c.currTime == lastTime && len(c.h) > 0 {
c.currTime, c.currValue = c.h[0].At()

if c.h[0].Next() {
if c.h[0].Next() != chunkenc.ValNone {
heap.Fix(&c.h, 0)
continue
}
Expand Down
11 changes: 6 additions & 5 deletions pkg/querier/iterators/chunk_merge_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -54,13 +55,13 @@ func TestChunkMergeIterator(t *testing.T) {
t.Run(strconv.Itoa(i), func(t *testing.T) {
iter := NewChunkMergeIterator(tc.chunks, 0, 0)
for i := tc.mint; i < tc.maxt; i++ {
require.True(t, iter.Next())
require.NotEqual(t, iter.Next(), chunkenc.ValNone)
ts, s := iter.At()
assert.Equal(t, i, ts)
assert.Equal(t, float64(i), s)
assert.NoError(t, iter.Err())
}
assert.False(t, iter.Next())
assert.Equal(t, iter.Next(), chunkenc.ValNone)
})
}
}
Expand All @@ -73,20 +74,20 @@ func TestChunkMergeIteratorSeek(t *testing.T) {
}, 0, 0)

for i := int64(0); i < 10; i += 20 {
require.True(t, iter.Seek(i))
require.NotEqual(t, iter.Seek(i), chunkenc.ValNone)
ts, s := iter.At()
assert.Equal(t, i, ts)
assert.Equal(t, float64(i), s)
assert.NoError(t, iter.Err())

for j := i + 1; j < 200; j++ {
require.True(t, iter.Next())
require.NotEqual(t, iter.Next(), chunkenc.ValNone)
ts, s := iter.At()
assert.Equal(t, j, ts)
assert.Equal(t, float64(j), s)
assert.NoError(t, iter.Err())
}
assert.False(t, iter.Next())
assert.Equal(t, iter.Next(), chunkenc.ValNone)
}
}

Expand Down
77 changes: 77 additions & 0 deletions pkg/querier/iterators/compatibe_chunk_iterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package iterators

import (
"github.com/efficientgo/core/errors"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/tsdb/chunkenc"
)

var ErrNativeHistogramsUnsupported = errors.Newf("querying native histograms is not supported")

// compChunksIterator is a compatible interface from prometheus before the native histograms implementation.
// As for now cortex does not support this feature, this interface was create as a bridge and should be deleted
// together with the compatibleChunksIterator when cortex supports native histograms.
// See: https://github.com/prometheus/prometheus/blob/5ddfba78936b7ca7ed3600869f623359e6b5ecb0/tsdb/chunkenc/chunk.go#L89-L105
type compChunksIterator interface {
Next() bool
Seek(t int64) bool
At() (int64, float64)
Err() error
}

type compatibleChunksIterator struct {
chunkenc.Iterator
it compChunksIterator
err error
}

func NewCompatibleChunksIterator(i compChunksIterator) *compatibleChunksIterator {
return &compatibleChunksIterator{it: i}
}

func (c *compatibleChunksIterator) Next() chunkenc.ValueType {
if c.it.Next() {
return chunkenc.ValFloat
}

return chunkenc.ValNone
}

func (c *compatibleChunksIterator) Seek(t int64) chunkenc.ValueType {
if c.it.Seek(t) {
return chunkenc.ValFloat
}

return chunkenc.ValNone
}

func (c *compatibleChunksIterator) At() (int64, float64) {
return c.it.At()
}

func (c *compatibleChunksIterator) AtHistogram() (int64, *histogram.Histogram) {
c.err = ErrNativeHistogramsUnsupported
return 0, nil
}

func (c *compatibleChunksIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) {
c.err = ErrNativeHistogramsUnsupported
return 0, nil
}

func (c *compatibleChunksIterator) AtT() int64 {
t, _ := c.it.At()
return t
}

func (c *compatibleChunksIterator) Err() error {
if c.err != nil {
return c.err
}

return c.it.Err()
}

func (c *compatibleChunksIterator) AtTime() int64 {
return c.AtT()
}
3 changes: 2 additions & 1 deletion pkg/querier/remote_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/ingester/client"
Expand Down Expand Up @@ -82,7 +83,7 @@ func seriesSetToQueryResponse(s storage.SeriesSet) (*client.QueryResponse, error
series := s.At()
samples := []cortexpb.Sample{}
it := series.Iterator()
for it.Next() {
for it.Next() != chunkenc.ValNone {
t, v := it.At()
samples = append(samples, cortexpb.Sample{
TimestampMs: t,
Expand Down
Loading

0 comments on commit b5c8346

Please sign in to comment.