Merge branch 'main' into metrics/compactor_downsample_duration
vanugrah authored Aug 13, 2021
2 parents 95a4e38 + 40040fc commit 5cf5bd6
Showing 14 changed files with 340 additions and 104 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -17,7 +17,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#4444](https://github.com/thanos-io/thanos/pull/4444) UI: Add search block UI.
- [#4509](https://github.com/thanos-io/thanos/pull/4509) Logging: Adds duration_ms in int64 to the logs.
- [#4462](https://github.com/thanos-io/thanos/pull/4462) UI: Add find overlap block UI.
- [#4552](https://github.com/thanos-io/thanos/pull/4552) Compactor: Adds `thanos_compact_downsample_duration_seconds` histogram.
- [#4469](https://github.com/thanos-io/thanos/pull/4469) Compact: Add flag `compact.skip-block-with-out-of-order-chunks` to skip blocks with out-of-order chunks during compaction instead of halting.
- [#4552](https://github.com/thanos-io/thanos/pull/4552) Compact: Adds `thanos_compact_downsample_duration_seconds` histogram.

### Fixed

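For context on the #4552 entry, a duration histogram like `thanos_compact_downsample_duration_seconds` is registered once and then observed around each downsampling pass. A minimal sketch of that pattern — not the actual Thanos wiring; `runDownsample` and the bucket layout are placeholders:

```go
package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

func main() {
	reg := prometheus.NewRegistry()

	// Histogram mirroring the metric name added in #4552.
	downsampleDuration := promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
		Name:    "thanos_compact_downsample_duration_seconds",
		Help:    "Time it took to downsample blocks.",
		Buckets: prometheus.ExponentialBuckets(60, 2, 8), // placeholder buckets
	})

	start := time.Now()
	runDownsample() // placeholder for the real downsampling pass
	downsampleDuration.Observe(time.Since(start).Seconds())
}

func runDownsample() { time.Sleep(10 * time.Millisecond) }
```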
21 changes: 14 additions & 7 deletions cmd/thanos/compact.go
@@ -147,9 +147,10 @@ func newCompactMetrics(reg *prometheus.Registry, deleteDelay time.Duration) *com
m.blocksMarked = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "thanos_compact_blocks_marked_total",
Help: "Total number of blocks marked in compactor.",
}, []string{"marker"})
m.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename)
m.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename)
}, []string{"marker", "reason"})
m.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.OutOfOrderChunksNoCompactReason)
m.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.IndexSizeExceedingNoCompactReason)
m.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, "")

m.garbageCollectedBlocks = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_compact_garbage_collected_blocks_total",
@@ -281,7 +282,7 @@ func runCompact(
cf,
duplicateBlocksFilter,
ignoreDeletionMarkFilter,
compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename),
compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, ""),
compactMetrics.garbageCollectedBlocks,
conf.blockSyncConcurrency)
if err != nil {
@@ -347,15 +348,17 @@
conf.acceptMalformedIndex,
enableVerticalCompaction,
reg,
compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename),
compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, ""),
compactMetrics.garbageCollectedBlocks,
compactMetrics.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.OutOfOrderChunksNoCompactReason),
metadata.HashFunc(conf.hashFunc),
conf.skipBlockWithOutOfOrderChunks,
)
planner := compact.WithLargeTotalIndexSizeFilter(
compact.NewPlanner(logger, levels, noCompactMarkerFilter),
bkt,
int64(conf.maxBlockIndexSize),
compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename),
compactMetrics.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.IndexSizeExceedingNoCompactReason),
)
blocksCleaner := compact.NewBlocksCleaner(logger, bkt, ignoreDeletionMarkFilter, deleteDelay, compactMetrics.blocksCleaned, compactMetrics.blockCleanupFailures)
compactor, err := compact.NewBucketCompactor(
@@ -448,7 +451,7 @@ func runCompact(
return errors.Wrap(err, "sync before first pass of downsampling")
}

if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, sy.Metas(), retentionByResolution, compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename)); err != nil {
if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, sy.Metas(), retentionByResolution, compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, "")); err != nil {
return errors.Wrap(err, "retention failed")
}

@@ -585,6 +588,7 @@ type compactConfig struct {
hashFunc string
enableVerticalCompaction bool
dedupFunc string
skipBlockWithOutOfOrderChunks bool
}

func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
@@ -668,6 +672,9 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
"Default is due to https://github.com/thanos-io/thanos/issues/1424, but it's overall recommended to keeps block size to some reasonable size.").
Hidden().Default("64GB").BytesVar(&cc.maxBlockIndexSize)

cmd.Flag("compact.skip-block-with-out-of-order-chunks", "When set to true, mark blocks containing index with out-of-order chunks for no compact instead of halting the compaction").
Hidden().Default("false").BoolVar(&cc.skipBlockWithOutOfOrderChunks)

cmd.Flag("hash-func", "Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not happen. This permits avoiding downloading some files twice albeit at some performance cost. Possible values are: \"\", \"SHA256\".").
Default("").EnumVar(&cc.hashFunc, "SHA256", "")

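The metrics change above widens `thanos_compact_blocks_marked_total` from a single `marker` label to `marker` plus `reason`, and pre-creates the expected label combinations so each series is exported at zero before any block is marked. A minimal sketch of that CounterVec pattern, with the marker filenames and reason strings from this commit written as plain literals:

```go
package main

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

func main() {
	reg := prometheus.NewRegistry()

	blocksMarked := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
		Name: "thanos_compact_blocks_marked_total",
		Help: "Total number of blocks marked in compactor.",
	}, []string{"marker", "reason"})

	// Pre-initialize the combinations so they appear at 0 on first scrape.
	blocksMarked.WithLabelValues("no-compact-mark.json", "block-index-out-of-order-chunk")
	blocksMarked.WithLabelValues("no-compact-mark.json", "index-size-exceeding")
	blocksMarked.WithLabelValues("deletion-mark.json", "")

	// Callers receive a single child counter and just increment it.
	marked := blocksMarked.WithLabelValues("deletion-mark.json", "")
	marked.Inc()
}
```

Note the empty `reason` on deletion marks: deletions carry no no-compact reason, so the label is left blank to keep the series shape consistent.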
2 changes: 1 addition & 1 deletion docs/components/receive.md
@@ -6,7 +6,7 @@ menu: components

# Receiver

The `thanos receive` command implements the [Prometheus Remote Write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write). It builds on top of existing Prometheus TSDB and retains its usefulness while extending its functionality with long-term-storage, horizontal scalability, and downsampling. It exposes the StoreAPI so that [Thanos Queriers](query.md) can query received metrics in real-time. The [Thanos Sidecar](sidecar.md) is not sufficient for this, as the system would always lag the block length behind (typically 2 hours).
The `thanos receive` command implements the [Prometheus Remote Write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write). It builds on top of existing Prometheus TSDB and retains its usefulness while extending its functionality with long-term-storage, horizontal scalability, and downsampling. Prometheus instances are configured to continuously write metrics to it, and then Thanos Receive uploads TSDB blocks to an object storage bucket every 2 hours by default. Thanos Receive exposes the StoreAPI so that [Thanos Queriers](query.md) can query received metrics in real-time.

We recommend this component to users who can only push into Thanos due to air-gapped or egress-only environments. Please note the [various pros and cons of pushing metrics](https://docs.google.com/document/d/1H47v7WfyKkSLMrR8_iku6u9VB73WrVzBHb2SB6dL9_g/edit#heading=h.2v27snv0lsur).

19 changes: 13 additions & 6 deletions pkg/block/index.go
@@ -111,12 +111,9 @@ func (i HealthStats) Issue347OutsideChunksErr() error {
return nil
}

// CriticalErr returns an error if the stats indicate a critical block issue that might be solved only by a manual repair procedure.
func (i HealthStats) CriticalErr() error {
var errMsg []string

if i.OutOfOrderSeries > 0 {
errMsg = append(errMsg, fmt.Sprintf(
// OutOfOrderChunksErr returns an error if the stats indicate out-of-order chunks.
func (i HealthStats) OutOfOrderChunksErr() error {
if i.OutOfOrderChunks > 0 {
return errors.New(fmt.Sprintf(
"%d/%d series have an average of %.3f out-of-order chunks: "+
"%.3f of these are exact duplicates (in terms of data and time range)",
i.OutOfOrderSeries,
@@ -125,6 +122,12 @@ func (i HealthStats) CriticalErr() error {
float64(i.DuplicatedChunks)/float64(i.OutOfOrderChunks),
))
}
return nil
}

// CriticalErr returns an error if the stats indicate a critical block issue that might be solved only by a manual repair procedure.
func (i HealthStats) CriticalErr() error {
var errMsg []string

n := i.OutsideChunks - (i.CompleteOutsideChunks + i.Issue347OutsideChunks)
if n > 0 {
@@ -158,6 +161,10 @@ func (i HealthStats) AnyErr() error {
errMsg = append(errMsg, err.Error())
}

if err := i.OutOfOrderChunksErr(); err != nil {
errMsg = append(errMsg, err.Error())
}

if len(errMsg) > 0 {
return errors.New(strings.Join(errMsg, ", "))
}
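The refactor above splits the out-of-order-chunks check out of `CriticalErr` into a dedicated `OutOfOrderChunksErr` method, so callers can treat it as a skippable condition rather than one that halts the compactor. A hedged sketch of how a caller might branch on it — `checkBlock`, `markForNoCompact`, and the `skipOutOfOrder` flag are hypothetical stand-ins for the real wiring:

```go
package main

import (
	"fmt"

	"github.com/thanos-io/thanos/pkg/block"
)

// checkBlock is hypothetical; stats would come from
// block.GatherIndexHealthStats on the block's index file.
func checkBlock(stats block.HealthStats, skipOutOfOrder bool) error {
	if err := stats.OutOfOrderChunksErr(); err != nil {
		if skipOutOfOrder {
			// Mark the block for no compaction and carry on,
			// instead of halting the whole compactor.
			return markForNoCompact(err)
		}
		return err
	}
	// All other critical issues still halt compaction.
	return stats.CriticalErr()
}

func markForNoCompact(cause error) error {
	fmt.Println("marking block no-compact:", cause) // placeholder
	return nil
}

func main() {
	_ = checkBlock(block.HealthStats{OutOfOrderSeries: 1, OutOfOrderChunks: 1}, true)
}
```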
14 changes: 14 additions & 0 deletions pkg/block/index_test.go
@@ -6,6 +6,7 @@ package block
import (
"context"
"io/ioutil"
"math"
"os"
"path/filepath"
"testing"
@@ -83,5 +84,18 @@ func TestRewrite(t *testing.T) {
testutil.Ok(t, ir2.Series(p.At(), &lset, &chks))
testutil.Equals(t, 1, len(chks))
}
}

func TestGatherIndexHealthStatsReturnsOutOfOrderChunksErr(t *testing.T) {
blockDir, err := ioutil.TempDir("", "test-ooo-index")
testutil.Ok(t, err)

err = testutil.PutOutOfOrderIndex(blockDir, 0, math.MaxInt64)
testutil.Ok(t, err)

stats, err := GatherIndexHealthStats(log.NewLogfmtLogger(os.Stderr), blockDir+"/"+IndexFilename, 0, math.MaxInt64)

testutil.Ok(t, err)
testutil.Equals(t, 1, stats.OutOfOrderChunks)
testutil.NotOk(t, stats.OutOfOrderChunksErr())
}
20 changes: 16 additions & 4 deletions pkg/block/indexheader/reader_pool.go
@@ -17,15 +17,27 @@ import (
"github.com/thanos-io/thanos/pkg/objstore"
)

// ReaderPoolMetrics holds metrics tracked by ReaderPool.
type ReaderPoolMetrics struct {
lazyReader *LazyBinaryReaderMetrics
}

// NewReaderPoolMetrics makes new ReaderPoolMetrics.
func NewReaderPoolMetrics(reg prometheus.Registerer) *ReaderPoolMetrics {
return &ReaderPoolMetrics{
lazyReader: NewLazyBinaryReaderMetrics(reg),
}
}

// ReaderPool is used to instantiate new index-header readers and keep track of them.
// When the lazy reader is enabled, the pool keeps track of all instantiated readers
// and automatically closes them once the idle timeout is reached. A closed lazy reader
// will be automatically re-opened upon next usage.
type ReaderPool struct {
lazyReaderEnabled bool
lazyReaderIdleTimeout time.Duration
lazyReaderMetrics *LazyBinaryReaderMetrics
logger log.Logger
metrics *ReaderPoolMetrics

// Channel used to signal once the pool is closing.
close chan struct{}
@@ -36,12 +48,12 @@ }
}

// NewReaderPool makes a new ReaderPool.
func NewReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTimeout time.Duration, reg prometheus.Registerer) *ReaderPool {
func NewReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTimeout time.Duration, metrics *ReaderPoolMetrics) *ReaderPool {
p := &ReaderPool{
logger: logger,
metrics: metrics,
lazyReaderEnabled: lazyReaderEnabled,
lazyReaderIdleTimeout: lazyReaderIdleTimeout,
lazyReaderMetrics: NewLazyBinaryReaderMetrics(reg),
lazyReaders: make(map[*LazyBinaryReader]struct{}),
close: make(chan struct{}),
}
@@ -73,7 +85,7 @@ func (p *ReaderPool) NewBinaryReader(ctx context.Context, logger log.Logger, bkt
var err error

if p.lazyReaderEnabled {
reader, err = NewLazyBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling, p.lazyReaderMetrics, p.onLazyReaderClosed)
reader, err = NewLazyBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling, p.metrics.lazyReader, p.onLazyReaderClosed)
} else {
reader, err = NewBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling)
}
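The change above inverts metric ownership: `NewReaderPool` no longer registers its own `LazyBinaryReaderMetrics`; instead the caller builds a `ReaderPoolMetrics` once and passes it in. That keeps registration in one place and, as the updated tests below show, lets callers inspect the counters directly. A minimal usage sketch under those assumptions:

```go
package main

import (
	"time"

	"github.com/go-kit/kit/log"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/thanos-io/thanos/pkg/block/indexheader"
)

func main() {
	reg := prometheus.NewRegistry()
	logger := log.NewNopLogger()

	// Metrics are created once by the caller...
	metrics := indexheader.NewReaderPoolMetrics(reg)

	// ...and handed to the pool (lazy readers enabled, 5m idle timeout).
	pool := indexheader.NewReaderPool(logger, true, 5*time.Minute, metrics)
	defer pool.Close()
}
```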
21 changes: 11 additions & 10 deletions pkg/block/indexheader/reader_pool_test.go
@@ -60,7 +60,7 @@ func TestReaderPool_NewBinaryReader(t *testing.T) {

for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
pool := NewReaderPool(log.NewNopLogger(), testData.lazyReaderEnabled, testData.lazyReaderIdleTimeout, nil)
pool := NewReaderPool(log.NewNopLogger(), testData.lazyReaderEnabled, testData.lazyReaderIdleTimeout, NewReaderPoolMetrics(nil))
defer pool.Close()

r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3)
@@ -96,7 +96,8 @@ func TestReaderPool_ShouldCloseIdleLazyReaders(t *testing.T) {
testutil.Ok(t, err)
testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc))

pool := NewReaderPool(log.NewNopLogger(), true, idleTimeout, nil)
metrics := NewReaderPoolMetrics(nil)
pool := NewReaderPool(log.NewNopLogger(), true, idleTimeout, metrics)
defer pool.Close()

r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3)
@@ -107,28 +108,28 @@ func TestReaderPool_ShouldCloseIdleLazyReaders(t *testing.T) {
labelNames, err := r.LabelNames()
testutil.Ok(t, err)
testutil.Equals(t, []string{"a"}, labelNames)
testutil.Equals(t, float64(1), promtestutil.ToFloat64(pool.lazyReaderMetrics.loadCount))
testutil.Equals(t, float64(0), promtestutil.ToFloat64(pool.lazyReaderMetrics.unloadCount))
testutil.Equals(t, float64(1), promtestutil.ToFloat64(metrics.lazyReader.loadCount))
testutil.Equals(t, float64(0), promtestutil.ToFloat64(metrics.lazyReader.unloadCount))

// Wait enough time before checking it.
time.Sleep(idleTimeout * 2)

// We expect the reader has been closed, but not released from the pool.
testutil.Assert(t, pool.isTracking(r.(*LazyBinaryReader)))
testutil.Equals(t, float64(1), promtestutil.ToFloat64(pool.lazyReaderMetrics.loadCount))
testutil.Equals(t, float64(1), promtestutil.ToFloat64(pool.lazyReaderMetrics.unloadCount))
testutil.Equals(t, float64(1), promtestutil.ToFloat64(metrics.lazyReader.loadCount))
testutil.Equals(t, float64(1), promtestutil.ToFloat64(metrics.lazyReader.unloadCount))

// Ensure it can still read data (will be re-opened).
labelNames, err = r.LabelNames()
testutil.Ok(t, err)
testutil.Equals(t, []string{"a"}, labelNames)
testutil.Assert(t, pool.isTracking(r.(*LazyBinaryReader)))
testutil.Equals(t, float64(2), promtestutil.ToFloat64(pool.lazyReaderMetrics.loadCount))
testutil.Equals(t, float64(1), promtestutil.ToFloat64(pool.lazyReaderMetrics.unloadCount))
testutil.Equals(t, float64(2), promtestutil.ToFloat64(metrics.lazyReader.loadCount))
testutil.Equals(t, float64(1), promtestutil.ToFloat64(metrics.lazyReader.unloadCount))

// We expect an explicit call to Close() to close the reader and release it from the pool too.
testutil.Ok(t, r.Close())
testutil.Assert(t, !pool.isTracking(r.(*LazyBinaryReader)))
testutil.Equals(t, float64(2), promtestutil.ToFloat64(pool.lazyReaderMetrics.loadCount))
testutil.Equals(t, float64(2), promtestutil.ToFloat64(pool.lazyReaderMetrics.unloadCount))
testutil.Equals(t, float64(2), promtestutil.ToFloat64(metrics.lazyReader.loadCount))
testutil.Equals(t, float64(2), promtestutil.ToFloat64(metrics.lazyReader.unloadCount))
}
2 changes: 2 additions & 0 deletions pkg/block/metadata/markers.go
@@ -67,6 +67,8 @@ const (
// IndexSizeExceedingNoCompactReason is a reason for an index being too big (for example, exceeding the 64GB limit: https://github.com/thanos-io/thanos/issues/1424).
// This reason can be ignored once vertical block sharding is implemented.
IndexSizeExceedingNoCompactReason = "index-size-exceeding"
// OutOfOrderChunksNoCompactReason is a reason to mark a block for no compaction when its index contains out-of-order chunks, so that compaction is not blocked.
OutOfOrderChunksNoCompactReason = "block-index-out-of-order-chunk"
)

// NoCompactMark marker stores reason of block being excluded from compaction if needed.
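The new reason string ends up in a block's `no-compact-mark.json`. A hedged sketch of constructing such a marker — field names follow the `NoCompactMark` struct as of this commit, but treat the exact shape as illustrative:

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"

	"github.com/oklog/ulid"

	"github.com/thanos-io/thanos/pkg/block/metadata"
)

func main() {
	// Illustrative marker for a block skipped due to out-of-order chunks.
	mark := metadata.NoCompactMark{
		ID:            ulid.MustNew(ulid.Now(), nil),
		Version:       metadata.NoCompactMarkVersion1,
		NoCompactTime: time.Now().Unix(),
		Reason:        metadata.OutOfOrderChunksNoCompactReason,
		Details:       "out-of-order chunks found; skipping instead of halting",
	}

	b, _ := json.MarshalIndent(mark, "", "  ")
	fmt.Println(string(b)) // uploaded to the block dir as no-compact-mark.json
}
```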
