From d8b21e708bee6d19f46ca32b158b0509ca9b7fed Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Thu, 27 May 2021 13:42:13 -0400 Subject: [PATCH] tools: Support bucket rewrite relabel (#3707) * support bucket rewrite relabel Signed-off-by: yeya24 * update tests comment Signed-off-by: yeya24 * add changelog Signed-off-by: yeya24 --- CHANGELOG.md | 1 + cmd/thanos/tools_bucket.go | 37 +++++- docs/components/tools.md | 8 ++ pkg/block/fetcher.go | 9 +- pkg/block/metadata/meta.go | 6 +- pkg/compactv2/chunk_series_set.go | 1 + pkg/compactv2/compactor_test.go | 189 ++++++++++++++++++++++++++++++ pkg/compactv2/modifiers.go | 158 ++++++++++++++++++++++++- 8 files changed, 398 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 65f5fdf716..036e2a74ef 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re - [#4125](https://github.com/thanos-io/thanos/pull/4125) Rule: Add `--alert.relabel-config` / `--alert.relabel-config-file` allowing to specify alert relabel configurations like [Prometheus](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config) - [#4211](https://github.com/thanos-io/thanos/pull/4211) Add TLS and basic authentication to Thanos APIs - [#4249](https://github.com/thanos-io/thanos/pull/4249) UI: add dark theme +- [#3707](https://github.com/thanos-io/thanos/pull/3707) Tools: Added `--rewrite.to-relabel-config` to bucket rewrite tool to support series relabel from given blocks. ### Fixed - diff --git a/cmd/thanos/tools_bucket.go b/cmd/thanos/tools_bucket.go index 2184f30fa8..d9b312dede 100644 --- a/cmd/thanos/tools_bucket.go +++ b/cmd/thanos/tools_bucket.go @@ -29,8 +29,10 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/route" "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/relabel" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/chunkenc" + v1 "github.com/thanos-io/thanos/pkg/api/blocks" "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" @@ -819,7 +821,8 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat hashFunc := cmd.Flag("hash-func", "Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not happen. This permits avoiding downloading some files twice albeit at some performance cost. Possible values are: \"\", \"SHA256\"."). Default("").Enum("SHA256", "") dryRun := cmd.Flag("dry-run", "Prints the series changes instead of doing them. Defaults to true, for user to double check. (: Pass --no-dry-run to skip this.").Default("true").Bool() - toDelete := extflag.RegisterPathOrContent(cmd, "rewrite.to-delete-config", "YAML file that contains []metadata.DeletionRequest that will be applied to blocks", true) + toDelete := extflag.RegisterPathOrContent(cmd, "rewrite.to-delete-config", "YAML file that contains []metadata.DeletionRequest that will be applied to blocks", false) + toRelabel := extflag.RegisterPathOrContent(cmd, "rewrite.to-relabel-config", "YAML file that contains relabel configs that will be applied to blocks", false) provideChangeLog := cmd.Flag("rewrite.add-change-log", "If specified, all modifications are written to new block directory. 
Disable if latency is to high.").Default("true").Bool() promBlocks := cmd.Flag("prom-blocks", "If specified, we assume the blocks to be uploaded are only used with Prometheus so we don't check external labels in this case.").Default("false").Bool() cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error { @@ -833,15 +836,36 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat return err } - deletionsYaml, err := toDelete.Content() + var modifiers []compactv2.Modifier + + relabelYaml, err := toRelabel.Content() if err != nil { return err } + var relabels []*relabel.Config + if len(relabelYaml) > 0 { + relabels, err = block.ParseRelabelConfig(relabelYaml, nil) + if err != nil { + return err + } + modifiers = append(modifiers, compactv2.WithRelabelModifier(relabels...)) + } - var deletions []metadata.DeletionRequest - if err := yaml.Unmarshal(deletionsYaml, &deletions); err != nil { + deletionsYaml, err := toDelete.Content() + if err != nil { return err } + var deletions []metadata.DeletionRequest + if len(deletionsYaml) > 0 { + if err := yaml.Unmarshal(deletionsYaml, &deletions); err != nil { + return err + } + modifiers = append(modifiers, compactv2.WithDeletionModifier(deletions...)) + } + + if len(modifiers) == 0 { + return errors.New("rewrite configuration should be provided") + } var ids []ulid.ULID for _, id := range *blockIDs { @@ -885,6 +909,7 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat meta.Thanos.Rewrites = append(meta.Thanos.Rewrites, metadata.Rewrite{ Sources: meta.Compaction.Sources, DeletionsApplied: deletions, + RelabelsApplied: relabels, }) meta.Compaction.Sources = []ulid.ULID{newID} meta.Thanos.Source = metadata.BucketRewriteSource @@ -916,8 +941,8 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat comp = compactv2.New(*tmpDir, logger, changeLog, chunkPool) } - level.Info(logger).Log("msg", "starting rewrite for block", "source", id, "new", newID, "toDelete", string(deletionsYaml)) - if err := comp.WriteSeries(ctx, []block.Reader{b}, d, p, compactv2.WithDeletionModifier(deletions...)); err != nil { + level.Info(logger).Log("msg", "starting rewrite for block", "source", id, "new", newID, "toDelete", string(deletionsYaml), "toRelabel", string(relabelYaml)) + if err := comp.WriteSeries(ctx, []block.Reader{b}, d, p, modifiers...); err != nil { return errors.Wrapf(err, "writing series from %v to %v", id, newID) } diff --git a/docs/components/tools.md b/docs/components/tools.md index 2d66d89607..b69c45d227 100644 --- a/docs/components/tools.md +++ b/docs/components/tools.md @@ -769,6 +769,14 @@ Flags: Path to YAML file that contains []metadata.DeletionRequest that will be applied to blocks + --rewrite.to-relabel-config= + Alternative to 'rewrite.to-relabel-config-file' + flag (mutually exclusive). Content of YAML file + that contains relabel configs that will be + applied to blocks + --rewrite.to-relabel-config-file= + Path to YAML file that contains relabel configs + that will be applied to blocks --tmp.dir="/tmp/thanos-rewrite" Working directory for temporary files --tracing.config= diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go index 0fd2458f1d..7c9ca71e3c 100644 --- a/pkg/block/fetcher.go +++ b/pkg/block/fetcher.go @@ -879,15 +879,18 @@ var ( ) // ParseRelabelConfig parses relabel configuration. +// If supportedActions not specified, all relabel actions are valid. 
func ParseRelabelConfig(contentYaml []byte, supportedActions map[relabel.Action]struct{}) ([]*relabel.Config, error) { var relabelConfig []*relabel.Config if err := yaml.Unmarshal(contentYaml, &relabelConfig); err != nil { return nil, errors.Wrap(err, "parsing relabel configuration") } - for _, cfg := range relabelConfig { - if _, ok := supportedActions[cfg.Action]; !ok { - return nil, errors.Errorf("unsupported relabel action: %v", cfg.Action) + if supportedActions != nil { + for _, cfg := range relabelConfig { + if _, ok := supportedActions[cfg.Action]; !ok { + return nil, errors.Errorf("unsupported relabel action: %v", cfg.Action) + } } } diff --git a/pkg/block/metadata/meta.go b/pkg/block/metadata/meta.go index f88aae6a20..0cbda37e19 100644 --- a/pkg/block/metadata/meta.go +++ b/pkg/block/metadata/meta.go @@ -19,12 +19,14 @@ import ( "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/relabel" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/fileutil" "github.com/prometheus/prometheus/tsdb/tombstones" - "github.com/thanos-io/thanos/pkg/runutil" "gopkg.in/yaml.v3" + + "github.com/thanos-io/thanos/pkg/runutil" ) type SourceType string @@ -95,6 +97,8 @@ type Rewrite struct { Sources []ulid.ULID `json:"sources,omitempty"` // Deletions if applied (in order). DeletionsApplied []DeletionRequest `json:"deletions_applied,omitempty"` + // Relabels if applied. + RelabelsApplied []*relabel.Config `json:"relabels_applied,omitempty"` } type Matchers []*labels.Matcher diff --git a/pkg/compactv2/chunk_series_set.go b/pkg/compactv2/chunk_series_set.go index 6bcaa2abf6..a02139c4b2 100644 --- a/pkg/compactv2/chunk_series_set.go +++ b/pkg/compactv2/chunk_series_set.go @@ -13,6 +13,7 @@ import ( "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/index" + "github.com/thanos-io/thanos/pkg/block" ) diff --git a/pkg/compactv2/compactor_test.go b/pkg/compactv2/compactor_test.go index e5c979a053..f66ec927bd 100644 --- a/pkg/compactv2/compactor_test.go +++ b/pkg/compactv2/compactor_test.go @@ -17,12 +17,15 @@ import ( "github.com/go-kit/kit/log" "github.com/oklog/ulid" "github.com/pkg/errors" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/relabel" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/index" "github.com/prometheus/prometheus/tsdb/tombstones" + "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/testutil" @@ -397,6 +400,192 @@ func TestCompactor_WriteSeries_e2e(t *testing.T) { NumChunks: 12, }, }, + { + name: "1 block + relabel modifier, two chunks from the same series are merged into one larger chunk", + input: [][]seriesSamples{ + { + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 10}, {11, 11}, {20, 20}}}}, + }, + }, + // Not used in this test case. 
+ modifiers: []Modifier{WithRelabelModifier( + &relabel.Config{ + Action: relabel.Drop, + Regex: relabel.MustNewRegexp("no-match"), + SourceLabels: model.LabelNames{"a"}, + }, + )}, + expected: []seriesSamples{ + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}}, + }, + expectedStats: tsdb.BlockStats{ + NumSamples: 6, + NumSeries: 1, + NumChunks: 1, + }, + }, + { + name: "1 block + relabel modifier, delete first series", + input: [][]seriesSamples{ + { + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "2"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 10}, {11, 11}, {20, 20}, {25, 25}}}}, + {lset: labels.Labels{{Name: "a", Value: "3"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 13}, {11, 11}, {20, 20}}}}, + }, + }, + modifiers: []Modifier{WithRelabelModifier( + &relabel.Config{ + Action: relabel.Drop, + Regex: relabel.MustNewRegexp("1"), + SourceLabels: model.LabelNames{"a"}, + }, + )}, + expected: []seriesSamples{ + {lset: labels.Labels{{Name: "a", Value: "2"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}, {25, 25}}}}, + {lset: labels.Labels{{Name: "a", Value: "3"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 13}, {11, 11}, {20, 20}}}}, + }, + expectedChanges: "Deleted {a=\"1\"} [{0 20}]\n", + expectedStats: tsdb.BlockStats{ + NumSamples: 13, + NumSeries: 2, + NumChunks: 2, + }, + }, + { + name: "1 block + relabel modifier, series reordered", + input: [][]seriesSamples{ + { + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, -1}, {2, -2}, {10, -10}, {11, -11}, {20, -20}}}}, + {lset: labels.Labels{{Name: "a", Value: "2"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 10}, {11, 11}, {20, 20}, {25, 25}}}}, + }, + }, + // {a="1"} will be relabeled to {a="3"} while {a="2"} will be relabeled to {a="0"}. + modifiers: []Modifier{WithRelabelModifier( + &relabel.Config{ + Action: relabel.Replace, + Regex: relabel.MustNewRegexp("1"), + SourceLabels: model.LabelNames{"a"}, + TargetLabel: "a", + Replacement: "3", + }, + &relabel.Config{ + Action: relabel.Replace, + Regex: relabel.MustNewRegexp("2"), + SourceLabels: model.LabelNames{"a"}, + TargetLabel: "a", + Replacement: "0", + }, + )}, + expected: []seriesSamples{ + {lset: labels.Labels{{Name: "a", Value: "0"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}, {25, 25}}}}, + {lset: labels.Labels{{Name: "a", Value: "3"}}, + chunks: [][]sample{{{0, 0}, {1, -1}, {2, -2}, {10, -10}, {11, -11}, {20, -20}}}}, + }, + expectedChanges: "Relabelled {a=\"1\"} {a=\"3\"}\nRelabelled {a=\"2\"} {a=\"0\"}\n", + expectedStats: tsdb.BlockStats{ + NumSamples: 13, + NumSeries: 2, + NumChunks: 2, + }, + }, + { + name: "1 block + relabel modifier, series deleted because of no labels left after relabel", + input: [][]seriesSamples{ + { + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 10}, {11, 11}, {20, 20}, {25, 25}}}}, + }, + { + {lset: labels.Labels{{Name: "a", Value: "2"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 10}, {11, 11}, {20, 20}, {25, 25}}}}, + }, + }, + // Drop all label name "a". 
+ modifiers: []Modifier{WithRelabelModifier( + &relabel.Config{ + Action: relabel.LabelDrop, + Regex: relabel.MustNewRegexp("a"), + }, + )}, + expected: nil, + expectedChanges: "Deleted {a=\"1\"} [{0 25}]\nDeleted {a=\"2\"} [{0 25}]\n", + expectedStats: tsdb.BlockStats{ + NumSamples: 0, + NumSeries: 0, + NumChunks: 0, + }, + }, + { + name: "1 block + relabel modifier, series 1 is deleted because of no labels left after relabel", + input: [][]seriesSamples{ + { + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 10}, {11, 11}, {20, 20}, {25, 25}}}}, + }, + { + {lset: labels.Labels{{Name: "a", Value: "2"}, {Name: "b", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 10}, {11, 11}, {20, 20}, {25, 25}}}}, + }, + }, + // Drop all label name "a". + modifiers: []Modifier{WithRelabelModifier( + &relabel.Config{ + Action: relabel.LabelDrop, + Regex: relabel.MustNewRegexp("a"), + }, + )}, + expected: []seriesSamples{ + {lset: labels.Labels{{Name: "b", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}, {25, 25}}}}, + }, + expectedChanges: "Deleted {a=\"1\"} [{0 25}]\nRelabelled {a=\"2\", b=\"1\"} {b=\"1\"}\n", + expectedStats: tsdb.BlockStats{ + NumSamples: 7, + NumSeries: 1, + NumChunks: 1, + }, + }, + { + name: "1 block + relabel modifier, series merged after relabeling", + input: [][]seriesSamples{ + { + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{1, 1}, {2, 2}, {10, 10}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "2"}}, + chunks: [][]sample{{{0, 0}, {2, 2}, {3, 3}}, {{4, 4}, {11, 11}, {20, 20}, {25, 25}}}}, + }, + }, + // Replace values of label name "a" with "0". + modifiers: []Modifier{WithRelabelModifier( + &relabel.Config{ + Action: relabel.Replace, + Regex: relabel.MustNewRegexp("1|2"), + SourceLabels: model.LabelNames{"a"}, + TargetLabel: "a", + Replacement: "0", + }, + )}, + expected: []seriesSamples{ + {lset: labels.Labels{{Name: "a", Value: "0"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {3, 3}, {4, 4}, {10, 10}, {11, 11}, {20, 20}, {25, 25}}}}, + }, + expectedChanges: "Relabelled {a=\"1\"} {a=\"0\"}\nRelabelled {a=\"2\"} {a=\"0\"}\n", + expectedStats: tsdb.BlockStats{ + NumSamples: 9, + NumSeries: 1, + NumChunks: 1, + }, + }, } { t.Run(tcase.name, func(t *testing.T) { tmpDir, err := ioutil.TempDir("", "test-series-writer") diff --git a/pkg/compactv2/modifiers.go b/pkg/compactv2/modifiers.go index aef4ec065e..28ff81ded5 100644 --- a/pkg/compactv2/modifiers.go +++ b/pkg/compactv2/modifiers.go @@ -4,13 +4,19 @@ package compactv2 import ( + "math" + "sort" + "github.com/pkg/errors" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/relabel" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/index" "github.com/prometheus/prometheus/tsdb/tombstones" + "github.com/thanos-io/thanos/pkg/block/metadata" ) @@ -325,4 +331,154 @@ func (p *delChunkSeriesIterator) Next() bool { func (p *delChunkSeriesIterator) At() chunks.Meta { return p.curr } -// TODO(bwplotka): Add relabelling. 
+type RelabelModifier struct { + relabels []*relabel.Config +} + +func WithRelabelModifier(relabels ...*relabel.Config) *RelabelModifier { + return &RelabelModifier{relabels: relabels} +} + +func (d *RelabelModifier) Modify(_ index.StringIter, set storage.ChunkSeriesSet, log ChangeLogger, p ProgressLogger) (index.StringIter, storage.ChunkSeriesSet) { + // Gather symbols. + symbols := make(map[string]struct{}) + chunkSeriesMap := make(map[string]*mergeChunkSeries) + + for set.Next() { + s := set.At() + lbls := s.Labels() + chksIter := s.Iterator() + + if processedLabels := relabel.Process(lbls, d.relabels...); len(processedLabels) == 0 { + // Special case: Delete whole series if no labels are present. + var ( + minT int64 = math.MaxInt64 + maxT int64 = math.MinInt64 + ) + for chksIter.Next() { + c := chksIter.At() + if c.MinTime < minT { + minT = c.MinTime + } + if c.MaxTime > maxT { + maxT = c.MaxTime + } + } + + if err := chksIter.Err(); err != nil { + return errorOnlyStringIter{err: err}, nil + } + + var deleted tombstones.Intervals + // If minTime is set then there is at least one chunk. + if minT != math.MaxInt64 { + deleted = deleted.Add(tombstones.Interval{Mint: minT, Maxt: maxT}) + } + log.DeleteSeries(lbls, deleted) + p.SeriesProcessed() + } else { + for _, lb := range processedLabels { + symbols[lb.Name] = struct{}{} + symbols[lb.Value] = struct{}{} + } + + lbStr := processedLabels.String() + if _, ok := chunkSeriesMap[lbStr]; !ok { + chunkSeriesMap[lbStr] = newChunkSeriesBuilder(processedLabels) + } + cs := chunkSeriesMap[lbStr] + + // We have to iterate over the chunks and populate them here as + // lazyPopulateChunkSeriesSet reuses chunks and previous chunks + // will be overwritten at set.Next() call. + for chksIter.Next() { + c := chksIter.At() + cs.addIter(c.Chunk.Iterator(nil)) + } + if err := chksIter.Err(); err != nil { + return errorOnlyStringIter{err}, nil + } + + if !labels.Equal(lbls, processedLabels) { + log.ModifySeries(lbls, processedLabels) + } + } + } + + symbolsSlice := make([]string, 0, len(symbols)) + for s := range symbols { + symbolsSlice = append(symbolsSlice, s) + } + sort.Strings(symbolsSlice) + + chunkSeriesSet := make([]storage.ChunkSeries, 0, len(chunkSeriesMap)) + for _, chunkSeries := range chunkSeriesMap { + chunkSeriesSet = append(chunkSeriesSet, chunkSeries) + } + sort.Slice(chunkSeriesSet, func(i, j int) bool { + return labels.Compare(chunkSeriesSet[i].Labels(), chunkSeriesSet[j].Labels()) < 0 + }) + return index.NewStringListIter(symbolsSlice), newListChunkSeriesSet(chunkSeriesSet...) +} + +// mergeChunkSeries build storage.ChunkSeries from several chunkenc.Iterator. 
+type mergeChunkSeries struct { + lset labels.Labels + ss []storage.Series +} + +func newChunkSeriesBuilder(lset labels.Labels) *mergeChunkSeries { + return &mergeChunkSeries{ + lset: lset, + ss: make([]storage.Series, 0), + } +} + +func (s *mergeChunkSeries) addIter(iter chunkenc.Iterator) { + s.ss = append(s.ss, &storage.SeriesEntry{ + SampleIteratorFn: func() chunkenc.Iterator { + return iter + }, + }) +} + +func (s *mergeChunkSeries) Labels() labels.Labels { + return s.lset +} + +func (s *mergeChunkSeries) Iterator() chunks.Iterator { + if len(s.ss) == 0 { + return nil + } + if len(s.ss) == 1 { + return storage.NewSeriesToChunkEncoder(s.ss[0]).Iterator() + } + + return storage.NewSeriesToChunkEncoder(storage.ChainedSeriesMerge(s.ss...)).Iterator() +} + +type errorOnlyStringIter struct { + err error +} + +func (errorOnlyStringIter) Next() bool { return false } +func (errorOnlyStringIter) At() string { return "" } +func (s errorOnlyStringIter) Err() error { return s.err } + +type listChunkSeriesSet struct { + css []storage.ChunkSeries + idx int +} + +func newListChunkSeriesSet(css ...storage.ChunkSeries) storage.ChunkSeriesSet { + return &listChunkSeriesSet{css: css, idx: -1} +} + +func (s *listChunkSeriesSet) Next() bool { + s.idx++ + return s.idx < len(s.css) +} + +func (s *listChunkSeriesSet) At() storage.ChunkSeries { return s.css[s.idx] } +func (s *listChunkSeriesSet) Err() error { return nil } +func (s *listChunkSeriesSet) Warnings() storage.Warnings { return nil }
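
---

Usage sketch (illustrative only, not part of the patch): the snippet below shows how the relabel path added above fits together — block.ParseRelabelConfig called with a nil supportedActions map (all relabel actions allowed, per the fetcher.go change) feeding compactv2.WithRelabelModifier, the same wiring tools_bucket.go now performs. The YAML content is a made-up example; in practice it would be supplied via --rewrite.to-relabel-config or --rewrite.to-relabel-config-file on `thanos tools bucket rewrite`.

package main

import (
	"fmt"

	"github.com/thanos-io/thanos/pkg/block"
	"github.com/thanos-io/thanos/pkg/compactv2"
)

func main() {
	// Hypothetical relabel config: drop every series whose label "a" matches "1".
	// This is the kind of content a user would pass via --rewrite.to-relabel-config.
	relabelYaml := []byte(`
- action: drop
  source_labels: [a]
  regex: "1"
`)

	// Passing nil for supportedActions allows all relabel actions,
	// matching the ParseRelabelConfig change in pkg/block/fetcher.go above.
	relabels, err := block.ParseRelabelConfig(relabelYaml, nil)
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}

	// Build the modifier that WriteSeries would apply to the rewritten block,
	// as done in registerBucketRewrite in cmd/thanos/tools_bucket.go above.
	modifier := compactv2.WithRelabelModifier(relabels...)
	fmt.Printf("parsed %d relabel config(s), modifier type: %T\n", len(relabels), modifier)
}

The resulting modifier would then be passed alongside any deletion modifiers to comp.WriteSeries, exactly as the rewrite command does; the relabel configs applied are also recorded in the block's meta.json under thanos.rewrites[].relabels_applied, per the metadata.Rewrite change above.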