Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete commit log files that are covered by snapshot files #802

Merged
merged 19 commits into from
Jul 26, 2018
3 changes: 2 additions & 1 deletion src/dbnode/integration/commitlog_bootstrap_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ func writeCommitLogDataBase(
}

// ensure commit log is flushing frequently
require.Equal(t, defaultIntegrationTestFlushInterval, opts.FlushInterval())
require.Equal(
t, defaultIntegrationTestFlushInterval, opts.FlushInterval())

var (
seriesLookup = newCommitLogSeriesStates(data)
Expand Down
9 changes: 0 additions & 9 deletions src/dbnode/integration/disk_cleanup_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ package integration

import (
"errors"
"os"
"testing"
"time"

Expand Down Expand Up @@ -83,14 +82,6 @@ func writeIndexFileSetFiles(t *testing.T, storageOpts storage.Options, md namesp
}
}

func writeCommitLogs(t *testing.T, filePathPrefix string, fileTimes []time.Time) {
for _, start := range fileTimes {
commitLogFile, _ := fs.NextCommitLogsFile(filePathPrefix, start)
_, err := os.Create(commitLogFile)
require.NoError(t, err)
}
}

type cleanupTimesCommitLog struct {
filePathPrefix string
times []time.Time
Expand Down
19 changes: 17 additions & 2 deletions src/dbnode/integration/disk_cleanup_multi_ns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ import (
"testing"
"time"

"github.com/m3db/m3db/src/dbnode/integration/generate"
"github.com/m3db/m3db/src/dbnode/retention"
"github.com/m3db/m3db/src/dbnode/storage/namespace"
xtime "github.com/m3db/m3x/time"

"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -132,8 +134,21 @@ func TestDiskCleanupMultipleNamespace(t *testing.T) {
}()

log.Infof("creating commit log and fileset files")
shard := uint32(0)
writeCommitLogs(t, filePathPrefix, commitLogTimes)
var (
shard = uint32(0)
commitLogOpts = testSetup.storageOpts.CommitLogOptions().
SetFlushInterval(defaultIntegrationTestFlushInterval)
)

for _, clTime := range commitLogTimes {
// Need to generate valid commit log files otherwise cleanup will fail.
data := map[xtime.UnixNano]generate.SeriesBlock{
xtime.ToUnixNano(clTime): nil,
}
writeCommitLogDataSpecifiedTS(
t, testSetup, commitLogOpts, data,
ns1, clTime, false)
}
writeDataFileSetFiles(t, testSetup.storageOpts, ns1, shard, ns1Times)
writeDataFileSetFiles(t, testSetup.storageOpts, ns2, shard, ns2Times)

Expand Down
27 changes: 22 additions & 5 deletions src/dbnode/integration/disk_cleanup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ import (
"testing"
"time"

"github.com/m3db/m3db/src/dbnode/integration/generate"
"github.com/m3db/m3db/src/dbnode/storage/namespace"
xtime "github.com/m3db/m3x/time"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -57,15 +60,29 @@ func TestDiskCleanup(t *testing.T) {
}()

// Now create some fileset files and commit logs
shard := uint32(0)
numTimes := 10
fileTimes := make([]time.Time, numTimes)
now := testSetup.getNowFn()
var (
shard = uint32(0)
numTimes = 10
fileTimes = make([]time.Time, numTimes)
now = testSetup.getNowFn()
commitLogOpts = testSetup.storageOpts.CommitLogOptions().
SetFlushInterval(defaultIntegrationTestFlushInterval)
)
ns1, err := namespace.NewMetadata(testNamespaces[0], namespace.NewOptions())
require.NoError(t, err)
for i := 0; i < numTimes; i++ {
fileTimes[i] = now.Add(time.Duration(i) * blockSize)
}
writeDataFileSetFiles(t, testSetup.storageOpts, md, shard, fileTimes)
writeCommitLogs(t, filePathPrefix, fileTimes)
for _, clTime := range fileTimes {
// Need to generate valid commit log files otherwise cleanup will fail.
data := map[xtime.UnixNano]generate.SeriesBlock{
xtime.ToUnixNano(clTime): nil,
}
writeCommitLogDataSpecifiedTS(
t, testSetup, commitLogOpts,
data, ns1, clTime, false)
}

// Move now forward by retentionPeriod + 2 * blockSize so fileset files
// and commit logs at now will be deleted
Expand Down
13 changes: 13 additions & 0 deletions src/dbnode/integration/fs_commitlog_mixed_mode_read_write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,19 @@ func (d dataPointsInTimeOrder) toSeriesMap(blockSize time.Duration) generate.Ser
return nil
}

// before returns a slice of the dataPointsInTimeOrder that are before the
// specified time t.
func (d dataPointsInTimeOrder) before(t time.Time) dataPointsInTimeOrder {
var i int
for i = range d {
if !d[i].time.Before(t) {
break
}
}

return d[:i]
}

type idGen struct {
baseID string
}
Expand Down
Loading