Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[extension/storage/filestorage] Add CleanupOnStart flag for compaction temporary files #32863

Merged
merged 15 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions extension/storage/filestorage/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ The default timeout is `1s`.
`compaction.max_transaction_size` (default: 65536): defines maximum size of the compaction transaction.
A value of zero will ignore transaction sizes.

`compaction.cleanup_on_start` (default: false) - specifies if removal of compaction temporary files is performed on start.
It will remove all the files in the compaction directory starting with tempdb,
pandres-varian marked this conversation as resolved.
Show resolved Hide resolved
temp files will be left if a previous run of the process is killed while compacting.

### Rebound (online) compaction

For rebound compaction, there are two additional parameters available:
Expand Down
33 changes: 33 additions & 0 deletions extension/storage/filestorage/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"os"
"path/filepath"
"sync"
"syscall"
"time"
Expand Down Expand Up @@ -63,6 +64,9 @@ func newClient(logger *zap.Logger, filePath string, timeout time.Duration, compa
}

client := &fileStorageClient{logger: logger, db: db, compactionCfg: compactionCfg, openTimeout: timeout}
if compactionCfg.CleanupOnStart {
client.cleanup(compactionCfg.Directory)
}
djaglowski marked this conversation as resolved.
Show resolved Hide resolved
if compactionCfg.OnRebound {
client.startCompactionLoop(context.Background())
}
Expand Down Expand Up @@ -342,3 +346,32 @@ func moveFileWithFallback(src string, dest string) error {
err = os.Remove(src)
return err
}

// cleanup left compaction temporary files from previous killed process
func (c *fileStorageClient) cleanup(compactionDirectory string) error {
pattern := filepath.Join(compactionDirectory, "tempdb*")
pandres-varian marked this conversation as resolved.
Show resolved Hide resolved
contents, err := filepath.Glob(pattern)
if err != nil {
return err
}
delCont := 0
lockedCont := 0
for _, item := range contents {
err = os.Remove(item)
if err == nil {
delCont++
c.logger.Debug("cleanup",
zap.String("deletedFile", item))
} else {
lockedCont++
c.logger.Debug("cleanup",
zap.String("lockedFile", item),
pandres-varian marked this conversation as resolved.
Show resolved Hide resolved
zap.Error(err))
}

}
c.logger.Info("cleanup summary",
zap.Int("deletedFiles", delCont),
zap.Int("lockedFiles", lockedCont))
djaglowski marked this conversation as resolved.
Show resolved Hide resolved
return nil
}
28 changes: 28 additions & 0 deletions extension/storage/filestorage/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,34 @@ func TestClientConcurrentCompaction(t *testing.T) {
}
}

func TestClientCleanupOnStart(t *testing.T) {
tempDir := t.TempDir()
dbFile := filepath.Join(tempDir, "my_db")
temp, _ := os.CreateTemp(tempDir, "tempdb")
// simulate ongoing compaction in another instance
tempLocked, _ := os.CreateTemp(tempDir, "tempdb")
temp.Close()

client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{
Directory: tempDir,
CleanupOnStart: true,
}, false)
require.NoError(t, err)

t.Cleanup(func() {
require.NoError(t, client.Close(context.TODO()))
tempLocked.Close()
})

// check if cleanup removed the unlocked file and left db and locked file
files, err := os.ReadDir(tempDir)
require.NoError(t, err)
require.Equal(t, 2, len(files))
require.Equal(t, "my_db", files[0].Name())
_, f := filepath.Split(tempLocked.Name())
require.Equal(t, f, files[1].Name())
}

func BenchmarkClientGet(b *testing.B) {
tempDir := b.TempDir()
dbFile := filepath.Join(tempDir, "my_db")
Expand Down
4 changes: 4 additions & 0 deletions extension/storage/filestorage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ type CompactionConfig struct {
MaxTransactionSize int64 `mapstructure:"max_transaction_size,omitempty"`
// CheckInterval specifies frequency of compaction check
CheckInterval time.Duration `mapstructure:"check_interval,omitempty"`
// CleanupOnStart specifies removal of temporary files is performed on start.
// It will remove all the files in the compaction directory starting with tempdb,
// temp files will be left if a previous run of the process is killed while compacting.
CleanupOnStart bool `mapstructure:"cleanup_on_start,omitempty"`
}

func (cfg *Config) Validate() error {
Expand Down
1 change: 1 addition & 0 deletions extension/storage/filestorage/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func TestLoadConfig(t *testing.T) {
ReboundTriggerThresholdMiB: 16,
ReboundNeededThresholdMiB: 128,
CheckInterval: time.Second * 5,
CleanupOnStart: true,
},
Timeout: 2 * time.Second,
FSync: true,
Expand Down
35 changes: 35 additions & 0 deletions extension/storage/filestorage/extension_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,3 +448,38 @@ func TestCompactionRemoveTemp(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 0, len(files))
}

func TestCleanupOnStart(t *testing.T) {
ctx := context.Background()

tempDir := t.TempDir()
// simulate left temporary compaction file from killed process
temp, _ := os.CreateTemp(tempDir, "tempdb")
temp.Close()

f := NewFactory()
cfg := f.CreateDefaultConfig().(*Config)
cfg.Directory = tempDir
cfg.Compaction.Directory = tempDir
cfg.Compaction.CleanupOnStart = true
extension, err := f.CreateExtension(context.Background(), extensiontest.NewNopCreateSettings(), cfg)
require.NoError(t, err)

se, ok := extension.(storage.Extension)
require.True(t, ok)

client, err := se.GetClient(
ctx,
component.KindReceiver,
newTestEntity("my_component"),
"",
)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, client.Close(ctx))
})

files, err := os.ReadDir(tempDir)
require.NoError(t, err)
require.Equal(t, 1, len(files))
}
1 change: 1 addition & 0 deletions extension/storage/filestorage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func createDefaultConfig() component.Config {
ReboundNeededThresholdMiB: defaultReboundNeededThresholdMib,
ReboundTriggerThresholdMiB: defaultReboundTriggerThresholdMib,
CheckInterval: defaultCompactionInterval,
CleanupOnStart: false,
},
Timeout: time.Second,
FSync: false,
Expand Down
1 change: 1 addition & 0 deletions extension/storage/filestorage/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ file_storage/all_settings:
rebound_trigger_threshold_mib: 16
rebound_needed_threshold_mib: 128
max_transaction_size: 2048
cleanup_on_start: true
timeout: 2s
fsync: true
Loading