Skip to content

Commit

Permalink
Receive: Add parameter to set out-of-order time window (thanos-io#5839)
Browse files Browse the repository at this point in the history
* Add flag to specify out-of-order time window

Signed-off-by: Matej Gera <[email protected]>

* Handle new sample error

Signed-off-by: Matej Gera <[email protected]>

* Adjust CHANGELOG

Signed-off-by: Matej Gera <[email protected]>

* Add out-of-order cap max parameter

Signed-off-by: Matej Gera <[email protected]>

* Update docs

Signed-off-by: Matej Gera <[email protected]>

* Add warning about enabling vertical compaction

Signed-off-by: Matej Gera <[email protected]>

Signed-off-by: Matej Gera <[email protected]>
  • Loading branch information
matej-g authored and Nathaniel Graham committed May 18, 2023
1 parent db79cff commit 04dad70
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

- [#5814](https://github.com/thanos-io/thanos/pull/5814) Store: Add metric `thanos_bucket_store_postings_size_bytes` that shows the distribution of how many postings (in bytes) were needed for each Series() call in Thanos Store. Useful for determining limits.
- [#5801](https://github.com/thanos-io/thanos/pull/5801) Store: add a new limiter `--store.grpc.downloaded-bytes-limit` that limits the number of bytes downloaded in each Series/LabelNames/LabelValues call. Use `thanos_bucket_store_postings_size_bytes` for determining the limits.
- [#5839](https://github.com/thanos-io/thanos/pull/5839) Receive: Add parameter `--tsdb.out-of-order.time-window` to set the time window for experimental out-of-order sample ingestion. Disabled by default (set to 0s). Please note that if you enable this option and use the compactor, make sure you set the `--enable-vertical-compaction` flag; otherwise you risk a compactor halt.
- [#5836](https://github.com/thanos-io/thanos/pull/5836) Receive: Add hidden flag `tsdb.memory-snapshot-on-shutdown` to enable experimental TSDB feature to snapshot on shutdown. This is intended to speed up receiver restart.

### Changed
Expand Down
13 changes: 13 additions & 0 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ func registerReceive(app *extkingpin.App) {
MinBlockDuration: int64(time.Duration(*conf.tsdbMinBlockDuration) / time.Millisecond),
MaxBlockDuration: int64(time.Duration(*conf.tsdbMaxBlockDuration) / time.Millisecond),
RetentionDuration: int64(time.Duration(*conf.retention) / time.Millisecond),
OutOfOrderTimeWindow: int64(time.Duration(*conf.tsdbOutOfOrderTimeWindow) / time.Millisecond),
OutOfOrderCapMax: conf.tsdbOutOfOrderCapMax,
NoLockfile: conf.noLockFile,
WALCompression: conf.walCompression,
MaxExemplars: conf.tsdbMaxExemplars,
Expand Down Expand Up @@ -775,6 +777,8 @@ type receiveConfig struct {

tsdbMinBlockDuration *model.Duration
tsdbMaxBlockDuration *model.Duration
tsdbOutOfOrderTimeWindow *model.Duration
tsdbOutOfOrderCapMax int64
tsdbAllowOverlappingBlocks bool
tsdbMaxExemplars int64
tsdbWriteQueueSize int64
Expand Down Expand Up @@ -861,6 +865,15 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {

rc.tsdbMaxBlockDuration = extkingpin.ModelDuration(cmd.Flag("tsdb.max-block-duration", "Max duration for local TSDB blocks").Default("2h").Hidden())

rc.tsdbOutOfOrderTimeWindow = extkingpin.ModelDuration(cmd.Flag("tsdb.out-of-order.time-window",
"[EXPERIMENTAL] Configures the allowed time window for ingestion of out-of-order samples. Disabled (0s) by default. "+
"Please note if you enable this option and you use compactor, make sure you have the --enable-vertical-compaction flag enabled, otherwise you might risk compactor halt.",
).Default("0s").Hidden())

cmd.Flag("tsdb.out-of-order.cap-max",
"[EXPERIMENTAL] Configures the maximum capacity for out-of-order chunks (in samples). If set to <=0, default value 32 is assumed.",
).Default("0").Int64Var(&rc.tsdbOutOfOrderCapMax)

cmd.Flag("tsdb.allow-overlapping-blocks", "Allow overlapping blocks, which in turn enables vertical compaction and vertical query merge. Does not do anything, enabled all the time.").Default("false").BoolVar(&rc.tsdbAllowOverlappingBlocks)

cmd.Flag("tsdb.wal-compression", "Compress the tsdb WAL.").Default("true").BoolVar(&rc.walCompression)
Expand Down
4 changes: 4 additions & 0 deletions docs/components/receive.md
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,10 @@ Flags:
--tsdb.no-lockfile Do not create lockfile in TSDB data directory.
In any case, the lockfiles will be deleted on
next startup.
--tsdb.out-of-order.cap-max=0
[EXPERIMENTAL] Configures the maximum capacity
for out-of-order chunks (in samples). If set to
<=0, default value 32 is assumed.
--tsdb.path="./data" Data directory of TSDB.
--tsdb.retention=15d How long to retain raw samples on local
storage. 0d - disables this retention.
Expand Down
8 changes: 8 additions & 0 deletions pkg/receive/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR
numSamplesOutOfOrder = 0
numSamplesDuplicates = 0
numSamplesOutOfBounds = 0
numSamplesTooOld = 0

numExemplarsOutOfOrder = 0
numExemplarsDuplicate = 0
Expand Down Expand Up @@ -120,6 +121,9 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR
case storage.ErrOutOfBounds:
numSamplesOutOfBounds++
level.Debug(tLogger).Log("msg", "Out of bounds metric", "lset", lset, "value", s.Value, "timestamp", s.Timestamp)
case storage.ErrTooOldSample:
numSamplesTooOld++
level.Debug(tLogger).Log("msg", "Sample is too old", "lset", lset, "value", s.Value, "timestamp", s.Timestamp)
default:
if err != nil {
level.Debug(tLogger).Log("msg", "Error ingesting sample", "err", err)
Expand Down Expand Up @@ -185,6 +189,10 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR
level.Warn(tLogger).Log("msg", "Error on ingesting samples that are too old or are too far into the future", "numDropped", numSamplesOutOfBounds)
errs.Add(errors.Wrapf(storage.ErrOutOfBounds, "add %d samples", numSamplesOutOfBounds))
}
if numSamplesTooOld > 0 {
level.Warn(tLogger).Log("msg", "Error on ingesting samples that are outside of the allowed out-of-order time window", "numDropped", numSamplesTooOld)
errs.Add(errors.Wrapf(storage.ErrTooOldSample, "add %d samples", numSamplesTooOld))
}

if numExemplarsOutOfOrder > 0 {
level.Warn(tLogger).Log("msg", "Error on ingesting out-of-order exemplars", "numDropped", numExemplarsOutOfOrder)
Expand Down

0 comments on commit 04dad70

Please sign in to comment.