Skip to content

Commit

Permalink
e2e/compact: add repro for issue thanos-io#6775 (thanos-io#7333)
Browse files Browse the repository at this point in the history
Adding a minimal test case for issue thanos-io#6775 - reproduces the panic in the
compactor.

Signed-off-by: Giedrius Statkevičius <[email protected]>
Signed-off-by: mluffman <[email protected]>
  • Loading branch information
GiedriusS authored and Nashluffy committed May 14, 2024
1 parent 036a320 commit 3641425
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 0 deletions.
77 changes: 77 additions & 0 deletions test/e2e/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -924,3 +924,80 @@ func TestCompactorDownsampleIgnoresMarked(t *testing.T) {
testutil.NotOk(t, c.WaitSumMetricsWithOptions(e2emon.Greater(0), []string{"thanos_compact_downsample_total"}, e2emon.WaitMissingMetrics()))

}

// TestCompactorIssue6775 tests that the compactor does not crash when
// compacting 5m downsampled blocks with some overlap.
func TestCompactorIssue6775(t *testing.T) {
const minTime = 1710374400014
const maxTime = 1711584000000

logger := log.NewNopLogger()
e, err := e2e.NewDockerEnvironment("c-issue6775")
testutil.Ok(t, err)
t.Cleanup(e2ethanos.CleanScenario(t, e))

dir := filepath.Join(e.SharedDir(), "tmp")
testutil.Ok(t, os.MkdirAll(dir, os.ModePerm))

const bucket = "compact-test"
m := e2edb.NewMinio(e, "minio", bucket, e2edb.WithMinioTLS())
testutil.Ok(t, e2e.StartAndWaitReady(m))

bkt, err := s3.NewBucketWithConfig(logger,
e2ethanos.NewS3Config(bucket, m.Endpoint("http"), m.Dir()), "test-feed")
testutil.Ok(t, err)

baseBlockDesc := blockDesc{
series: []labels.Labels{
labels.FromStrings("z", "1", "b", "2"),
labels.FromStrings("z", "1", "b", "5"),
},
extLset: labels.FromStrings("case", "downsampled-block-with-overlap"),
mint: minTime,
maxt: maxTime,
}

for i := 0; i < 2; i++ {
rawBlockID, err := baseBlockDesc.Create(context.Background(), dir, 0, metadata.NoneFunc, 1200+i)
testutil.Ok(t, err)
testutil.Ok(t, objstore.UploadDir(context.Background(), logger, bkt, path.Join(dir, rawBlockID.String()), rawBlockID.String()))
}

// Downsample them first.
bds := e2ethanos.NewToolsBucketDownsample(e, "downsample", client.BucketConfig{
Type: client.S3,
Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("http"), m.InternalDir()),
})
testutil.Ok(t, bds.Start())

// NOTE(GiedriusS): can't use WaitSumMetrics here because the e2e library doesn't
// work well with histograms.
testutil.Ok(t, runutil.RetryWithLog(log.NewLogfmtLogger(os.Stderr), 1*time.Second, make(<-chan struct{}), func() (rerr error) {
resp, err := http.Get(fmt.Sprintf("http://%s/metrics", bds.Endpoint("http")))
if err != nil {
return fmt.Errorf("getting metrics: %w", err)
}
defer runutil.CloseWithErrCapture(&rerr, resp.Body, "close body")

b, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("reading metrics: %w", err)
}

if !bytes.Contains(b, []byte(`thanos_compact_downsample_duration_seconds_count{resolution="0"} 2`)) {
return fmt.Errorf("failed to find the right downsampling metric")
}

return nil
}))

testutil.Ok(t, bds.Stop())

// Run the compactor.
c := e2ethanos.NewCompactorBuilder(e, "working").Init(client.BucketConfig{
Type: client.S3,
Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("http"), m.Dir()),
}, nil, "--compact.enable-vertical-compaction")
testutil.NotOk(t, e2e.StartAndWaitReady(c))
testutil.NotOk(t, c.WaitSumMetricsWithOptions(e2emon.Greater(0), []string{"thanos_compact_iterations_total"}, e2emon.WaitMissingMetrics()))
}
30 changes: 30 additions & 0 deletions test/e2e/e2ethanos/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -1236,3 +1236,33 @@ func NewRedis(e e2e.Environment, name string) e2e.Runnable {
},
)
}

func NewToolsBucketDownsample(e e2e.Environment, name string, bucketConfig client.BucketConfig) *e2eobs.Observable {
f := e.Runnable(fmt.Sprintf("downsampler-%s", name)).
WithPorts(map[string]int{"http": 8080}).
Future()

bktConfigBytes, err := yaml.Marshal(bucketConfig)
if err != nil {
return &e2eobs.Observable{Runnable: e2e.NewFailedRunnable(name, errors.Wrapf(err, "generate store config file: %v", bucketConfig))}
}

args := []string{"bucket", "downsample"}

args = append(args, e2e.BuildArgs(map[string]string{
"--http-address": ":8080",
"--log.level": "debug",
"--objstore.config": string(bktConfigBytes),
"--data-dir": f.InternalDir(),
})...)

return e2eobs.AsObservable(f.Init(
e2e.StartOptions{
Image: DefaultImage(),
Command: e2e.NewCommand("tools", args...),
User: strconv.Itoa(os.Getuid()),
Readiness: e2e.NewHTTPReadinessProbe("http", "/-/ready", 200, 200),
WaitReadyBackoff: &defaultBackoffConfig,
},
), "http")
}

0 comments on commit 3641425

Please sign in to comment.