Repro of #2147
Signed-off-by: Bartlomiej Plotka <[email protected]>
bwplotka committed Feb 18, 2020
1 parent d74180a commit ea6a26c
Showing 2 changed files with 181 additions and 11 deletions.
188 changes: 177 additions & 11 deletions pkg/store/bucket_test.go
@@ -44,6 +44,7 @@ import (
"github.com/thanos-io/thanos/pkg/objstore/filesystem"
"github.com/thanos-io/thanos/pkg/objstore/inmem"
"github.com/thanos-io/thanos/pkg/pool"
storecache "github.com/thanos-io/thanos/pkg/store/cache"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/testutil"
"github.com/thanos-io/thanos/pkg/testutil/e2eutil"
@@ -1058,7 +1059,6 @@ func newSeries(t testing.TB, lset labels.Labels, smplChunks [][]sample) storepb.Series {
return s
}

// https://github.com/thanos-io/thanos/issues/2147
func TestSeries(t *testing.T) {
tb := testutil.NewTB(t)
tb.Run("200kSeriesWithOneSample", func(tb testutil.TB) {
@@ -1144,6 +1144,9 @@ func benchSeries_SeriesWithOneSample(t testutil.TB, totalSeries int) {
chunkObjs: []string{filepath.Join(id.String(), "chunks", "000001")},
chunkPool: chunkPool,
}
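// Create the block's index-header reader up front (this used to happen in a separate loop after store setup).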
b.indexHeaderReader, err = indexheader.NewBinaryReader(context.Background(), log.NewNopLogger(), bkt, tmpDir, b.meta.ULID)
testutil.Ok(t, err)

blocks = append(blocks, b)
}

@@ -1191,11 +1194,6 @@ func benchSeries_SeriesWithOneSample(t testutil.TB, totalSeries int) {
})
}

for _, block := range blocks {
block.indexHeaderReader, err = indexheader.NewBinaryReader(context.Background(), log.NewNopLogger(), bkt, tmpDir, block.meta.ULID)
testutil.Ok(t, err)
}

benchmarkSeries(t, store, cases)
if !t.IsBenchmark() {
// Make sure pool is correctly used.
@@ -1307,6 +1305,9 @@ func benchSeries_OneSeriesWithManySamples(t testutil.TB, totalSamples int) {
chunkPool: chunkPool,
seriesRefetches: prometheus.NewCounter(prometheus.CounterOpts{}),
}
b.indexHeaderReader, err = indexheader.NewBinaryReader(context.Background(), log.NewNopLogger(), bkt, tmpDir, b.meta.ULID)
testutil.Ok(t, err)

blocks = append(blocks, b)
}

@@ -1356,11 +1357,6 @@ func benchSeries_OneSeriesWithManySamples(t testutil.TB, totalSamples int) {
})
}

for _, block := range blocks {
block.indexHeaderReader, err = indexheader.NewBinaryReader(context.Background(), log.NewNopLogger(), bkt, tmpDir, block.meta.ULID)
testutil.Ok(t, err)
}

benchmarkSeries(t, store, cases)
if !t.IsBenchmark() {
// Make sure pool is correctly used.
@@ -1443,3 +1439,173 @@ func benchmarkSeries(t testutil.TB, store *BucketStore, cases []*benchSeriesCase) {
})
}
}

// Regression test against: https://github.com/thanos-io/thanos/issues/2147
func TestSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "segfault-series")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }()

bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt"))
testutil.Ok(t, err)
defer func() { testutil.Ok(t, bkt.Close()) }()

logger := log.NewNopLogger()

thanosMeta := metadata.Thanos{
Labels: labels.Labels{{Name: "ext1", Value: "1"}}.Map(),
Downsample: metadata.ThanosDownsample{Resolution: 0},
Source: metadata.TestSource,
}

chunkPool, err := pool.NewBucketedBytesPool(maxChunkSize, 50e6, 2, 100e7)
testutil.Ok(t, err)

indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(logger, nil, storecache.InMemoryIndexCacheConfig{
MaxItemSize: 3000,
// This is the exact cache size needed for our *single request*.
// It is limited on purpose, to make sure we test evictions.
MaxSize: 4089,
})
testutil.Ok(t, err)

var b1 *bucketBlock

const numSeries = 100

// Create two blocks. Each has numSeries series, with exactly one sample per series.
// The timestamp is incremented for each new series, so every series has a unique timestamp.
// This allows picking a time range that corresponds 1:1 to the number of series selected.
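// E.g. a request with MinTime: 0 and MaxTime: numSeries-1 selects all numSeries series of one block.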
{
// Block 1.
h, err := tsdb.NewHead(nil, nil, nil, 1)
testutil.Ok(t, err)
// Wrap Close in a closure: a plain `defer testutil.Ok(t, h.Close())` would evaluate
// h.Close() immediately at the defer statement rather than at test teardown.
defer func() { testutil.Ok(t, h.Close()) }()

app := h.Appender()

for i := 0; i < numSeries; i++ {
ts := int64(i)
lbls := labels.FromStrings("foo", "bar", "b", "1", "i", fmt.Sprintf("%07d%s", ts, postingsBenchSuffix))

_, err := app.Add(lbls, ts, 0)
testutil.Ok(t, err)
}
testutil.Ok(t, app.Commit())

blockDir := filepath.Join(tmpDir, "tmp")
id := createBlockFromHead(t, blockDir, h)

meta, err := metadata.InjectThanos(log.NewNopLogger(), filepath.Join(blockDir, id.String()), thanosMeta, nil)
testutil.Ok(t, err)
testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(blockDir, id.String())))

b1 = &bucketBlock{
indexCache: indexCache,
logger: logger,
bkt: bkt,
meta: meta,
partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
chunkObjs: []string{filepath.Join(id.String(), "chunks", "000001")},
chunkPool: chunkPool,
}
b1.indexHeaderReader, err = indexheader.NewBinaryReader(context.Background(), log.NewNopLogger(), bkt, tmpDir, b1.meta.ULID)
testutil.Ok(t, err)
}

var b2 *bucketBlock
{
// Block 2, do not load this block yet.
h, err := tsdb.NewHead(nil, nil, nil, 1)
testutil.Ok(t, err)
defer func() { testutil.Ok(t, h.Close()) }()

app := h.Appender()

for i := 0; i < numSeries; i++ {
ts := int64(i)
lbls := labels.FromStrings("foo", "bar", "b", "2", "i", fmt.Sprintf("%07d%s", ts, postingsBenchSuffix))

_, err := app.Add(lbls, ts, 0)
testutil.Ok(t, err)
}
testutil.Ok(t, app.Commit())

blockDir := filepath.Join(tmpDir, "tmp2")
id := createBlockFromHead(t, blockDir, h)

meta, err := metadata.InjectThanos(log.NewNopLogger(), filepath.Join(blockDir, id.String()), thanosMeta, nil)
testutil.Ok(t, err)
testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(blockDir, id.String())))

b2 = &bucketBlock{
indexCache: indexCache,
logger: logger,
bkt: bkt,
meta: meta,
partitioner: gapBasedPartitioner{maxGapSize: partitionerMaxGapSize},
chunkObjs: []string{filepath.Join(id.String(), "chunks", "000001")},
chunkPool: chunkPool,
}
b2.indexHeaderReader, err = indexheader.NewBinaryReader(context.Background(), log.NewNopLogger(), bkt, tmpDir, b2.meta.ULID)
testutil.Ok(t, err)
}

store := &BucketStore{
bkt: bkt,
logger: logger,
indexCache: indexCache,
metrics: newBucketStoreMetrics(nil),
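// Both blocks carry identical external labels (ext1="1"), so they land in a single block set.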
blockSets: map[uint64]*bucketBlockSet{
labels.Labels{{Name: "ext1", Value: "1"}}.Hash(): {blocks: [][]*bucketBlock{{b1, b2}}},
},
blocks: map[ulid.ULID]*bucketBlock{
b1.meta.ULID: b1,
b2.meta.ULID: b2,
},
queryGate: noopGater{},
samplesLimiter: noopLimiter{},
}

t.Run("invoke series for one block. Fill the cache on the way.", func(t *testing.T) {
srv := newStoreSeriesServer(context.Background())
testutil.Ok(t, store.Series(&storepb.SeriesRequest{
MinTime: 0,
MaxTime: int64(numSeries) - 1,
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"},
{Type: storepb.LabelMatcher_EQ, Name: "b", Value: "1"},
},
}, srv))
testutil.Equals(t, 0, len(srv.Warnings))
testutil.Equals(t, numSeries, len(srv.SeriesSet))
})
t.Run("invoke series for second block. This should revoke previous cache.", func(t *testing.T) {
srv := newStoreSeriesServer(context.Background())
testutil.Ok(t, store.Series(&storepb.SeriesRequest{
MinTime: 0,
MaxTime: int64(numSeries) - 1,
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"},
{Type: storepb.LabelMatcher_EQ, Name: "b", Value: "2"},
},
}, srv))
testutil.Equals(t, 0, len(srv.Warnings))
testutil.Equals(t, numSeries, len(srv.SeriesSet))
})
t.Run("remove second block. Cache stays. Ask for first again.", func(t *testing.T) {
testutil.Ok(t, store.removeBlock(b2.meta.ULID))
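// Entries cached for b2 may still occupy the shared index cache; querying b1 again
// forces evictions, which is presumably where the segfault from #2147 surfaced.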

srv := newStoreSeriesServer(context.Background())
testutil.Ok(t, store.Series(&storepb.SeriesRequest{
MinTime: 0,
MaxTime: int64(numSeries) - 1,
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"},
{Type: storepb.LabelMatcher_EQ, Name: "b", Value: "1"},
},
}, srv))
testutil.Equals(t, 0, len(srv.Warnings))
testutil.Equals(t, numSeries, len(srv.SeriesSet))
})
}
4 changes: 4 additions & 0 deletions pkg/store/cache/inmemory.go
@@ -5,6 +5,7 @@ package storecache

import (
"context"
"fmt"
"math"
"sync"

@@ -177,6 +178,7 @@ func NewInMemoryIndexCacheWithConfig(logger log.Logger, reg prometheus.Registerer, config InMemoryIndexCacheConfig) (*InMemoryIndexCache, error) {
}

func (c *InMemoryIndexCache) onEvict(key, val interface{}) {
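// NOTE: temporary debug output for this repro; prints on every cache eviction.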
fmt.Println("evicting!")
k := key.(cacheKey).keyType()
entrySize := sliceHeaderSize + uint64(len(val.([]byte)))

@@ -228,6 +230,8 @@ func (c *InMemoryIndexCache) set(typ string, key cacheKey, val []byte) {
c.totalCurrentSize.WithLabelValues(typ).Add(float64(size + key.size()))
c.current.WithLabelValues(typ).Inc()
c.curSize += size

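// NOTE: temporary debug output for this repro; traces the cache's size after each insertion.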
fmt.Println("Current size:", typ, key, c.curSize)
}

// ensureFits tries to make sure that the passed slice will fit into the LRU cache.
