From e0527c4a04312f0deb4fd5634b4a9870731f92db Mon Sep 17 00:00:00 2001 From: Yangyang <51769527+yangyang919@users.noreply.github.com> Date: Thu, 12 Aug 2021 14:33:11 +0800 Subject: [PATCH 1/3] docs: Fix wrong description in /docs/receive.md (#4555) * docs: Fix wrong description in /docs/receive.md Signed-off-by: yangyang * remove whitespace Signed-off-by: yangyang --- docs/components/receive.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/components/receive.md b/docs/components/receive.md index f46d840491..8e60de1d30 100644 --- a/docs/components/receive.md +++ b/docs/components/receive.md @@ -6,7 +6,7 @@ menu: components # Receiver -The `thanos receive` command implements the [Prometheus Remote Write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write). It builds on top of existing Prometheus TSDB and retains its usefulness while extending its functionality with long-term-storage, horizontal scalability, and downsampling. It exposes the StoreAPI so that [Thanos Queriers](query.md) can query received metrics in real-time. The [Thanos Sidecar](sidecar.md) is not sufficient for this, as the system would always lag the block length behind (typically 2 hours). +The `thanos receive` command implements the [Prometheus Remote Write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write). It builds on top of existing Prometheus TSDB and retains its usefulness while extending its functionality with long-term-storage, horizontal scalability, and downsampling. Prometheus instances are configured to continuously write metrics to it, and then Thanos Receive uploads TSDB blocks to an object storage bucket every 2 hours by default. Thanos Receive exposes the StoreAPI so that [Thanos Queriers](query.md) can query received metrics in real-time. We recommend this component to users who can only push into a Thanos due to air-gapped, or egress only environments. Please note the [various pros and cons of pushing metrics](https://docs.google.com/document/d/1H47v7WfyKkSLMrR8_iku6u9VB73WrVzBHb2SB6dL9_g/edit#heading=h.2v27snv0lsur). From 07b377f47229c853290fcb9520a05696376d870d Mon Sep 17 00:00:00 2001 From: Yang Hu Date: Thu, 12 Aug 2021 20:59:16 -0700 Subject: [PATCH 2/3] skip blocks with out-of-order chunk during compaction (#4469) Signed-off-by: Yang Hu --- CHANGELOG.md | 1 + cmd/thanos/compact.go | 21 +++-- pkg/block/index.go | 19 +++-- pkg/block/index_test.go | 14 +++ pkg/block/metadata/markers.go | 2 + pkg/compact/compact.go | 147 ++++++++++++++++++++++---------- pkg/compact/compact_e2e_test.go | 78 +++++++++++------ pkg/query/querier.go | 2 +- pkg/testutil/testutil.go | 110 ++++++++++++++++++++++++ 9 files changed, 308 insertions(+), 86 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5ab107ba91..26cc524b0b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#4444](https://github.com/thanos-io/thanos/pull/4444) UI: Add search block UI. - [#4509](https://github.com/thanos-io/thanos/pull/4509) Logging: Adds duration_ms in int64 to the logs. 
- [#4462](https://github.com/thanos-io/thanos/pull/4462) UI: Add find overlap block UI +- [#4469](https://github.com/thanos-io/thanos/pull/4469) Compact: Add flag `compact.skip-block-with-out-of-order-chunks` to skip blocks with out-of-order chunks during compaction instead of halting ### Fixed diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index af3fe78129..53eb9a5f9a 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -147,9 +147,10 @@ func newCompactMetrics(reg *prometheus.Registry, deleteDelay time.Duration) *com m.blocksMarked = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_compact_blocks_marked_total", Help: "Total number of blocks marked in compactor.", - }, []string{"marker"}) - m.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename) - m.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename) + }, []string{"marker", "reason"}) + m.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.OutOfOrderChunksNoCompactReason) + m.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.IndexSizeExceedingNoCompactReason) + m.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, "") m.garbageCollectedBlocks = promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "thanos_compact_garbage_collected_blocks_total", @@ -281,7 +282,7 @@ func runCompact( cf, duplicateBlocksFilter, ignoreDeletionMarkFilter, - compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename), + compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, ""), compactMetrics.garbageCollectedBlocks, conf.blockSyncConcurrency) if err != nil { @@ -347,15 +348,17 @@ func runCompact( conf.acceptMalformedIndex, enableVerticalCompaction, reg, - compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename), + compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, ""), compactMetrics.garbageCollectedBlocks, + compactMetrics.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.OutOfOrderChunksNoCompactReason), metadata.HashFunc(conf.hashFunc), + conf.skipBlockWithOutOfOrderChunks, ) planner := compact.WithLargeTotalIndexSizeFilter( compact.NewPlanner(logger, levels, noCompactMarkerFilter), bkt, int64(conf.maxBlockIndexSize), - compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename), + compactMetrics.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.IndexSizeExceedingNoCompactReason), ) blocksCleaner := compact.NewBlocksCleaner(logger, bkt, ignoreDeletionMarkFilter, deleteDelay, compactMetrics.blocksCleaned, compactMetrics.blockCleanupFailures) compactor, err := compact.NewBucketCompactor( @@ -448,7 +451,7 @@ func runCompact( return errors.Wrap(err, "sync before first pass of downsampling") } - if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, sy.Metas(), retentionByResolution, compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename)); err != nil { + if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, sy.Metas(), retentionByResolution, compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, "")); err != nil { return errors.Wrap(err, "retention failed") } @@ -585,6 +588,7 @@ type compactConfig struct { hashFunc string enableVerticalCompaction bool dedupFunc string + skipBlockWithOutOfOrderChunks bool } func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) { @@ -668,6 +672,9 @@ func (cc *compactConfig) registerFlag(cmd 
extkingpin.FlagClause) { "Default is due to https://github.com/thanos-io/thanos/issues/1424, but it's overall recommended to keeps block size to some reasonable size."). Hidden().Default("64GB").BytesVar(&cc.maxBlockIndexSize) + cmd.Flag("compact.skip-block-with-out-of-order-chunks", "When set to true, mark blocks containing index with out-of-order chunks for no compact instead of halting the compaction"). + Hidden().Default("false").BoolVar(&cc.skipBlockWithOutOfOrderChunks) + cmd.Flag("hash-func", "Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not happen. This permits avoiding downloading some files twice albeit at some performance cost. Possible values are: \"\", \"SHA256\"."). Default("").EnumVar(&cc.hashFunc, "SHA256", "") diff --git a/pkg/block/index.go b/pkg/block/index.go index 2b6ece295e..851dfa9d98 100644 --- a/pkg/block/index.go +++ b/pkg/block/index.go @@ -111,12 +111,9 @@ func (i HealthStats) Issue347OutsideChunksErr() error { return nil } -// CriticalErr returns error if stats indicates critical block issue, that might solved only by manual repair procedure. -func (i HealthStats) CriticalErr() error { - var errMsg []string - - if i.OutOfOrderSeries > 0 { - errMsg = append(errMsg, fmt.Sprintf( +func (i HealthStats) OutOfOrderChunksErr() error { + if i.OutOfOrderChunks > 0 { + return errors.New(fmt.Sprintf( "%d/%d series have an average of %.3f out-of-order chunks: "+ "%.3f of these are exact duplicates (in terms of data and time range)", i.OutOfOrderSeries, @@ -125,6 +122,12 @@ func (i HealthStats) CriticalErr() error { float64(i.DuplicatedChunks)/float64(i.OutOfOrderChunks), )) } + return nil +} + +// CriticalErr returns error if stats indicates critical block issue, that might solved only by manual repair procedure. 
+func (i HealthStats) CriticalErr() error { + var errMsg []string n := i.OutsideChunks - (i.CompleteOutsideChunks + i.Issue347OutsideChunks) if n > 0 { @@ -158,6 +161,10 @@ func (i HealthStats) AnyErr() error { errMsg = append(errMsg, err.Error()) } + if err := i.OutOfOrderChunksErr(); err != nil { + errMsg = append(errMsg, err.Error()) + } + if len(errMsg) > 0 { return errors.New(strings.Join(errMsg, ", ")) } diff --git a/pkg/block/index_test.go b/pkg/block/index_test.go index 6db6554f44..044c047874 100644 --- a/pkg/block/index_test.go +++ b/pkg/block/index_test.go @@ -6,6 +6,7 @@ package block import ( "context" "io/ioutil" + "math" "os" "path/filepath" "testing" @@ -83,5 +84,18 @@ func TestRewrite(t *testing.T) { testutil.Ok(t, ir2.Series(p.At(), &lset, &chks)) testutil.Equals(t, 1, len(chks)) } +} + +func TestGatherIndexHealthStatsReturnsOutOfOrderChunksErr(t *testing.T) { + blockDir, err := ioutil.TempDir("", "test-ooo-index") + testutil.Ok(t, err) + err = testutil.PutOutOfOrderIndex(blockDir, 0, math.MaxInt64) + testutil.Ok(t, err) + + stats, err := GatherIndexHealthStats(log.NewLogfmtLogger(os.Stderr), blockDir+"/"+IndexFilename, 0, math.MaxInt64) + + testutil.Ok(t, err) + testutil.Equals(t, 1, stats.OutOfOrderChunks) + testutil.NotOk(t, stats.OutOfOrderChunksErr()) } diff --git a/pkg/block/metadata/markers.go b/pkg/block/metadata/markers.go index b3c8b9d1f0..f2d40bd045 100644 --- a/pkg/block/metadata/markers.go +++ b/pkg/block/metadata/markers.go @@ -67,6 +67,8 @@ const ( // IndexSizeExceedingNoCompactReason is a reason of index being too big (for example exceeding 64GB limit: https://github.com/thanos-io/thanos/issues/1424) // This reason can be ignored when vertical block sharding will be implemented. IndexSizeExceedingNoCompactReason = "index-size-exceeding" + // OutOfOrderChunksNoCompactReason is a reason of to no compact block with index contains out of order chunk so that the compaction is not blocked. + OutOfOrderChunksNoCompactReason = "block-index-out-of-order-chunk" ) // NoCompactMark marker stores reason of block being excluded from compaction if needed. diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 946c554621..7dc7801953 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -229,18 +229,20 @@ func defaultGroupKey(res int64, lbls labels.Labels) string { // DefaultGrouper is the Thanos built-in grouper. It groups blocks based on downsample // resolution and block's labels. type DefaultGrouper struct { - bkt objstore.Bucket - logger log.Logger - acceptMalformedIndex bool - enableVerticalCompaction bool - compactions *prometheus.CounterVec - compactionRunsStarted *prometheus.CounterVec - compactionRunsCompleted *prometheus.CounterVec - compactionFailures *prometheus.CounterVec - verticalCompactions *prometheus.CounterVec - garbageCollectedBlocks prometheus.Counter - blocksMarkedForDeletion prometheus.Counter - hashFunc metadata.HashFunc + bkt objstore.Bucket + logger log.Logger + acceptMalformedIndex bool + enableVerticalCompaction bool + compactions *prometheus.CounterVec + compactionRunsStarted *prometheus.CounterVec + compactionRunsCompleted *prometheus.CounterVec + compactionFailures *prometheus.CounterVec + verticalCompactions *prometheus.CounterVec + garbageCollectedBlocks prometheus.Counter + blocksMarkedForDeletion prometheus.Counter + blocksMarkedForNoCompact prometheus.Counter + hashFunc metadata.HashFunc + skipChunksWithOutOfOrderBlocks bool } // NewDefaultGrouper makes a new DefaultGrouper. 
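To see how the pieces above fit together (the new HealthStats.OutOfOrderChunksErr check, the block-index-out-of-order-chunk no-compact reason, and the existing block.MarkForNoCompact helper), here is a minimal sketch of the skip path. It is not part of this patch: checkOrSkipBlock is a hypothetical name, and it assumes the caller already has a synced *metadata.Meta, a local copy of the block under bdir, a bucket client, a go-kit logger, a dedicated prometheus.Counter child for this reason, and the usual context, path/filepath, pkg/errors, prometheus, block, block/metadata and objstore imports.

// Hypothetical helper, for illustration only; not part of the patch.
func checkOrSkipBlock(
	ctx context.Context,
	logger log.Logger,
	bkt objstore.Bucket,
	meta *metadata.Meta,
	bdir string,
	markedForNoCompact prometheus.Counter,
) error {
	// Gather the same per-index statistics the compactor inspects before compacting.
	stats, err := block.GatherIndexHealthStats(logger, filepath.Join(bdir, block.IndexFilename), meta.MinTime, meta.MaxTime)
	if err != nil {
		return errors.Wrap(err, "gather index health stats")
	}
	// With compact.skip-block-with-out-of-order-chunks enabled, an out-of-order
	// chunk is no longer fatal: mark the block no-compact and move on.
	if oooErr := stats.OutOfOrderChunksErr(); oooErr != nil {
		return block.MarkForNoCompact(ctx, logger, bkt, meta.ULID,
			metadata.OutOfOrderChunksNoCompactReason, oooErr.Error(), markedForNoCompact)
	}
	// Every other critical index issue still halts compaction as before.
	return stats.CriticalErr()
}

In the patch itself the group's compact step does not mark the block directly; it wraps the failure in OutOfOrderChunksError so that BucketCompactor.Compact, further down, performs the marking.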
@@ -252,7 +254,9 @@ func NewDefaultGrouper( reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter, garbageCollectedBlocks prometheus.Counter, + blocksMarkedForNoCompact prometheus.Counter, hashFunc metadata.HashFunc, + skipChunksWithOutOfOrderBlocks bool, ) *DefaultGrouper { return &DefaultGrouper{ bkt: bkt, @@ -279,9 +283,11 @@ func NewDefaultGrouper( Name: "thanos_compact_group_vertical_compactions_total", Help: "Total number of group compaction attempts that resulted in a new block based on overlapping blocks.", }, []string{"group"}), - garbageCollectedBlocks: garbageCollectedBlocks, - blocksMarkedForDeletion: blocksMarkedForDeletion, - hashFunc: hashFunc, + blocksMarkedForNoCompact: blocksMarkedForNoCompact, + garbageCollectedBlocks: garbageCollectedBlocks, + blocksMarkedForDeletion: blocksMarkedForDeletion, + hashFunc: hashFunc, + skipChunksWithOutOfOrderBlocks: skipChunksWithOutOfOrderBlocks, } } @@ -309,7 +315,9 @@ func (g *DefaultGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*Gro g.verticalCompactions.WithLabelValues(groupKey), g.garbageCollectedBlocks, g.blocksMarkedForDeletion, + g.blocksMarkedForNoCompact, g.hashFunc, + g.skipChunksWithOutOfOrderBlocks, ) if err != nil { return nil, errors.Wrap(err, "create compaction group") @@ -330,23 +338,25 @@ func (g *DefaultGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*Gro // Group captures a set of blocks that have the same origin labels and downsampling resolution. // Those blocks generally contain the same series and can thus efficiently be compacted. type Group struct { - logger log.Logger - bkt objstore.Bucket - key string - labels labels.Labels - resolution int64 - mtx sync.Mutex - metasByMinTime []*metadata.Meta - acceptMalformedIndex bool - enableVerticalCompaction bool - compactions prometheus.Counter - compactionRunsStarted prometheus.Counter - compactionRunsCompleted prometheus.Counter - compactionFailures prometheus.Counter - verticalCompactions prometheus.Counter - groupGarbageCollectedBlocks prometheus.Counter - blocksMarkedForDeletion prometheus.Counter - hashFunc metadata.HashFunc + logger log.Logger + bkt objstore.Bucket + key string + labels labels.Labels + resolution int64 + mtx sync.Mutex + metasByMinTime []*metadata.Meta + acceptMalformedIndex bool + enableVerticalCompaction bool + compactions prometheus.Counter + compactionRunsStarted prometheus.Counter + compactionRunsCompleted prometheus.Counter + compactionFailures prometheus.Counter + verticalCompactions prometheus.Counter + groupGarbageCollectedBlocks prometheus.Counter + blocksMarkedForDeletion prometheus.Counter + blocksMarkedForNoCompact prometheus.Counter + hashFunc metadata.HashFunc + skipChunksWithOutofOrderBlocks bool } // NewGroup returns a new compaction group. 
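The blocksMarkedForNoCompact counter threaded through the grouper and each group above is one child of the re-labelled CounterVec set up in cmd/thanos/compact.go earlier in this patch. A condensed sketch of that wiring, not part of the patch; reg is an assumed prometheus.Registerer and the promauto, prometheus and metadata imports are assumed:

// Sketch only: the marker filename and the new reason label together record both
// how a block was marked and why.
blocksMarked := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
	Name: "thanos_compact_blocks_marked_total",
	Help: "Total number of blocks marked in compactor.",
}, []string{"marker", "reason"})

// Pre-create the expected label combinations so every series is exported at zero.
oooNoCompact := blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.OutOfOrderChunksNoCompactReason)
bigIndexNoCompact := blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.IndexSizeExceedingNoCompactReason)
markedForDeletion := blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, "")

// Each consumer receives exactly one child: the grouper increments oooNoCompact,
// the large-index planner filter increments bigIndexNoCompact, and the deletion
// and retention paths increment markedForDeletion.
_, _, _ = oooNoCompact, bigIndexNoCompact, markedForDeletion

The reason label lets operators tell blocks skipped for out-of-order chunks apart from blocks excluded for an oversized index on the same thanos_compact_blocks_marked_total metric.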
@@ -365,27 +375,31 @@ func NewGroup( verticalCompactions prometheus.Counter, groupGarbageCollectedBlocks prometheus.Counter, blocksMarkedForDeletion prometheus.Counter, + blockMakredForNoCopmact prometheus.Counter, hashFunc metadata.HashFunc, + skipChunksWithOutOfOrderChunks bool, ) (*Group, error) { if logger == nil { logger = log.NewNopLogger() } g := &Group{ - logger: logger, - bkt: bkt, - key: key, - labels: lset, - resolution: resolution, - acceptMalformedIndex: acceptMalformedIndex, - enableVerticalCompaction: enableVerticalCompaction, - compactions: compactions, - compactionRunsStarted: compactionRunsStarted, - compactionRunsCompleted: compactionRunsCompleted, - compactionFailures: compactionFailures, - verticalCompactions: verticalCompactions, - groupGarbageCollectedBlocks: groupGarbageCollectedBlocks, - blocksMarkedForDeletion: blocksMarkedForDeletion, - hashFunc: hashFunc, + logger: logger, + bkt: bkt, + key: key, + labels: lset, + resolution: resolution, + acceptMalformedIndex: acceptMalformedIndex, + enableVerticalCompaction: enableVerticalCompaction, + compactions: compactions, + compactionRunsStarted: compactionRunsStarted, + compactionRunsCompleted: compactionRunsCompleted, + compactionFailures: compactionFailures, + verticalCompactions: verticalCompactions, + groupGarbageCollectedBlocks: groupGarbageCollectedBlocks, + blocksMarkedForDeletion: blocksMarkedForDeletion, + blocksMarkedForNoCompact: blockMakredForNoCopmact, + hashFunc: hashFunc, + skipChunksWithOutofOrderBlocks: skipChunksWithOutOfOrderChunks, } return g, nil } @@ -541,6 +555,26 @@ func IsIssue347Error(err error) bool { return ok } +// OutOfOrderChunkError is a type wrapper for OOO chunk error from validating block index. +type OutOfOrderChunksError struct { + err error + id ulid.ULID +} + +func (e OutOfOrderChunksError) Error() string { + return e.err.Error() +} + +func outOfOrderChunkError(err error, brokenBlock ulid.ULID) OutOfOrderChunksError { + return OutOfOrderChunksError{err: err, id: brokenBlock} +} + +// IsOutOfOrderChunk returns true if the base error is a OutOfOrderChunkError. +func IsOutOfOrderChunkError(err error) bool { + _, ok := errors.Cause(err).(OutOfOrderChunksError) + return ok +} + // HaltError is a type wrapper for errors that should halt any further progress on compactions. type HaltError struct { err error @@ -749,6 +783,10 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp return false, ulid.ULID{}, halt(errors.Wrapf(err, "block with not healthy index found %s; Compaction level %v; Labels: %v", bdir, meta.Compaction.Level, meta.Thanos.Labels)) } + if err := stats.OutOfOrderChunksErr(); cg.skipChunksWithOutofOrderBlocks && err != nil { + return false, ulid.ULID{}, outOfOrderChunkError(errors.Wrapf(err, "blocks with out-of-order chunks are dropped from compaction: %s", bdir), meta.ULID) + } + if err := stats.Issue347OutsideChunksErr(); err != nil { return false, ulid.ULID{}, issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", bdir), meta.ULID) } @@ -939,6 +977,21 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) { continue } } + // if block has out of order chunk, mark the block for no compaction and continue. 
+ if IsOutOfOrderChunkError(err) { + if err := block.MarkForNoCompact( + ctx, + c.logger, + c.bkt, + err.(OutOfOrderChunksError).id, + metadata.OutOfOrderChunksNoCompactReason, + "OutofOrderChunk: marking block with out-of-order series/chunks to as no compact to unblock compaction", g.blocksMarkedForNoCompact); err == nil { + mtx.Lock() + finishedAllGroups = false + mtx.Unlock() + continue + } + } errChan <- errors.Wrapf(err, "group %s", g.Key()) return } diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go index cc690188cc..bfd0cb9fe5 100644 --- a/pkg/compact/compact_e2e_test.go +++ b/pkg/compact/compact_e2e_test.go @@ -102,6 +102,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) { blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) garbageCollectedBlocks := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) + blockMarkedForNoCompact := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(nil, nil, 48*time.Hour, fetcherConcurrency) sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks, 1) testutil.Ok(t, err) @@ -138,7 +139,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) { testutil.Ok(t, sy.GarbageCollect(ctx)) // Only the level 3 block, the last source block in both resolutions should be left. - grouper := NewDefaultGrouper(nil, bkt, false, false, nil, blocksMarkedForDeletion, garbageCollectedBlocks, metadata.NoneFunc) + grouper := NewDefaultGrouper(nil, bkt, false, false, nil, blocksMarkedForDeletion, garbageCollectedBlocks, blockMarkedForNoCompact, metadata.NoneFunc, true) groups, err := grouper.Groups(sy.Metas()) testutil.Ok(t, err) @@ -195,13 +196,16 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, objstore.WithNoopInstr(bkt), 48*time.Hour, fetcherConcurrency) duplicateBlocksFilter := block.NewDeduplicateFilter() + noCompactMarkerFilter := NewGatherNoCompactionMarkFilter(logger, objstore.WithNoopInstr(bkt), 2) metaFetcher, err := block.NewMetaFetcher(nil, 32, objstore.WithNoopInstr(bkt), "", nil, []block.MetadataFilter{ ignoreDeletionMarkFilter, duplicateBlocksFilter, + noCompactMarkerFilter, }, nil) testutil.Ok(t, err) blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) + blocksMaredForNoCompact := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) garbageCollectedBlocks := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks, 5) testutil.Ok(t, err) @@ -209,9 +213,8 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, []int64{1000, 3000}, nil, mergeFunc) testutil.Ok(t, err) - planner := NewTSDBBasedPlanner(logger, []int64{1000, 3000}) - - grouper := NewDefaultGrouper(logger, bkt, false, false, reg, blocksMarkedForDeletion, garbageCollectedBlocks, metadata.NoneFunc) + planner := NewPlanner(logger, []int64{1000, 3000}, noCompactMarkerFilter) + grouper := NewDefaultGrouper(logger, bkt, false, false, reg, blocksMarkedForDeletion, garbageCollectedBlocks, blocksMaredForNoCompact, metadata.NoneFunc, true) bComp, err := NewBucketCompactor(logger, sy, grouper, planner, comp, dir, bkt, 2) 
testutil.Ok(t, err) @@ -220,6 +223,7 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.garbageCollectedBlocks)) testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.blocksMarkedForDeletion)) testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.garbageCollectionFailures)) + testutil.Equals(t, 0.0, promtest.ToFloat64(grouper.blocksMarkedForNoCompact)) testutil.Equals(t, 0, MetricCount(grouper.compactions)) testutil.Equals(t, 0, MetricCount(grouper.compactionRunsStarted)) testutil.Equals(t, 0, MetricCount(grouper.compactionRunsCompleted)) @@ -233,7 +237,7 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg extLabels2 := labels.Labels{{Name: "e1", Value: "1"}} metas := createAndUpload(t, bkt, []blockgenSpec{ { - numSamples: 100, mint: 0, maxt: 1000, extLset: extLabels, res: 124, + numSamples: 100, mint: 500, maxt: 1000, extLset: extLabels, res: 124, series: []labels.Labels{ {{Name: "a", Value: "1"}}, {{Name: "a", Value: "2"}, {Name: "b", Value: "2"}}, @@ -303,11 +307,22 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg {{Name: "a", Value: "7"}}, }, }, + }, []blockgenSpec{ + { + numSamples: 100, mint: 0, maxt: 499, extLset: extLabels, res: 124, + series: []labels.Labels{ + {{Name: "a", Value: "1"}}, + {{Name: "a", Value: "2"}, {Name: "b", Value: "2"}}, + {{Name: "a", Value: "3"}}, + {{Name: "a", Value: "4"}}, + }, + }, }) testutil.Ok(t, bComp.Compact(ctx)) testutil.Equals(t, 5.0, promtest.ToFloat64(sy.metrics.garbageCollectedBlocks)) testutil.Equals(t, 5.0, promtest.ToFloat64(sy.metrics.blocksMarkedForDeletion)) + testutil.Equals(t, 1.0, promtest.ToFloat64(grouper.blocksMarkedForNoCompact)) testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.garbageCollectionFailures)) testutil.Equals(t, 4, MetricCount(grouper.compactions)) testutil.Equals(t, 1.0, promtest.ToFloat64(grouper.compactions.WithLabelValues(DefaultGroupKey(metas[0].Thanos)))) @@ -315,19 +330,19 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg testutil.Equals(t, 0.0, promtest.ToFloat64(grouper.compactions.WithLabelValues(DefaultGroupKey(metas[4].Thanos)))) testutil.Equals(t, 0.0, promtest.ToFloat64(grouper.compactions.WithLabelValues(DefaultGroupKey(metas[5].Thanos)))) testutil.Equals(t, 4, MetricCount(grouper.compactionRunsStarted)) - testutil.Equals(t, 2.0, promtest.ToFloat64(grouper.compactionRunsStarted.WithLabelValues(DefaultGroupKey(metas[0].Thanos)))) - testutil.Equals(t, 2.0, promtest.ToFloat64(grouper.compactionRunsStarted.WithLabelValues(DefaultGroupKey(metas[7].Thanos)))) + testutil.Equals(t, 3.0, promtest.ToFloat64(grouper.compactionRunsStarted.WithLabelValues(DefaultGroupKey(metas[0].Thanos)))) + testutil.Equals(t, 3.0, promtest.ToFloat64(grouper.compactionRunsStarted.WithLabelValues(DefaultGroupKey(metas[7].Thanos)))) // TODO(bwplotka): Looks like we do some unnecessary loops. Not a major problem but investigate. 
- testutil.Equals(t, 2.0, promtest.ToFloat64(grouper.compactionRunsStarted.WithLabelValues(DefaultGroupKey(metas[4].Thanos)))) - testutil.Equals(t, 2.0, promtest.ToFloat64(grouper.compactionRunsStarted.WithLabelValues(DefaultGroupKey(metas[5].Thanos)))) + testutil.Equals(t, 3.0, promtest.ToFloat64(grouper.compactionRunsStarted.WithLabelValues(DefaultGroupKey(metas[4].Thanos)))) + testutil.Equals(t, 3.0, promtest.ToFloat64(grouper.compactionRunsStarted.WithLabelValues(DefaultGroupKey(metas[5].Thanos)))) testutil.Equals(t, 4, MetricCount(grouper.compactionRunsCompleted)) testutil.Equals(t, 2.0, promtest.ToFloat64(grouper.compactionRunsCompleted.WithLabelValues(DefaultGroupKey(metas[0].Thanos)))) - testutil.Equals(t, 2.0, promtest.ToFloat64(grouper.compactionRunsCompleted.WithLabelValues(DefaultGroupKey(metas[7].Thanos)))) + testutil.Equals(t, 3.0, promtest.ToFloat64(grouper.compactionRunsCompleted.WithLabelValues(DefaultGroupKey(metas[7].Thanos)))) // TODO(bwplotka): Looks like we do some unnecessary loops. Not a major problem but investigate. - testutil.Equals(t, 2.0, promtest.ToFloat64(grouper.compactionRunsCompleted.WithLabelValues(DefaultGroupKey(metas[4].Thanos)))) - testutil.Equals(t, 2.0, promtest.ToFloat64(grouper.compactionRunsCompleted.WithLabelValues(DefaultGroupKey(metas[5].Thanos)))) + testutil.Equals(t, 3.0, promtest.ToFloat64(grouper.compactionRunsCompleted.WithLabelValues(DefaultGroupKey(metas[4].Thanos)))) + testutil.Equals(t, 3.0, promtest.ToFloat64(grouper.compactionRunsCompleted.WithLabelValues(DefaultGroupKey(metas[5].Thanos)))) testutil.Equals(t, 4, MetricCount(grouper.compactionFailures)) - testutil.Equals(t, 0.0, promtest.ToFloat64(grouper.compactionFailures.WithLabelValues(DefaultGroupKey(metas[0].Thanos)))) + testutil.Equals(t, 1.0, promtest.ToFloat64(grouper.compactionFailures.WithLabelValues(DefaultGroupKey(metas[0].Thanos)))) testutil.Equals(t, 0.0, promtest.ToFloat64(grouper.compactionFailures.WithLabelValues(DefaultGroupKey(metas[7].Thanos)))) testutil.Equals(t, 0.0, promtest.ToFloat64(grouper.compactionFailures.WithLabelValues(DefaultGroupKey(metas[4].Thanos)))) testutil.Equals(t, 0.0, promtest.ToFloat64(grouper.compactionFailures.WithLabelValues(DefaultGroupKey(metas[5].Thanos)))) @@ -342,6 +357,7 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg metas[4].ULID: false, metas[5].ULID: false, metas[8].ULID: false, + metas[9].ULID: false, } others := map[string]metadata.Meta{} testutil.Ok(t, bkt.Iter(ctx, "", func(n string) error { @@ -374,7 +390,7 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg meta, ok := others[defaultGroupKey(124, extLabels)] testutil.Assert(t, ok, "meta not found") - testutil.Equals(t, int64(0), meta.MinTime) + testutil.Equals(t, int64(500), meta.MinTime) testutil.Equals(t, int64(3000), meta.MaxTime) testutil.Equals(t, uint64(6), meta.Stats.NumSeries) testutil.Equals(t, uint64(2*4*100), meta.Stats.NumSamples) // Only 2 times 4*100 because one block was empty. 
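For orientation, a condensed restatement of how the compaction loop now classifies a failed group. It is not part of the patch and is written as if inside package compact, with g, dir, planner, comp, logger, bkt and blocksMarkedForNoCompact assumed to be in scope:

_, _, err := g.Compact(ctx, dir, planner, comp)
switch {
case err == nil:
	// Compaction succeeded; the group may simply be planned again on the next pass.
case IsIssue347Error(err):
	// Repairable chunk outside the block time range: repair the block and retry.
case IsOutOfOrderChunkError(err):
	// New behaviour behind compact.skip-block-with-out-of-order-chunks: only the
	// offending block (its ULID travels inside the error) is marked no-compact,
	// so the rest of the group can still be compacted on a later run.
	_ = block.MarkForNoCompact(ctx, logger, bkt,
		err.(OutOfOrderChunksError).id,
		metadata.OutOfOrderChunksNoCompactReason,
		"out-of-order series/chunks", blocksMarkedForNoCompact)
default:
	// Anything else, including halt errors, is still propagated and fails the run.
}

If the marking itself fails, the original group error is still reported, which is what the err == nil guard earlier in this patch achieves.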
@@ -413,7 +429,7 @@ type blockgenSpec struct { res int64 } -func createAndUpload(t testing.TB, bkt objstore.Bucket, blocks []blockgenSpec) (metas []*metadata.Meta) { +func createAndUpload(t testing.TB, bkt objstore.Bucket, blocks []blockgenSpec, blocksWithOutOfOrderChunks []blockgenSpec) (metas []*metadata.Meta) { prepareDir, err := ioutil.TempDir("", "test-compact-prepare") testutil.Ok(t, err) defer func() { testutil.Ok(t, os.RemoveAll(prepareDir)) }() @@ -422,23 +438,35 @@ func createAndUpload(t testing.TB, bkt objstore.Bucket, blocks []blockgenSpec) ( defer cancel() for _, b := range blocks { - var id ulid.ULID - var err error - if b.numSamples == 0 { - id, err = e2eutil.CreateEmptyBlock(prepareDir, b.mint, b.maxt, b.extLset, b.res) - } else { - id, err = e2eutil.CreateBlock(ctx, prepareDir, b.series, b.numSamples, b.mint, b.maxt, b.extLset, b.res, metadata.NoneFunc) - } - testutil.Ok(t, err) + id, meta := createBlock(t, ctx, prepareDir, b) + metas = append(metas, meta) + testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(prepareDir, id.String()), metadata.NoneFunc)) + } + for _, b := range blocksWithOutOfOrderChunks { + id, meta := createBlock(t, ctx, prepareDir, b) - meta, err := metadata.ReadFromDir(filepath.Join(prepareDir, id.String())) + err := testutil.PutOutOfOrderIndex(filepath.Join(prepareDir, id.String()), b.mint, b.maxt) testutil.Ok(t, err) - metas = append(metas, meta) + metas = append(metas, meta) testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(prepareDir, id.String()), metadata.NoneFunc)) } + return metas } +func createBlock(t testing.TB, ctx context.Context, prepareDir string, b blockgenSpec) (id ulid.ULID, meta *metadata.Meta) { + var err error + if b.numSamples == 0 { + id, err = e2eutil.CreateEmptyBlock(prepareDir, b.mint, b.maxt, b.extLset, b.res) + } else { + id, err = e2eutil.CreateBlock(ctx, prepareDir, b.series, b.numSamples, b.mint, b.maxt, b.extLset, b.res, metadata.NoneFunc) + } + testutil.Ok(t, err) + + meta, err = metadata.ReadFromDir(filepath.Join(prepareDir, id.String())) + testutil.Ok(t, err) + return +} // Regression test for #2459 issue. func TestGarbageCollectDoesntCreateEmptyBlocksWithDeletionMarksOnly(t *testing.T) { diff --git a/pkg/query/querier.go b/pkg/query/querier.go index b1aa95bee2..30202d5b5e 100644 --- a/pkg/query/querier.go +++ b/pkg/query/querier.go @@ -207,7 +207,7 @@ func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Match } // The querier has a context but it gets canceled, as soon as query evaluation is completed, by the engine. - // We want to prevent this from happening for the async storea API calls we make while preserving tracing context. + // We want to prevent this from happening for the async store API calls we make while preserving tracing context. 
ctx := tracing.CopyTraceContext(context.Background(), q.ctx) ctx, cancel := context.WithTimeout(ctx, q.selectTimeout) span, ctx := tracing.StartSpan(ctx, "querier_select", opentracing.Tags{ diff --git a/pkg/testutil/testutil.go b/pkg/testutil/testutil.go index 7f0e0a8911..9668a2f03f 100644 --- a/pkg/testutil/testutil.go +++ b/pkg/testutil/testutil.go @@ -4,13 +4,21 @@ package testutil import ( + "context" "fmt" + "math/rand" "path/filepath" "reflect" "runtime" "runtime/debug" + "sort" "testing" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/tsdb/chunks" + "github.com/prometheus/prometheus/tsdb/index" + "github.com/davecgh/go-spew/spew" "github.com/pkg/errors" "github.com/pmezard/go-difflib/difflib" @@ -203,3 +211,105 @@ func FaultOrPanicToErr(f func()) (err error) { return err } + +var indexFilename = "index" + +type indexWriterSeries struct { + labels labels.Labels + chunks []chunks.Meta // series file offset of chunks +} + +type indexWriterSeriesSlice []*indexWriterSeries + +// PutOutOfOrderIndex updates the index in blockDir with an index containing an out-of-order chunk +// copied from https://github.com/prometheus/prometheus/blob/b1ed4a0a663d0c62526312311c7529471abbc565/tsdb/index/index_test.go#L346 +func PutOutOfOrderIndex(blockDir string, minTime int64, maxTime int64) error { + + if minTime >= maxTime || minTime+4 >= maxTime { + return fmt.Errorf("minTime must be at least 4 less than maxTime to not create overlapping chunks") + } + + lbls := []labels.Labels{ + []labels.Label{ + {Name: "lbl1", Value: "1"}, + }, + } + + // Sort labels as the index writer expects series in sorted order. + sort.Sort(labels.Slice(lbls)) + + symbols := map[string]struct{}{} + for _, lset := range lbls { + for _, l := range lset { + symbols[l.Name] = struct{}{} + symbols[l.Value] = struct{}{} + } + } + + var input indexWriterSeriesSlice + + // Generate ChunkMetas for every label set. + for _, lset := range lbls { + var metas []chunks.Meta + // only need two chunks that are out-of-order + chk1 := chunks.Meta{ + MinTime: maxTime - 2, + MaxTime: maxTime - 1, + Ref: rand.Uint64(), + Chunk: chunkenc.NewXORChunk(), + } + metas = append(metas, chk1) + chk2 := chunks.Meta{ + MinTime: minTime + 1, + MaxTime: minTime + 2, + Ref: rand.Uint64(), + Chunk: chunkenc.NewXORChunk(), + } + metas = append(metas, chk2) + + input = append(input, &indexWriterSeries{ + labels: lset, + chunks: metas, + }) + } + + iw, err := index.NewWriter(context.Background(), filepath.Join(blockDir, indexFilename)) + if err != nil { + return err + } + + syms := []string{} + for s := range symbols { + syms = append(syms, s) + } + sort.Strings(syms) + for _, s := range syms { + if err := iw.AddSymbol(s); err != nil { + return err + } + } + + // Population procedure as done by compaction. 
+ var ( + postings = index.NewMemPostings() + values = map[string]map[string]struct{}{} + ) + + for i, s := range input { + if err := iw.AddSeries(uint64(i), s.labels, s.chunks...); err != nil { + return err + } + + for _, l := range s.labels { + valset, ok := values[l.Name] + if !ok { + valset = map[string]struct{}{} + values[l.Name] = valset + } + valset[l.Value] = struct{}{} + } + postings.Add(uint64(i), s.labels) + } + + return iw.Close() +} From 40040fc4aa456622ce7776fad504f5af12921351 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Fri, 13 Aug 2021 07:27:13 +0200 Subject: [PATCH 3/3] Allow to share indexheader.ReaderPool metrics (#4563) Signed-off-by: Marco Pracucci --- pkg/block/indexheader/reader_pool.go | 20 ++++++++++++++++---- pkg/block/indexheader/reader_pool_test.go | 21 +++++++++++---------- pkg/store/bucket.go | 3 ++- pkg/store/bucket_test.go | 2 +- 4 files changed, 30 insertions(+), 16 deletions(-) diff --git a/pkg/block/indexheader/reader_pool.go b/pkg/block/indexheader/reader_pool.go index 93f1fd88b3..20ebecaaca 100644 --- a/pkg/block/indexheader/reader_pool.go +++ b/pkg/block/indexheader/reader_pool.go @@ -17,6 +17,18 @@ import ( "github.com/thanos-io/thanos/pkg/objstore" ) +// ReaderPoolMetrics holds metrics tracked by ReaderPool. +type ReaderPoolMetrics struct { + lazyReader *LazyBinaryReaderMetrics +} + +// NewReaderPoolMetrics makes new ReaderPoolMetrics. +func NewReaderPoolMetrics(reg prometheus.Registerer) *ReaderPoolMetrics { + return &ReaderPoolMetrics{ + lazyReader: NewLazyBinaryReaderMetrics(reg), + } +} + // ReaderPool is used to istantiate new index-header readers and keep track of them. // When the lazy reader is enabled, the pool keeps track of all instantiated readers // and automatically close them once the idle timeout is reached. A closed lazy reader @@ -24,8 +36,8 @@ import ( type ReaderPool struct { lazyReaderEnabled bool lazyReaderIdleTimeout time.Duration - lazyReaderMetrics *LazyBinaryReaderMetrics logger log.Logger + metrics *ReaderPoolMetrics // Channel used to signal once the pool is closing. close chan struct{} @@ -36,12 +48,12 @@ type ReaderPool struct { } // NewReaderPool makes a new ReaderPool. 
-func NewReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTimeout time.Duration, reg prometheus.Registerer) *ReaderPool { +func NewReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTimeout time.Duration, metrics *ReaderPoolMetrics) *ReaderPool { p := &ReaderPool{ logger: logger, + metrics: metrics, lazyReaderEnabled: lazyReaderEnabled, lazyReaderIdleTimeout: lazyReaderIdleTimeout, - lazyReaderMetrics: NewLazyBinaryReaderMetrics(reg), lazyReaders: make(map[*LazyBinaryReader]struct{}), close: make(chan struct{}), } @@ -73,7 +85,7 @@ func (p *ReaderPool) NewBinaryReader(ctx context.Context, logger log.Logger, bkt var err error if p.lazyReaderEnabled { - reader, err = NewLazyBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling, p.lazyReaderMetrics, p.onLazyReaderClosed) + reader, err = NewLazyBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling, p.metrics.lazyReader, p.onLazyReaderClosed) } else { reader, err = NewBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling) } diff --git a/pkg/block/indexheader/reader_pool_test.go b/pkg/block/indexheader/reader_pool_test.go index 3e0e30ee56..e9e243096b 100644 --- a/pkg/block/indexheader/reader_pool_test.go +++ b/pkg/block/indexheader/reader_pool_test.go @@ -60,7 +60,7 @@ func TestReaderPool_NewBinaryReader(t *testing.T) { for testName, testData := range tests { t.Run(testName, func(t *testing.T) { - pool := NewReaderPool(log.NewNopLogger(), testData.lazyReaderEnabled, testData.lazyReaderIdleTimeout, nil) + pool := NewReaderPool(log.NewNopLogger(), testData.lazyReaderEnabled, testData.lazyReaderIdleTimeout, NewReaderPoolMetrics(nil)) defer pool.Close() r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3) @@ -96,7 +96,8 @@ func TestReaderPool_ShouldCloseIdleLazyReaders(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc)) - pool := NewReaderPool(log.NewNopLogger(), true, idleTimeout, nil) + metrics := NewReaderPoolMetrics(nil) + pool := NewReaderPool(log.NewNopLogger(), true, idleTimeout, metrics) defer pool.Close() r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3) @@ -107,28 +108,28 @@ func TestReaderPool_ShouldCloseIdleLazyReaders(t *testing.T) { labelNames, err := r.LabelNames() testutil.Ok(t, err) testutil.Equals(t, []string{"a"}, labelNames) - testutil.Equals(t, float64(1), promtestutil.ToFloat64(pool.lazyReaderMetrics.loadCount)) - testutil.Equals(t, float64(0), promtestutil.ToFloat64(pool.lazyReaderMetrics.unloadCount)) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(metrics.lazyReader.loadCount)) + testutil.Equals(t, float64(0), promtestutil.ToFloat64(metrics.lazyReader.unloadCount)) // Wait enough time before checking it. time.Sleep(idleTimeout * 2) // We expect the reader has been closed, but not released from the pool. testutil.Assert(t, pool.isTracking(r.(*LazyBinaryReader))) - testutil.Equals(t, float64(1), promtestutil.ToFloat64(pool.lazyReaderMetrics.loadCount)) - testutil.Equals(t, float64(1), promtestutil.ToFloat64(pool.lazyReaderMetrics.unloadCount)) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(metrics.lazyReader.loadCount)) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(metrics.lazyReader.unloadCount)) // Ensure it can still read data (will be re-opened). 
labelNames, err = r.LabelNames() testutil.Ok(t, err) testutil.Equals(t, []string{"a"}, labelNames) testutil.Assert(t, pool.isTracking(r.(*LazyBinaryReader))) - testutil.Equals(t, float64(2), promtestutil.ToFloat64(pool.lazyReaderMetrics.loadCount)) - testutil.Equals(t, float64(1), promtestutil.ToFloat64(pool.lazyReaderMetrics.unloadCount)) + testutil.Equals(t, float64(2), promtestutil.ToFloat64(metrics.lazyReader.loadCount)) + testutil.Equals(t, float64(1), promtestutil.ToFloat64(metrics.lazyReader.unloadCount)) // We expect an explicit call to Close() to close the reader and release it from the pool too. testutil.Ok(t, r.Close()) testutil.Assert(t, !pool.isTracking(r.(*LazyBinaryReader))) - testutil.Equals(t, float64(2), promtestutil.ToFloat64(pool.lazyReaderMetrics.loadCount)) - testutil.Equals(t, float64(2), promtestutil.ToFloat64(pool.lazyReaderMetrics.unloadCount)) + testutil.Equals(t, float64(2), promtestutil.ToFloat64(metrics.lazyReader.loadCount)) + testutil.Equals(t, float64(2), promtestutil.ToFloat64(metrics.lazyReader.unloadCount)) } diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index fef930ee1d..8255e1dc18 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -403,7 +403,8 @@ func NewBucketStore( } // Depend on the options - s.indexReaderPool = indexheader.NewReaderPool(s.logger, lazyIndexReaderEnabled, lazyIndexReaderIdleTimeout, extprom.WrapRegistererWithPrefix("thanos_bucket_store_", s.reg)) + indexReaderPoolMetrics := indexheader.NewReaderPoolMetrics(extprom.WrapRegistererWithPrefix("thanos_bucket_store_", s.reg)) + s.indexReaderPool = indexheader.NewReaderPool(s.logger, lazyIndexReaderEnabled, lazyIndexReaderIdleTimeout, indexReaderPoolMetrics) s.metrics = newBucketStoreMetrics(s.reg) // TODO(metalmatze): Might be possible via Option too if err := os.MkdirAll(dir, 0750); err != nil { diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 688ddbbe54..e449684457 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -1470,7 +1470,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { bkt: objstore.WithNoopInstr(bkt), logger: logger, indexCache: indexCache, - indexReaderPool: indexheader.NewReaderPool(log.NewNopLogger(), false, 0, nil), + indexReaderPool: indexheader.NewReaderPool(log.NewNopLogger(), false, 0, indexheader.NewReaderPoolMetrics(nil)), metrics: newBucketStoreMetrics(nil), blockSets: map[uint64]*bucketBlockSet{ labels.Labels{{Name: "ext1", Value: "1"}}.Hash(): {blocks: [][]*bucketBlock{{b1, b2}}},
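Finally, a minimal usage sketch of the shared reader-pool metrics, not part of the patch; reg (a prometheus.Registerer), logger and the time and indexheader imports are assumed, and the five-minute idle timeout is an arbitrary example value:

// Create the metrics once and hand them to every pool that should share them.
metrics := indexheader.NewReaderPoolMetrics(reg)

poolA := indexheader.NewReaderPool(logger, true, 5*time.Minute, metrics)
defer poolA.Close()

poolB := indexheader.NewReaderPool(logger, true, 5*time.Minute, metrics)
defer poolB.Close()

// Both pools now update the same lazy-reader load/unload counters, and the
// collectors are registered on reg exactly once rather than once per pool.

This is the same pattern the BucketStore change above follows: it builds the metrics with NewReaderPoolMetrics(extprom.WrapRegistererWithPrefix("thanos_bucket_store_", s.reg)) and passes them into NewReaderPool.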