From 4fcb067052caa45e834f1062a412ddb418d5f7ff Mon Sep 17 00:00:00 2001
From: Simone Gotti
Date: Tue, 29 Oct 2019 13:23:42 +0100
Subject: [PATCH] datamanager: clean old data files

Keep the last n (currently set to 3) data status files and remove all
other data status files and the data files they no longer reference.
---
 internal/datamanager/data.go             | 196 ++++++++++-
 internal/datamanager/datamanager.go      |  70 ++--
 internal/datamanager/datamanager_test.go | 420 ++++++++++++++++++-----
 internal/datamanager/wal.go              |  40 +++
 4 files changed, 596 insertions(+), 130 deletions(-)

diff --git a/internal/datamanager/data.go b/internal/datamanager/data.go
index e1aec8f9f..4f713fce1 100644
--- a/internal/datamanager/data.go
+++ b/internal/datamanager/data.go
@@ -16,10 +16,13 @@ package datamanager
 
 import (
 	"bytes"
+	"container/ring"
 	"context"
 	"encoding/json"
 	"fmt"
 	"io"
+	"path"
+	"regexp"
 	"sort"
 	"strings"
 
@@ -32,6 +35,12 @@ import (
 
 const (
 	DefaultMaxDataFileSize = 10 * 1024 * 1024
+
+	dataStatusToKeep = 3
+)
+
+var (
+	DataFileRegexp       = regexp.MustCompile(`^([a-zA-Z0-9]+-[a-zA-Z0-9]+)-([a-zA-Z0-9-]+)\.(data|index)$`)
+	DataStatusFileRegexp = regexp.MustCompile(`^([a-zA-Z0-9]+-[a-zA-Z0-9]+)\.status$`)
 )
 
 type DataStatus struct {
@@ -527,33 +536,49 @@ func (d *DataManager) Read(dataType, id string) (io.Reader, error) {
 	return bytes.NewReader(de.Data), nil
 }
 
-func (d *DataManager) GetLastDataStatusPath() (string, error) {
+func (d *DataManager) GetLastDataStatusSequences(n int) ([]*sequence.Sequence, error) {
+	if n < 1 {
+		return nil, errors.Errorf("n must be greater than 0")
+	}
+	r := ring.New(n)
+	re := r
+
 	doneCh := make(chan struct{})
 	defer close(doneCh)
 
-	var dataStatusPath string
 	for object := range d.ost.List(d.storageDataDir()+"/", "", false, doneCh) {
 		if object.Err != nil {
-			return "", object.Err
+			return nil, object.Err
 		}
 
-		if strings.HasSuffix(object.Path, ".status") {
-			dataStatusPath = object.Path
+		if m := DataStatusFileRegexp.FindStringSubmatch(path.Base(object.Path)); m != nil {
+			seq, err := sequence.Parse(m[1])
+			if err != nil {
+				d.log.Warnf("cannot parse sequence for data status file %q", object.Path)
+				continue
+			}
+			re.Value = seq
+			re = re.Next()
+		} else {
+			d.log.Warnf("bad file %q found in storage data dir", object.Path)
 		}
 	}
 
-	if dataStatusPath == "" {
-		return "", ostypes.ErrNotExist
-	}
-
-	return dataStatusPath, nil
-}
+	dataStatusSequences := []*sequence.Sequence{}
+	re.Do(func(x interface{}) {
+		if x != nil {
+			dataStatusSequences = append([]*sequence.Sequence{x.(*sequence.Sequence)}, dataStatusSequences...)
+		}
+	})
 
-func (d *DataManager) GetLastDataStatus() (*DataStatus, error) {
-	dataStatusPath, err := d.GetLastDataStatusPath()
-	if err != nil {
-		return nil, err
+	if len(dataStatusSequences) == 0 {
+		return nil, ostypes.ErrNotExist
 	}
 
-	dataStatusf, err := d.ost.ReadObject(dataStatusPath)
+	return dataStatusSequences, nil
+}
+
+func (d *DataManager) GetDataStatus(dataSequence *sequence.Sequence) (*DataStatus, error) {
+	dataStatusf, err := d.ost.ReadObject(d.dataStatusPath(dataSequence))
 	if err != nil {
 		return nil, err
 	}
@@ -564,6 +589,24 @@ func (d *DataManager) GetLastDataStatus() (*DataStatus, error) {
 	return dataStatus, dec.Decode(&dataStatus)
 }
 
+func (d *DataManager) GetLastDataStatusSequence() (*sequence.Sequence, error) {
+	dataStatusSequences, err := d.GetLastDataStatusSequences(1)
+	if err != nil {
+		return nil, err
+	}
+
+	return dataStatusSequences[0], nil
+}
+
+func (d *DataManager) GetLastDataStatus() (*DataStatus, error) {
+	dataStatusSequence, err := d.GetLastDataStatusSequence()
+	if err != nil {
+		return nil, err
+	}
+
+	return d.GetDataStatus(dataStatusSequence)
+}
+
 func (d *DataManager) Export(ctx context.Context, w io.Writer) error {
 	if err := d.checkpoint(ctx, true); err != nil {
 		return err
@@ -725,3 +768,126 @@ func (d *DataManager) Import(ctx context.Context, r io.Reader) error {
 
 	return nil
 }
+
+func (d *DataManager) CleanOldCheckpoints(ctx context.Context) error {
+	dataStatusSequences, err := d.GetLastDataStatusSequences(dataStatusToKeep)
+	if err != nil {
+		return err
+	}
+
+	return d.cleanOldCheckpoints(ctx, dataStatusSequences)
+}
+
+func (d *DataManager) cleanOldCheckpoints(ctx context.Context, dataStatusSequences []*sequence.Sequence) error {
+	if len(dataStatusSequences) == 0 {
+		return nil
+	}
+
+	lastDataStatusSequence := dataStatusSequences[0]
+
+	// Remove old data status paths
+	if len(dataStatusSequences) >= dataStatusToKeep {
+		dataStatusPathsMap := map[string]struct{}{}
+		for _, seq := range dataStatusSequences {
+			dataStatusPathsMap[d.dataStatusPath(seq)] = struct{}{}
+		}
+
+		doneCh := make(chan struct{})
+		defer close(doneCh)
+		for object := range d.ost.List(d.storageDataDir()+"/", "", false, doneCh) {
+			if object.Err != nil {
+				return object.Err
+			}
+
+			skip := false
+			if m := DataStatusFileRegexp.FindStringSubmatch(path.Base(object.Path)); m != nil {
+				seq, err := sequence.Parse(m[1])
+				if err == nil && seq.String() > lastDataStatusSequence.String() {
+					d.log.Infof("skipping file %q since its sequence is greater than %q", object.Path, lastDataStatusSequence)
+					skip = true
+				}
+			}
+			if skip {
+				continue
+			}
+
+			if _, ok := dataStatusPathsMap[object.Path]; !ok {
+				d.log.Infof("removing %q", object.Path)
+				if err := d.ost.DeleteObject(object.Path); err != nil {
+					if err != ostypes.ErrNotExist {
+						return err
+					}
+				}
+			}
+		}
+	}
+
+	// A list of files to keep
+	files := map[string]struct{}{}
+
+	for _, dataStatusSequence := range dataStatusSequences {
+		dataStatus, err := d.GetDataStatus(dataStatusSequence)
+		if err != nil {
+			return err
+		}
+
+		for dataType := range dataStatus.Files {
+			for _, file := range dataStatus.Files[dataType] {
+				files[d.DataFileBasePath(dataType, file.ID)] = struct{}{}
+			}
+		}
+	}
+
+	doneCh := make(chan struct{})
+	defer close(doneCh)
+
+	for object := range d.ost.List(d.storageDataDir()+"/", "", true, doneCh) {
+		if object.Err != nil {
+			return object.Err
+		}
+
+		p := object.Path
+		// object file relative to the storageDataDir
+		pr := strings.TrimPrefix(p, d.storageDataDir()+"/")
+		// object file full path without final extension
+		pne := strings.TrimSuffix(p, path.Ext(p))
+		// object file base name
+		pb := path.Base(p)
+
+		// skip status files
+		if !strings.Contains(pr, "/") && strings.HasSuffix(pr, ".status") {
+			continue
+		}
+
+		// skip data files with a sequence greater than the last known sequence.
+		// this avoids the case where a clean runs concurrently with a running
+		// checkpoint (even if protected by etcd locks, the locks cannot guard
+		// operations acting on resources external to etcd during network
+		// errors) and would remove the objects created by that checkpoint,
+		// since its data status file doesn't exist yet.
+		skip := false
+		// extract the data sequence from the object name
+		if m := DataFileRegexp.FindStringSubmatch(pb); m != nil {
+			seq, err := sequence.Parse(m[1])
+			if err == nil && seq.String() > lastDataStatusSequence.String() {
+				d.log.Infof("skipping file %q since its sequence is greater than %q", p, lastDataStatusSequence)
+				skip = true
+			}
+		}
+		if skip {
+			continue
+		}
+
+		if _, ok := files[pne]; !ok {
+			d.log.Infof("removing %q", object.Path)
+			if err := d.ost.DeleteObject(object.Path); err != nil {
+				if err != ostypes.ErrNotExist {
+					return err
+				}
+			}
+		}
+	}
+
+	return nil
+}
diff --git a/internal/datamanager/datamanager.go b/internal/datamanager/datamanager.go
index c9895df20..911ae2eef 100644
--- a/internal/datamanager/datamanager.go
+++ b/internal/datamanager/datamanager.go
@@ -34,9 +34,10 @@ import (
 // * Etcd cluster restored to a previous revision: really bad cause should detect that the revision is smaller than the current one
 
 const (
-	DefaultCheckpointInterval   = 10 * time.Second
-	DefaultEtcdWalsKeepNum      = 100
-	DefaultMinCheckpointWalsNum = 100
+	DefaultCheckpointInterval      = 10 * time.Second
+	DefaultCheckpointCleanInterval = 5 * time.Minute
+	DefaultEtcdWalsKeepNum         = 100
+	DefaultMinCheckpointWalsNum    = 100
 )
 
 var (
@@ -79,12 +80,13 @@ const (
 )
 
 type DataManagerConfig struct {
-	BasePath           string
-	E                  *etcd.Store
-	OST                *objectstorage.ObjStorage
-	DataTypes          []string
-	EtcdWalsKeepNum    int
-	CheckpointInterval time.Duration
+	BasePath                string
+	E                       *etcd.Store
+	OST                     *objectstorage.ObjStorage
+	DataTypes               []string
+	EtcdWalsKeepNum         int
+	CheckpointInterval      time.Duration
+	CheckpointCleanInterval time.Duration
 	// MinCheckpointWalsNum is the minimum number of wals required before doing a checkpoint
 	MinCheckpointWalsNum int
 	MaxDataFileSize      int64
 	MaintenanceMode      bool
 }
 
 type DataManager struct {
-	basePath             string
-	log                  *zap.SugaredLogger
-	e                    *etcd.Store
-	ost                  *objectstorage.ObjStorage
-	changes              *WalChanges
-	dataTypes            []string
-	etcdWalsKeepNum      int
-	checkpointInterval   time.Duration
-	minCheckpointWalsNum int
-	maxDataFileSize      int64
-	maintenanceMode      bool
+	basePath                string
+	log                     *zap.SugaredLogger
+	e                       *etcd.Store
+	ost                     *objectstorage.ObjStorage
+	changes                 *WalChanges
+	dataTypes               []string
+	etcdWalsKeepNum         int
+	checkpointInterval      time.Duration
+	checkpointCleanInterval time.Duration
+	minCheckpointWalsNum    int
+	maxDataFileSize         int64
+	maintenanceMode         bool
 }
 
 func NewDataManager(ctx context.Context, logger *zap.Logger, conf *DataManagerConfig) (*DataManager, error) {
@@ -115,6 +118,9 @@ func NewDataManager(ctx context.Context, logger *zap.Logger, conf *DataManagerCo
 	if conf.CheckpointInterval == 0 {
 		conf.CheckpointInterval = DefaultCheckpointInterval
 	}
+	if conf.CheckpointCleanInterval == 0 {
+		conf.CheckpointCleanInterval = DefaultCheckpointCleanInterval
+	}
 	if conf.MinCheckpointWalsNum == 0 {
		conf.MinCheckpointWalsNum = DefaultMinCheckpointWalsNum
 	}
@@ -126,17 +132,18 @@ func NewDataManager(ctx context.Context, logger *zap.Logger, conf *DataManagerCo
 	}
 
 	d := &DataManager{
-		basePath:             conf.BasePath,
-		log:                  logger.Sugar(),
-		e:                    conf.E,
-		ost:                  conf.OST,
-		changes:              NewWalChanges(conf.DataTypes),
-		dataTypes:            conf.DataTypes,
-		etcdWalsKeepNum:      conf.EtcdWalsKeepNum,
-		checkpointInterval:   conf.CheckpointInterval,
-		minCheckpointWalsNum: conf.MinCheckpointWalsNum,
-		maxDataFileSize:      conf.MaxDataFileSize,
-		maintenanceMode:      conf.MaintenanceMode,
+		basePath:                conf.BasePath,
+		log:                     logger.Sugar(),
+		e:                       conf.E,
+		ost:                     conf.OST,
+		changes:                 NewWalChanges(conf.DataTypes),
+		dataTypes:               conf.DataTypes,
+		etcdWalsKeepNum:         conf.EtcdWalsKeepNum,
+		checkpointInterval:      conf.CheckpointInterval,
+		checkpointCleanInterval: conf.CheckpointCleanInterval,
+		minCheckpointWalsNum:    conf.MinCheckpointWalsNum,
+		maxDataFileSize:         conf.MaxDataFileSize,
+		maintenanceMode:         conf.MaintenanceMode,
 	}
 
 	// add trailing slash the basepath
@@ -231,6 +238,7 @@ func (d *DataManager) Run(ctx context.Context, readyCh chan struct{}) error {
 	go d.watcherLoop(ctx)
 	go d.syncLoop(ctx)
 	go d.checkpointLoop(ctx)
+	go d.checkpointCleanLoop(ctx)
 	go d.walCleanerLoop(ctx)
 	go d.compactChangeGroupsLoop(ctx)
 	go d.etcdPingerLoop(ctx)
diff --git a/internal/datamanager/datamanager_test.go b/internal/datamanager/datamanager_test.go
index a2604b774..5354cbb48 100644
--- a/internal/datamanager/datamanager_test.go
+++ b/internal/datamanager/datamanager_test.go
@@ -32,6 +32,7 @@ import (
 	"agola.io/agola/internal/objectstorage/posix"
 	ostypes "agola.io/agola/internal/objectstorage/types"
 	"agola.io/agola/internal/testutil"
+	"github.com/google/go-cmp/cmp"
 
 	"go.uber.org/zap"
 	"go.uber.org/zap/zapcore"
@@ -501,6 +502,110 @@ func doAndCheckCheckpoint(t *testing.T, ctx context.Context, dm *DataManager, ac
 	return expectedEntries, nil
 }
 
+func checkDataFiles(ctx context.Context, t *testing.T, dm *DataManager, expectedEntriesMap map[string]*DataEntry) error {
+	// read the data file
+	curDataStatus, err := dm.GetLastDataStatus()
+	if err != nil {
+		return err
+	}
+
+	allEntriesMap := map[string]*DataEntry{}
+
+	for dataType := range curDataStatus.Files {
+		var prevLastEntryID string
+		for i, file := range curDataStatus.Files[dataType] {
+			dataFileIndexf, err := dm.ost.ReadObject(dm.DataFileIndexPath(dataType, file.ID))
+			if err != nil {
+				return err
+			}
+			var dataFileIndex *DataFileIndex
+			dec := json.NewDecoder(dataFileIndexf)
+			err = dec.Decode(&dataFileIndex)
+			if err != nil {
+				dataFileIndexf.Close()
+				return err
+			}
+
+			dataFileIndexf.Close()
+			dataEntriesMap := map[string]*DataEntry{}
+			dataEntries := []*DataEntry{}
+			dataf, err := dm.ost.ReadObject(dm.DataFilePath(dataType, file.ID))
+			if err != nil {
+				return err
+			}
+			dec = json.NewDecoder(dataf)
+			var prevEntryID string
+			for {
+				var de *DataEntry
+
+				err := dec.Decode(&de)
+				if err == io.EOF {
+					// all done
+					break
+				}
+				if err != nil {
+					dataf.Close()
+					return err
+				}
+				// check that there are no duplicate entries
+				if _, ok := allEntriesMap[de.ID]; ok {
+					return fmt.Errorf("duplicate entry id: %s", de.ID)
+				}
+				// check that the entries are in order
+				if de.ID < prevEntryID {
+					return fmt.Errorf("previous entry id: %s greater than entry id: %s", prevEntryID, de.ID)
+				}
+
+				dataEntriesMap[de.ID] = de
+				dataEntries = append(dataEntries, de)
+				allEntriesMap[de.ID] = de
+			}
+			dataf.Close()
+
+			// check that the index matches the entries
+			if len(dataFileIndex.Index) != len(dataEntriesMap) {
+				return fmt.Errorf("index entries: %d different than data entries: %d", len(dataFileIndex.Index), len(dataEntriesMap))
+			}
+			indexIDs := make([]string, len(dataFileIndex.Index))
+			entriesIDs := make([]string, len(dataEntriesMap))
+			for id := range dataFileIndex.Index {
+				indexIDs = append(indexIDs, id)
+			}
+			for id := range dataEntriesMap {
+				entriesIDs = append(entriesIDs, id)
+			}
+			sort.Strings(indexIDs)
+			sort.Strings(entriesIDs)
+			if !reflect.DeepEqual(indexIDs, entriesIDs) {
+				return fmt.Errorf("index entries ids don't match data entries ids: index: %v, data: %v", indexIDs, entriesIDs)
+			}
+
+			if file.LastEntryID != dataEntries[len(dataEntries)-1].ID {
+				return fmt.Errorf("lastEntryID for datafile %d: %s is different than real last entry id: %s", i, file.LastEntryID, dataEntries[len(dataEntries)-1].ID)
+			}
+
+			// check that all the files are in order
+			if file.LastEntryID == prevLastEntryID {
+				return fmt.Errorf("lastEntryID for datafile %d is equal to previous file lastEntryID: %s == %s", i, file.LastEntryID, prevLastEntryID)
+			}
+			if file.LastEntryID < prevLastEntryID {
+				return fmt.Errorf("lastEntryID for datafile %d is less than previous file lastEntryID: %s < %s", i, file.LastEntryID, prevLastEntryID)
+			}
+			prevLastEntryID = file.LastEntryID
+		}
+	}
+
+	// check that the number of entries is right
+	if len(allEntriesMap) != len(expectedEntriesMap) {
+		return fmt.Errorf("expected %d total entries, got %d", len(expectedEntriesMap), len(allEntriesMap))
+	}
+	if !reflect.DeepEqual(expectedEntriesMap, allEntriesMap) {
+		return fmt.Errorf("expected entries don't match current entries")
+	}
+
+	return nil
+}
+
 // TODO(sgotti) some fuzzy testing will be really good
 func TestCheckpoint(t *testing.T) {
 	tests := []struct {
@@ -734,6 +839,10 @@ func testCheckpoint(t *testing.T, basePath string) {
 	if err != nil {
 		t.Fatalf("unexpected err: %v", err)
 	}
+
+	if err := dm.CleanOldCheckpoints(ctx); err != nil {
+		t.Fatalf("unexpected err: %v", err)
+	}
 }
 
 func TestRead(t *testing.T) {
@@ -828,108 +937,251 @@ func TestRead(t *testing.T) {
 	}
 }
 
-func checkDataFiles(ctx context.Context, t *testing.T, dm *DataManager, expectedEntriesMap map[string]*DataEntry) error {
-	// read the data file
-	curDataStatus, err := dm.GetLastDataStatus()
+func TestClean(t *testing.T) {
+	tests := []struct {
+		name     string
+		basePath string
+	}{
+		{
+			name:     "test with empty basepath",
+			basePath: "",
+		},
+		{
+			name:     "test with relative basepath",
+			basePath: "base/path",
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			testClean(t, tt.basePath)
+		})
+	}
+}
+
+func testClean(t *testing.T, basePath string) {
+	dir, err := ioutil.TempDir("", "agola")
 	if err != nil {
-		return err
+		t.Fatalf("unexpected err: %v", err)
 	}
+	defer os.RemoveAll(dir)
 
-	allEntriesMap := map[string]*DataEntry{}
+	etcdDir, err := ioutil.TempDir(dir, "etcd")
+	if err != nil {
+		t.Fatalf("unexpected err: %v", err)
+	}
+	tetcd := setupEtcd(t, etcdDir)
+	defer shutdownEtcd(tetcd)
 
-	for dataType := range curDataStatus.Files {
-		var prevLastEntryID string
-		for i, file := range curDataStatus.Files[dataType] {
-			dataFileIndexf, err := dm.ost.ReadObject(dm.DataFileIndexPath(dataType, file.ID))
-			if err != nil {
-				return err
-			}
-			var dataFileIndex *DataFileIndex
-			dec := json.NewDecoder(dataFileIndexf)
-			err = dec.Decode(&dataFileIndex)
-			if err != nil {
-				dataFileIndexf.Close()
-				return err
-			}
+	ctx := context.Background()
 
-			dataFileIndexf.Close()
-			dataEntriesMap := map[string]*DataEntry{}
-			dataEntries := []*DataEntry{}
-			dataf, err := dm.ost.ReadObject(dm.DataFilePath(dataType, file.ID))
-			if err != nil {
-				return err
-			}
+	ostDir, err := ioutil.TempDir(dir, "ost")
+	if err != nil {
+		t.Fatalf("unexpected err: %v", err)
+	}
+	ost, err := posix.New(ostDir)
+	if err != nil {
+		t.Fatalf("unexpected err: %v", err)
+	}
-			dec = json.NewDecoder(dataf)
-			var prevEntryID string
-			for {
-				var de *DataEntry
+	dmConfig := &DataManagerConfig{
+		BasePath: basePath,
+		E:        tetcd.TestEtcd.Store,
+		OST:      objectstorage.NewObjStorage(ost, "/"),
+		// remove almost all wals to see that they are removed also from changes
+		EtcdWalsKeepNum: 1,
+		DataTypes:       []string{"datatype01"},
+		// checkpoint also with only one wal
+		MinCheckpointWalsNum: 1,
+		// use a small maxDataFileSize
+		MaxDataFileSize: 10 * 1024,
+	}
+	dm, err := NewDataManager(ctx, logger, dmConfig)
+	if err != nil {
+		t.Fatalf("unexpected err: %v", err)
+	}
+	dmReadyCh := make(chan struct{})
+	go func() { _ = dm.Run(ctx, dmReadyCh) }()
+	<-dmReadyCh
 
-				err := dec.Decode(&de)
-				if err == io.EOF {
-					// all done
-					break
-				}
-				if err != nil {
-					dataf.Close()
-					return err
-				}
-				// check that there are no duplicate entries
-				if _, ok := allEntriesMap[de.ID]; ok {
-					return fmt.Errorf("duplicate entry id: %s", de.ID)
-				}
-				// check that the entries are in order
-				if de.ID < prevEntryID {
-					return fmt.Errorf("previous entry id: %s greater than entry id: %s", prevEntryID, de.ID)
-				}
+	time.Sleep(5 * time.Second)
 
-				dataEntriesMap[de.ID] = de
-				dataEntries = append(dataEntries, de)
-				allEntriesMap[de.ID] = de
-			}
-			dataf.Close()
+	contents := "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
 
-			// check that the index matches the entries
-			if len(dataFileIndex.Index) != len(dataEntriesMap) {
-				return fmt.Errorf("index entries: %d different than data entries: %d", len(dataFileIndex.Index), len(dataEntriesMap))
-			}
-			indexIDs := make([]string, len(dataFileIndex.Index))
-			entriesIDs := make([]string, len(dataEntriesMap))
-			for id := range dataFileIndex.Index {
-				indexIDs = append(indexIDs, id)
-			}
-			for id := range dataEntriesMap {
-				entriesIDs = append(entriesIDs, id)
-			}
-			sort.Strings(indexIDs)
-			sort.Strings(entriesIDs)
-			if !reflect.DeepEqual(indexIDs, entriesIDs) {
-				return fmt.Errorf("index entries ids don't match data entries ids: index: %v, data: %v", indexIDs, entriesIDs)
-			}
+	var currentEntries map[string]*DataEntry
+	actions := []*Action{}
+	for n := 0; n < 10; n++ {
+		for i := 0; i < 400; i++ {
+			action := &Action{
+				ActionType: ActionTypePut,
+				ID:         fmt.Sprintf("object%04d", i),
+				DataType:   "datatype01",
+				Data:       []byte(fmt.Sprintf(`{ "ID": "%d", "Contents": %s }`, i, contents)),
 			}
 
-			if file.LastEntryID != dataEntries[len(dataEntries)-1].ID {
-				return fmt.Errorf("lastEntryID for datafile %d: %s is different than real last entry id: %s", i, file.LastEntryID, dataEntries[len(dataEntries)-1].ID)
+			actions = append(actions, action)
+		}
 
-			// check that all the files are in order
-			if file.LastEntryID == prevLastEntryID {
-				return fmt.Errorf("lastEntryID for datafile %d is equal than previous file lastEntryID: %s == %s", i, file.LastEntryID, prevLastEntryID)
-			}
-			if file.LastEntryID < prevLastEntryID {
-				return fmt.Errorf("lastEntryID for datafile %d is less than previous file lastEntryID: %s < %s", i, file.LastEntryID, prevLastEntryID)
+		currentEntries, err = doAndCheckCheckpoint(t, ctx, dm, [][]*Action{actions}, currentEntries)
+		if err != nil {
+			t.Fatalf("unexpected err: %v", err)
+		}
 	}
 
+	// get the last data status sequence
+	lastDataStatusSequences, err := dm.GetLastDataStatusSequences(dataStatusToKeep)
+	if err != nil {
+		t.Fatalf("unexpected err: %v", err)
+	}
+
+	if err := dm.CleanOldCheckpoints(ctx); err != nil {
+		t.Fatalf("unexpected err: %v", err)
+	}
+
+	// check last data file
+	if err := checkDataFiles(ctx, t, dm, currentEntries); err != nil {
+		t.Fatalf("unexpected err: %v", err)
+	}
+
+	// check that only the last dataStatusToKeep status files are left
+	curDataStatusSequences, err := dm.GetLastDataStatusSequences(1000)
+	if err != nil {
+		t.Fatalf("unexpected err: %v", err)
+	}
+	if len(curDataStatusSequences) != dataStatusToKeep {
+		t.Fatalf("expected %d data status files, got %d: %s", dataStatusToKeep, len(curDataStatusSequences), curDataStatusSequences)
+	}
+	if diff := cmp.Diff(lastDataStatusSequences, curDataStatusSequences); diff != "" {
+		t.Fatalf("different data status sequences: %v", diff)
+	}
+}
+
+func TestCleanConcurrentCheckpoint(t *testing.T) {
+	tests := []struct {
+		name     string
+		basePath string
+	}{
+		{
+			name:     "test with empty basepath",
+			basePath: "",
+		},
+		{
+			name:     "test with relative basepath",
+			basePath: "base/path",
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			testCleanConcurrentCheckpoint(t, tt.basePath)
+		})
+	}
+}
+
+func testCleanConcurrentCheckpoint(t *testing.T, basePath string) {
+	dir, err := ioutil.TempDir("", "agola")
+	if err != nil {
+		t.Fatalf("unexpected err: %v", err)
+	}
+	defer os.RemoveAll(dir)
+
+	etcdDir, err := ioutil.TempDir(dir, "etcd")
+	if err != nil {
+		t.Fatalf("unexpected err: %v", err)
+	}
+	tetcd := setupEtcd(t, etcdDir)
+	defer shutdownEtcd(tetcd)
+
+	ctx := context.Background()
+
+	ostDir, err := ioutil.TempDir(dir, "ost")
+	if err != nil {
+		t.Fatalf("unexpected err: %v", err)
+	}
+	ost, err := posix.New(ostDir)
+	if err != nil {
+		t.Fatalf("unexpected err: %v", err)
+	}
+
+	dmConfig := &DataManagerConfig{
+		BasePath: basePath,
+		E:        tetcd.TestEtcd.Store,
+		OST:      objectstorage.NewObjStorage(ost, "/"),
+		// remove almost all wals to see that they are removed also from changes
+		EtcdWalsKeepNum: 1,
+		DataTypes:       []string{"datatype01"},
+		// checkpoint also with only one wal
+		MinCheckpointWalsNum: 1,
+		// use a small maxDataFileSize
+		MaxDataFileSize: 10 * 1024,
+	}
+	dm, err := NewDataManager(ctx, logger, dmConfig)
+	if err != nil {
+		t.Fatalf("unexpected err: %v", err)
+	}
+	dmReadyCh := make(chan struct{})
+	go func() { _ = dm.Run(ctx, dmReadyCh) }()
+	<-dmReadyCh
+
+	time.Sleep(5 * time.Second)
+
+	contents := "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
+
+	var currentEntries map[string]*DataEntry
+	actions := []*Action{}
+	for n := 0; n < 10; n++ {
+		for i := 0; i < 400; i++ {
+			action := &Action{
+				ActionType: ActionTypePut,
+				ID:         fmt.Sprintf("object%04d", i),
+				DataType:   "datatype01",
+				Data:       []byte(fmt.Sprintf(`{ "ID": "%d", "Contents": %s }`, i, contents)),
 			}
-			prevLastEntryID = file.LastEntryID
+			actions = append(actions, action)
+		}
+
+		currentEntries, err = doAndCheckCheckpoint(t, ctx, dm, [][]*Action{actions}, currentEntries)
+		if err != nil {
+			t.Fatalf("unexpected err: %v", err)
 		}
 	}
 
-	// check that the number of entries is right
-	if len(allEntriesMap) != len(expectedEntriesMap) {
+	// get the current last data status sequences before doing other actions and checkpoints
+	dataStatusSequences, err := dm.GetLastDataStatusSequences(dataStatusToKeep)
+	if err != nil {
+		t.Fatalf("unexpected err: %v", err)
 	}
-	if !reflect.DeepEqual(expectedEntriesMap, allEntriesMap) {
-		return fmt.Errorf("expected entries don't match current entries")
+
+	for i := 0; i < 400; i++ {
+		action := &Action{
+			ActionType: ActionTypePut,
+			ID:         fmt.Sprintf("object%04d", i),
+			DataType:   "datatype01",
+			Data:       []byte(fmt.Sprintf(`{ "ID": "%d", "Contents": %s }`, i, contents)),
+		}
+		actions = append(actions, action)
 	}
 
-	return nil
+	if _, err = doAndCheckCheckpoint(t, ctx, dm, [][]*Action{actions}, currentEntries); err != nil {
+		t.Fatalf("unexpected err: %v", err)
+	}
+
+	if err := dm.cleanOldCheckpoints(ctx, dataStatusSequences); err != nil {
+		t.Fatalf("unexpected err: %v", err)
+	}
+
+	// check the datastatus after clean
+	curDataStatus, err := dm.GetLastDataStatus()
+	if err != nil {
+		t.Fatalf("unexpected err: %v", err)
+	}
+	if curDataStatus.DataSequence <= dataStatusSequences[0].String() {
+		t.Fatalf("expected data status sequence greater than %q", dataStatusSequences[0])
+	}
+
+	// check last data file
+	if err := checkDataFiles(ctx, t, dm, currentEntries); err != nil {
+		t.Fatalf("unexpected err: %v", err)
+	}
 }
 
 func TestExportImport(t *testing.T) {
diff --git a/internal/datamanager/wal.go b/internal/datamanager/wal.go
index aa97f90ac..6add5456e 100644
--- a/internal/datamanager/wal.go
+++ b/internal/datamanager/wal.go
@@ -721,6 +721,46 @@ func (d *DataManager) checkpoint(ctx context.Context, force bool) error {
 	return nil
 }
 
+func (d *DataManager) checkpointCleanLoop(ctx context.Context) {
+	for {
+		d.log.Debugf("checkpointCleanLoop")
+		if err := d.checkpointClean(ctx); err != nil {
+			d.log.Errorf("checkpointClean error: %v", err)
+		}
+
+		sleepCh := time.NewTimer(d.checkpointCleanInterval).C
+		select {
+		case <-ctx.Done():
+			return
+		case <-sleepCh:
+		}
+	}
+}
+
+func (d *DataManager) checkpointClean(ctx context.Context) error {
+	session, err := concurrency.NewSession(d.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx))
+	if err != nil {
+		return err
+	}
+	defer session.Close()
+
+	m := concurrency.NewMutex(session, etcdCheckpointLockKey)
+
+	// TODO(sgotti) find a way to use a trylock so we'll just return if already
+	// locked. Currently multiple cleaners will queue up and start when another
+	// finishes (useless and a waste of resources)
+	if err := m.Lock(ctx); err != nil {
+		return err
+	}
+	defer func() { _ = m.Unlock(ctx) }()
+
+	if err := d.CleanOldCheckpoints(ctx); err != nil {
+		return err
+	}
+
+	return nil
+}
+
 func (d *DataManager) walCleanerLoop(ctx context.Context) {
 	for {
 		d.log.Debugf("walcleaner")
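--
Addendum (hedged notes, not part of the commit itself):

GetLastDataStatusSequences uses container/ring to keep only the last n
sequences while streaming the status files in order, then unwinds the ring so
the newest sequence comes first. A minimal standalone sketch of the same
idiom, with illustrative names:

package main

import (
	"container/ring"
	"fmt"
)

// lastN returns the last n items of an ordered stream, newest first.
func lastN(items []string, n int) []string {
	r := ring.New(n)
	for _, it := range items {
		r.Value = it // overwrite the oldest slot
		r = r.Next()
	}
	out := []string{}
	// after the writes, r points at the oldest surviving slot, so Do visits
	// oldest to newest; prepending each item reverses that to newest first
	r.Do(func(x interface{}) {
		if x != nil { // nil slots mean fewer than n items were seen
			out = append([]string{x.(string)}, out...)
		}
	})
	return out
}

func main() {
	fmt.Println(lastN([]string{"s1", "s2", "s3", "s4", "s5"}, 3)) // [s5 s4 s3]
}

This mirrors why cleanOldCheckpoints can treat dataStatusSequences[0] as the
newest (last) data status sequence.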
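cleanOldCheckpoints compares sequences as strings (seq.String() >
lastDataStatusSequence.String()), i.e. lexicographically. That is only sound
when sequences serialize to fixed-width, zero-padded strings, so string order
equals numeric order. A small sketch of the property being relied on; the
%016x width here is an assumption for illustration, not necessarily the exact
format used by the sequence package:

package main

import "fmt"

// seqString renders a counter as fixed-width, zero-padded hex; with a fixed
// width, lexicographic comparison agrees with numeric comparison.
func seqString(n uint64) string { return fmt.Sprintf("%016x", n) }

func main() {
	fmt.Println(seqString(15) < seqString(16)) // true: padded values sort numerically
	fmt.Println("f" < "10")                    // false: unpadded hex sorts wrongly
}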
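The TODO in checkpointClean about a trylock could plausibly be resolved with
concurrency.Mutex.TryLock, which etcd added in v3.4.3 (whether the vendored
client here is recent enough is an assumption). The lock key and the clean
callback below are illustrative stand-ins for etcdCheckpointLockKey and
d.CleanOldCheckpoints, so this is a sketch, not a drop-in patch:

package main

import (
	"context"

	"go.etcd.io/etcd/clientv3"
	"go.etcd.io/etcd/clientv3/concurrency"
)

// tryClean skips the clean entirely when another instance already holds the
// lock, instead of queueing behind it as Lock does.
func tryClean(ctx context.Context, cli *clientv3.Client, clean func(context.Context) error) error {
	session, err := concurrency.NewSession(cli, concurrency.WithTTL(5), concurrency.WithContext(ctx))
	if err != nil {
		return err
	}
	defer session.Close()

	m := concurrency.NewMutex(session, "/datamanager/checkpointLock")
	if err := m.TryLock(ctx); err != nil {
		if err == concurrency.ErrLocked {
			return nil // another cleaner is running; nothing to do
		}
		return err
	}
	defer func() { _ = m.Unlock(ctx) }()

	return clean(ctx)
}

func main() {
	// wiring a real *clientv3.Client requires a running etcd, e.g.
	// clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}})
}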