diff --git a/pkg/storage/stores/shipper/compactor/compactor_test.go b/pkg/storage/stores/shipper/compactor/compactor_test.go index 4a8a1645beb70..e330d5b92592c 100644 --- a/pkg/storage/stores/shipper/compactor/compactor_test.go +++ b/pkg/storage/stores/shipper/compactor/compactor_test.go @@ -1,16 +1,40 @@ package compactor import ( + "context" "fmt" + "io/ioutil" + "os" + "path/filepath" + "strings" "testing" "time" "github.com/cortexproject/cortex/pkg/chunk" - "github.com/prometheus/common/model" + "github.com/cortexproject/cortex/pkg/chunk/local" + "github.com/cortexproject/cortex/pkg/chunk/storage" + "github.com/cortexproject/cortex/pkg/util/flagext" + "github.com/prometheus/common/model" "github.com/stretchr/testify/require" + + loki_storage "github.com/grafana/loki/pkg/storage" + "github.com/grafana/loki/pkg/storage/stores/shipper/testutil" ) +func setupTestCompactor(t *testing.T, tempDir string) *Compactor { + cfg := Config{} + flagext.DefaultValues(&cfg) + cfg.WorkingDirectory = filepath.Join(tempDir, workingDirName) + cfg.SharedStoreType = "filesystem" + cfg.RetentionEnabled = false + + c, err := NewCompactor(cfg, storage.Config{FSConfig: local.FSConfig{Directory: tempDir}}, loki_storage.SchemaConfig{}, nil, nil) + require.NoError(t, err) + + return c +} + func TestIsDefaults(t *testing.T) { for i, tc := range []struct { in *Config @@ -71,3 +95,76 @@ func TestExtractIntervalFromTableName(t *testing.T) { } } + +func TestCompactor_RunCompaction(t *testing.T) { + tempDir, err := ioutil.TempDir("", "compactor-run-compaction") + require.NoError(t, err) + + defer func() { + require.NoError(t, os.RemoveAll(tempDir)) + }() + + tablesPath := filepath.Join(tempDir, "index") + tablesCopyPath := filepath.Join(tempDir, "index-copy") + + tables := map[string]map[string]testutil.DBRecords{ + "table1": { + "db1": { + Start: 0, + NumRecords: 10, + }, + "db2": { + Start: 10, + NumRecords: 10, + }, + "db3": { + Start: 20, + NumRecords: 10, + }, + "db4": { + Start: 30, + NumRecords: 10, + }, + }, + "table2": { + "db1": { + Start: 40, + NumRecords: 10, + }, + "db2": { + Start: 50, + NumRecords: 10, + }, + "db3": { + Start: 60, + NumRecords: 10, + }, + "db4": { + Start: 70, + NumRecords: 10, + }, + }, + } + + for name, dbs := range tables { + testutil.SetupDBTablesAtPath(t, name, tablesPath, dbs, false) + + // setup exact same copy of dbs for comparison. + testutil.SetupDBTablesAtPath(t, name, tablesCopyPath, dbs, false) + } + + compactor := setupTestCompactor(t, tempDir) + err = compactor.RunCompaction(context.Background()) + require.NoError(t, err) + + for name := range tables { + // verify that we have only 1 file left in storage after compaction. + files, err := ioutil.ReadDir(filepath.Join(tablesPath, name)) + require.NoError(t, err) + require.Len(t, files, 1) + require.True(t, strings.HasSuffix(files[0].Name(), ".gz")) + + // verify we have all the kvs in compacted db which were there in source dbs. + compareCompactedDB(t, filepath.Join(tablesPath, name, files[0].Name()), filepath.Join(tablesCopyPath, name)) + } +} diff --git a/pkg/storage/stores/shipper/compactor/retention/expiration.go b/pkg/storage/stores/shipper/compactor/retention/expiration.go index 301b0b9ddccad..5cf9c9fdf2d7c 100644 --- a/pkg/storage/stores/shipper/compactor/retention/expiration.go +++ b/pkg/storage/stores/shipper/compactor/retention/expiration.go @@ -50,7 +50,7 @@ func (e *expirationChecker) MarkPhaseFailed() {} func (e *expirationChecker) MarkPhaseFinished() {} func (e *expirationChecker) IntervalHasExpiredChunks(interval model.Interval) bool { - return e.earliestRetentionStartTime.Before(interval.Start) || e.earliestRetentionStartTime.Before(interval.End) + return interval.Start.Before(e.earliestRetentionStartTime) } type TenantsRetention struct { diff --git a/pkg/storage/stores/shipper/compactor/retention/expiration_test.go b/pkg/storage/stores/shipper/compactor/retention/expiration_test.go index f193041dcbc4b..d86b602e1a84e 100644 --- a/pkg/storage/stores/shipper/compactor/retention/expiration_test.go +++ b/pkg/storage/stores/shipper/compactor/retention/expiration_test.go @@ -208,3 +208,49 @@ func TestFindEarliestRetentionStartTime(t *testing.T) { }) } } + +func TestExpirationChecker_IntervalHasExpiredChunks(t *testing.T) { + for _, tc := range []struct { + name string + expirationChecker expirationChecker + interval model.Interval + hasExpiredChunks bool + }{ + { + name: "not expired", + expirationChecker: expirationChecker{ + earliestRetentionStartTime: model.Now().Add(-24 * time.Hour), + }, + interval: model.Interval{ + Start: model.Now().Add(-time.Hour), + End: model.Now(), + }, + }, + { + name: "partially expired", + expirationChecker: expirationChecker{ + earliestRetentionStartTime: model.Now().Add(-24 * time.Hour), + }, + interval: model.Interval{ + Start: model.Now().Add(-25 * time.Hour), + End: model.Now().Add(-22 * time.Hour), + }, + hasExpiredChunks: true, + }, + { + name: "fully expired", + expirationChecker: expirationChecker{ + earliestRetentionStartTime: model.Now().Add(-24 * time.Hour), + }, + interval: model.Interval{ + Start: model.Now().Add(-26 * time.Hour), + End: model.Now().Add(-25 * time.Hour), + }, + hasExpiredChunks: true, + }, + } { + t.Run(tc.name, func(t *testing.T) { + require.Equal(t, tc.hasExpiredChunks, tc.expirationChecker.IntervalHasExpiredChunks(tc.interval)) + }) + } +}