diff --git a/src/dbnode/integration/commitlog_bootstrap_helpers.go b/src/dbnode/integration/commitlog_bootstrap_helpers.go index 83067918a8..360482f791 100644 --- a/src/dbnode/integration/commitlog_bootstrap_helpers.go +++ b/src/dbnode/integration/commitlog_bootstrap_helpers.go @@ -144,7 +144,8 @@ func writeCommitLogDataBase( } // ensure commit log is flushing frequently - require.Equal(t, defaultIntegrationTestFlushInterval, opts.FlushInterval()) + require.Equal( + t, defaultIntegrationTestFlushInterval, opts.FlushInterval()) var ( seriesLookup = newCommitLogSeriesStates(data) diff --git a/src/dbnode/integration/disk_cleanup_helpers.go b/src/dbnode/integration/disk_cleanup_helpers.go index 11b130c02c..a99e23ee61 100644 --- a/src/dbnode/integration/disk_cleanup_helpers.go +++ b/src/dbnode/integration/disk_cleanup_helpers.go @@ -24,7 +24,6 @@ package integration import ( "errors" - "os" "testing" "time" @@ -83,14 +82,6 @@ func writeIndexFileSetFiles(t *testing.T, storageOpts storage.Options, md namesp } } -func writeCommitLogs(t *testing.T, filePathPrefix string, fileTimes []time.Time) { - for _, start := range fileTimes { - commitLogFile, _ := fs.NextCommitLogsFile(filePathPrefix, start) - _, err := os.Create(commitLogFile) - require.NoError(t, err) - } -} - type cleanupTimesCommitLog struct { filePathPrefix string times []time.Time diff --git a/src/dbnode/integration/disk_cleanup_multi_ns_test.go b/src/dbnode/integration/disk_cleanup_multi_ns_test.go index 645caa776b..b4276f7991 100644 --- a/src/dbnode/integration/disk_cleanup_multi_ns_test.go +++ b/src/dbnode/integration/disk_cleanup_multi_ns_test.go @@ -26,8 +26,10 @@ import ( "testing" "time" + "github.com/m3db/m3db/src/dbnode/integration/generate" "github.com/m3db/m3db/src/dbnode/retention" "github.com/m3db/m3db/src/dbnode/storage/namespace" + xtime "github.com/m3db/m3x/time" "github.com/stretchr/testify/require" ) @@ -133,7 +135,20 @@ func TestDiskCleanupMultipleNamespace(t *testing.T) { log.Infof("creating commit log and fileset files") shard := uint32(0) - writeCommitLogs(t, filePathPrefix, commitLogTimes) + for _, clTime := range commitLogTimes { + // Need to generate valid commit log files otherwise cleanup will fail. + data := map[xtime.UnixNano]generate.SeriesBlock{ + xtime.ToUnixNano(clTime): nil, + } + writeCommitLogDataSpecifiedTS( + t, + testSetup, + testSetup.storageOpts.CommitLogOptions().SetFlushInterval(defaultIntegrationTestFlushInterval), + data, + ns1.ID(), + clTime, + ) + } writeDataFileSetFiles(t, testSetup.storageOpts, ns1, shard, ns1Times) writeDataFileSetFiles(t, testSetup.storageOpts, ns2, shard, ns2Times) diff --git a/src/dbnode/integration/disk_cleanup_test.go b/src/dbnode/integration/disk_cleanup_test.go index 8bfb42ff06..30c25e4989 100644 --- a/src/dbnode/integration/disk_cleanup_test.go +++ b/src/dbnode/integration/disk_cleanup_test.go @@ -26,6 +26,9 @@ import ( "testing" "time" + "github.com/m3db/m3db/src/dbnode/integration/generate" + "github.com/m3db/m3x/ident" + xtime "github.com/m3db/m3x/time" "github.com/stretchr/testify/require" ) @@ -65,7 +68,20 @@ func TestDiskCleanup(t *testing.T) { fileTimes[i] = now.Add(time.Duration(i) * blockSize) } writeDataFileSetFiles(t, testSetup.storageOpts, md, shard, fileTimes) - writeCommitLogs(t, filePathPrefix, fileTimes) + for _, clTime := range fileTimes { + // Need to generate valid commit log files otherwise cleanup will fail. 
+ data := map[xtime.UnixNano]generate.SeriesBlock{ + xtime.ToUnixNano(clTime): nil, + } + writeCommitLogDataSpecifiedTS( + t, + testSetup, + testSetup.storageOpts.CommitLogOptions().SetFlushInterval(defaultIntegrationTestFlushInterval), + data, + ident.StringID("some-ns"), + clTime, + ) + } // Move now forward by retentionPeriod + 2 * blockSize so fileset files // and commit logs at now will be deleted diff --git a/src/dbnode/integration/fs_commitlog_mixed_mode_read_write_test.go b/src/dbnode/integration/fs_commitlog_mixed_mode_read_write_test.go index 9988aab454..ebe21ba193 100644 --- a/src/dbnode/integration/fs_commitlog_mixed_mode_read_write_test.go +++ b/src/dbnode/integration/fs_commitlog_mixed_mode_read_write_test.go @@ -317,6 +317,19 @@ func (d dataPointsInTimeOrder) toSeriesMap(blockSize time.Duration) generate.Ser return nil } +// before returns a slice of the dataPointsInTimeOrder that are before the +// specified time t. +func (d dataPointsInTimeOrder) before(t time.Time) dataPointsInTimeOrder { + var i int + for i = range d { + if !d[i].time.Before(t) { + break + } + } + + return d[:i] +} + type idGen struct { baseID string } diff --git a/src/dbnode/integration/fs_commitlog_snapshot_mixed_mode_read_write_prop_test.go b/src/dbnode/integration/fs_commitlog_snapshot_mixed_mode_read_write_prop_test.go new file mode 100644 index 0000000000..936021567e --- /dev/null +++ b/src/dbnode/integration/fs_commitlog_snapshot_mixed_mode_read_write_prop_test.go @@ -0,0 +1,283 @@ +// +build integration + +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package integration + +import ( + "fmt" + "math/rand" + "os" + "testing" + "time" + + "github.com/m3db/m3db/src/dbnode/retention" + "github.com/m3db/m3db/src/dbnode/storage/namespace" + "github.com/m3db/m3x/context" + xtime "github.com/m3db/m3x/time" + + "github.com/leanovate/gopter" + "github.com/leanovate/gopter/gen" + "github.com/leanovate/gopter/prop" + "github.com/stretchr/testify/require" +) + +const maxBlockSize = 12 * time.Hour +const maxPoints = 1000 +const minSuccessfulTests = 8 +const maxFlushWaitTime = time.Minute + +// This integration test uses property testing to make sure that the node +// can properly bootstrap all the data from a combination of fileset files, +// snapshotfiles, and commit log files. 
It varies the following inputs to +// the system: +// 1) block size +// 2) buffer past +// 3) buffer future +// 4) number of datapoints +// 5) whether it waits for data files to be flushed before shutting down +// 6) whether it waits for snapshot files to be written before shutting down +// +// It works by generating random datapoints, and then writing those data points +// to the node in order. At randomly selected times during the write process, the +// node will turn itself off and then bootstrap itself before resuming. +func TestFsCommitLogMixedModeReadWriteProp(t *testing.T) { + if testing.Short() { + t.SkipNow() // Just skip if we're doing a short run + } + + var ( + parameters = gopter.DefaultTestParameters() + seed = time.Now().UnixNano() + props = gopter.NewProperties(parameters) + reporter = gopter.NewFormatedReporter(true, 160, os.Stdout) + fakeStart = time.Date(2017, time.February, 13, 15, 30, 10, 0, time.Local) + rng = rand.New(rand.NewSource(seed)) + ) + + parameters.MinSuccessfulTests = minSuccessfulTests + parameters.Rng.Seed(seed) + + props.Property( + "Node can bootstrap all data from filesetfiles, snapshotfiles, and commit log files", prop.ForAll( + func(input propTestInput) (bool, error) { + // Test setup + var ( + // Round to a second to prevent interactions between the RPC client + // and the node itself when blocksize is not rounded down to a second. + ns1BlockSize = input.blockSize.Round(time.Second) + commitLogBlockSize = 15 * time.Minute + // Make sure randomly generated data never falls out of retention + // during the course of a test. + retentionPeriod = maxBlockSize * 5 + bufferPast = input.bufferPast + bufferFuture = input.bufferFuture + ns1ROpts = retention.NewOptions(). + SetRetentionPeriod(retentionPeriod). + SetBlockSize(ns1BlockSize). + SetBufferPast(bufferPast). + SetBufferFuture(bufferFuture) + nsID = testNamespaces[0] + numPoints = input.numPoints + ) + + if bufferPast > ns1BlockSize { + bufferPast = ns1BlockSize - 1 + ns1ROpts = ns1ROpts.SetBufferPast(bufferPast) + } + if bufferFuture > ns1BlockSize { + bufferFuture = ns1BlockSize - 1 + ns1ROpts = ns1ROpts.SetBufferFuture(bufferFuture) + } + + if err := ns1ROpts.Validate(); err != nil { + return false, err + } + + ns1Opts := namespace.NewOptions(). + SetRetentionOptions(ns1ROpts). + SetSnapshotEnabled(true) + ns1, err := namespace.NewMetadata(nsID, ns1Opts) + if err != nil { + return false, err + } + opts := newTestOptions(t). + SetCommitLogRetentionPeriod(retentionPeriod). + SetCommitLogBlockSize(commitLogBlockSize). + SetNamespaces([]namespace.Metadata{ns1}) + + // Test setup + setup := newTestSetupWithCommitLogAndFilesystemBootstrapper(t, opts) + defer setup.close() + + log := setup.storageOpts.InstrumentOptions().Logger() + log.Infof("blockSize: %s\n", ns1ROpts.BlockSize().String()) + log.Infof("bufferPast: %s\n", ns1ROpts.BufferPast().String()) + log.Infof("bufferFuture: %s\n", ns1ROpts.BufferFuture().String()) + + setup.setNowFn(fakeStart) + + var ( + ids = &idGen{longTestID} + datapoints = generateDatapoints(fakeStart, numPoints, ids) + // Used to keep track of which datapoints have been written already. 
+ lastDatapointsIdx = 0 + earliestToCheck = datapoints[0].time.Truncate(ns1BlockSize) + latestToCheck = datapoints[len(datapoints)-1].time.Add(ns1BlockSize) + timesToRestart = []time.Time{} + start = earliestToCheck + filePathPrefix = setup.storageOpts.CommitLogOptions().FilesystemOptions().FilePathPrefix() + ) + + // Generate randomly selected times during which the node will restart + // and bootstrap before continuing to write data. + for { + if start.After(latestToCheck) || start.Equal(latestToCheck) { + break + } + + timesToRestart = append(timesToRestart, start) + start = start.Add(time.Duration(rng.Intn(int(maxBlockSize)))) + } + timesToRestart = append(timesToRestart, latestToCheck) + + for _, timeToCheck := range timesToRestart { + startServerWithNewInspection(t, opts, setup) + ctx := context.NewContext() + defer ctx.Close() + + log.Infof("writing datapoints") + var i int + for i = lastDatapointsIdx; i < len(datapoints); i++ { + var ( + dp = datapoints[i] + ts = dp.time + ) + if !ts.Before(timeToCheck) { + break + } + + setup.setNowFn(ts) + + err := setup.db.Write(ctx, nsID, dp.series, ts, dp.value, xtime.Second, nil) + if err != nil { + return false, err + } + } + lastDatapointsIdx = i + log.Infof("wrote datapoints") + + expectedSeriesMap := datapoints[:lastDatapointsIdx].toSeriesMap(ns1BlockSize) + log.Infof("verifying data in database equals expected data") + if !verifySeriesMaps(t, setup, nsID, expectedSeriesMap) { + // verifySeriesMaps will make sure the actual failure is included + // in the go test output, but it uses assert() under the hood so + // there is not a clean way to return the explicit error to gopter + // as well. + return false, nil + } + log.Infof("verified data in database equals expected data") + if input.waitForFlushFiles { + log.Infof("Waiting for data files to be flushed") + now := setup.getNowFn() + latestFlushTime := now.Truncate(ns1BlockSize).Add(-ns1BlockSize) + expectedFlushedData := datapoints.before(latestFlushTime.Add(-bufferPast)).toSeriesMap(ns1BlockSize) + err := waitUntilDataFilesFlushed( + filePathPrefix, setup.shardSet, nsID, expectedFlushedData, maxFlushWaitTime) + if err != nil { + return false, fmt.Errorf("error waiting for data files to flush: %s", err) + } + } + + if input.waitForSnapshotFiles { + log.Infof("Waiting for snapshot files to be written") + now := setup.getNowFn() + snapshotBlock := now.Add(-bufferPast).Truncate(ns1BlockSize) + err := waitUntilSnapshotFilesFlushed( + filePathPrefix, + setup.shardSet, + nsID, + []time.Time{snapshotBlock}, maxFlushWaitTime) + if err != nil { + return false, fmt.Errorf("error waiting for snapshot files: %s", err.Error()) + } + } + + require.NoError(t, setup.stopServer()) + // Create a new test setup because databases do not have a completely + // clean shutdown, so they can end up in a bad state where the persist + // manager is not idle and thus no more flushes can be done, even if + // there are no other in-progress flushes. + oldNow := setup.getNowFn() + setup = newTestSetupWithCommitLogAndFilesystemBootstrapper( + // FilePathPrefix is randomly generated if not provided, so we need + // to make sure all our test setups have the same prefix so that + // they can find each others files. + t, opts.SetFilePathPrefix(filePathPrefix)) + // Make sure the new setup has the same system time as the previous one. 
+ setup.setNowFn(oldNow) + } + + if lastDatapointsIdx != len(datapoints) { + return false, fmt.Errorf( + "expected lastDatapointsIdx to be: %d but was: %d", len(datapoints), lastDatapointsIdx) + } + + return true, nil + }, genPropTestInputs(fakeStart), + )) + + if !props.Run(reporter) { + t.Errorf( + "failed with initial seed: %d and startTime: %d", + seed, fakeStart.UnixNano()) + } +} + +func genPropTestInputs(blockStart time.Time) gopter.Gen { + return gopter.CombineGens( + gen.Int64Range(1, int64(maxBlockSize/2)*2), + gen.Int64Range(1, int64(maxBlockSize/2)*2), + gen.Int64Range(1, int64(maxBlockSize/2)*2), + gen.IntRange(0, maxPoints), + gen.Bool(), + gen.Bool(), + ).Map(func(val interface{}) propTestInput { + inputs := val.([]interface{}) + return propTestInput{ + blockSize: time.Duration(inputs[0].(int64)), + bufferPast: time.Duration(inputs[1].(int64)), + bufferFuture: time.Duration(inputs[2].(int64)), + numPoints: inputs[3].(int), + waitForFlushFiles: inputs[4].(bool), + waitForSnapshotFiles: inputs[5].(bool), + } + }) +} + +type propTestInput struct { + blockSize time.Duration + bufferPast time.Duration + bufferFuture time.Duration + numPoints int + waitForFlushFiles bool + waitForSnapshotFiles bool +} diff --git a/src/dbnode/integration/integration_data_verify.go b/src/dbnode/integration/integration_data_verify.go index 9cd25ccaaf..74c2915db9 100644 --- a/src/dbnode/integration/integration_data_verify.go +++ b/src/dbnode/integration/integration_data_verify.go @@ -38,6 +38,7 @@ import ( xlog "github.com/m3db/m3x/log" xtime "github.com/m3db/m3x/time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -73,7 +74,7 @@ func verifySeriesMapForRange( input generate.SeriesBlock, expectedDebugFilePath string, actualDebugFilePath string, -) { +) bool { // Construct a copy of the input that we will use to compare // with only the fields we need to compare against (fetch doesn't // return the tags for a series ID) @@ -91,7 +92,9 @@ func verifySeriesMapForRange( req.ResultTimeType = rpc.TimeType_UNIX_SECONDS fetched, err := ts.fetch(req) - require.NoError(t, err) + if !assert.NoError(t, err) { + return false + } expected[i] = generate.Series{ ID: input[i].ID, Data: input[i].Data, @@ -106,13 +109,24 @@ func verifySeriesMapForRange( } if len(expectedDebugFilePath) > 0 { - writeVerifyDebugOutput(t, expectedDebugFilePath, start, end, expected) + if !writeVerifyDebugOutput(t, expectedDebugFilePath, start, end, expected) { + return false + } } if len(actualDebugFilePath) > 0 { - writeVerifyDebugOutput(t, actualDebugFilePath, start, end, actual) + if !writeVerifyDebugOutput(t, actualDebugFilePath, start, end, actual) { + return false + } } - require.Equal(t, expected, actual) + for i, series := range actual { + if !assert.Equal(t, expected[i], series) { + return false + } + } + if !assert.Equal(t, expected, actual) { + return false + } // Now check the metadata of all the series match ctx := context.NewContext() @@ -133,7 +147,7 @@ func verifySeriesMapForRange( results, nextPageToken, err := ts.db.FetchBlocksMetadataV2(ctx, namespace, shard, start, end, 4096, pageToken, opts) - require.NoError(t, err) + assert.NoError(t, err) // Use the next one for the next iteration pageToken = nextPageToken @@ -141,7 +155,9 @@ func verifySeriesMapForRange( for _, actual := range results.Results() { id := actual.ID.String() expected, ok := expectedMetadata[id] - require.True(t, ok, fmt.Sprintf("unexpected ID: %s", id)) + if !assert.True(t, ok, fmt.Sprintf("unexpected ID: 
%s", id)) { + return false + } expectedTagsIter := ident.NewTagsIterator(expected.Tags) actualTagsIter := actual.Tags.Duplicate() @@ -176,15 +192,22 @@ func verifySeriesMapForRange( ).Error("series does not match expected tags") } - require.True(t, tagMatcher.Matches(actualTagsIter)) + if !assert.True(t, tagMatcher.Matches(actualTagsIter)) { + return false + } } } } + + return true } -func writeVerifyDebugOutput(t *testing.T, filePath string, start, end time.Time, series generate.SeriesBlock) { +func writeVerifyDebugOutput( + t *testing.T, filePath string, start, end time.Time, series generate.SeriesBlock) bool { w, err := os.OpenFile(filePath, os.O_APPEND|os.O_WRONLY, os.ModeAppend) - require.NoError(t, err) + if !assert.NoError(t, err) { + return false + } list := make(readableSeriesList, 0, len(series)) for i := range series { @@ -211,11 +234,15 @@ func writeVerifyDebugOutput(t *testing.T, filePath string, start, end time.Time, End: end, Series: list, }, "", " ") - require.NoError(t, err) + if !assert.NoError(t, err) { + return false + } _, err = w.Write(data) - require.NoError(t, err) - require.NoError(t, w.Close()) + if !assert.NoError(t, err) { + return false + } + return assert.NoError(t, w.Close()) } func verifySeriesMaps( @@ -223,33 +250,50 @@ func verifySeriesMaps( ts *testSetup, namespace ident.ID, seriesMaps map[xtime.UnixNano]generate.SeriesBlock, -) { +) bool { debugFilePathPrefix := ts.opts.VerifySeriesDebugFilePathPrefix() - expectedDebugFilePath := createFileIfPrefixSet(t, debugFilePathPrefix, fmt.Sprintf("%s-expected.log", namespace.String())) - actualDebugFilePath := createFileIfPrefixSet(t, debugFilePathPrefix, fmt.Sprintf("%s-actual.log", namespace.String())) + expectedDebugFilePath, ok := createFileIfPrefixSet(t, debugFilePathPrefix, fmt.Sprintf("%s-expected.log", namespace.String())) + if !ok { + return false + } + actualDebugFilePath, ok := createFileIfPrefixSet(t, debugFilePathPrefix, fmt.Sprintf("%s-actual.log", namespace.String())) + if !ok { + return false + } nsMetadata, ok := ts.db.Namespace(namespace) - require.True(t, ok) + if !assert.True(t, ok) { + return false + } nsOpts := nsMetadata.Options() for timestamp, sm := range seriesMaps { start := timestamp.ToTime() end := start.Add(nsOpts.RetentionOptions().BlockSize()) - verifySeriesMapForRange( + matches := verifySeriesMapForRange( t, ts, start, end, namespace, sm, expectedDebugFilePath, actualDebugFilePath) + if !matches { + return false + } } + + return true } -func createFileIfPrefixSet(t *testing.T, prefix, suffix string) string { +func createFileIfPrefixSet(t *testing.T, prefix, suffix string) (string, bool) { if len(prefix) == 0 { - return "" + return "", true } filePath := prefix + "_" + suffix w, err := os.Create(filePath) - require.NoError(t, err) - require.NoError(t, w.Close()) - return filePath + if !assert.NoError(t, err) { + return "", false + } + if !assert.NoError(t, w.Close()) { + return "", false + } + return filePath, true } func compareSeriesList( diff --git a/src/dbnode/integration/options.go b/src/dbnode/integration/options.go index 40ad1113bb..308a9aba92 100644 --- a/src/dbnode/integration/options.go +++ b/src/dbnode/integration/options.go @@ -260,6 +260,12 @@ type testOptions interface { // WriteNewSeriesAsync returns whether we insert/index asynchronously. WriteNewSeriesAsync() bool + + // SetFilePathPrefix sets the file path prefix. + SetFilePathPrefix(value string) testOptions + + // FilePathPrefix returns the file path prefix. 
+ FilePathPrefix() string } type options struct { @@ -274,6 +280,7 @@ type options struct { httpNodeAddr string tchannelNodeAddr string httpDebugAddr string + filePathPrefix string serverStateChangeTimeout time.Duration clusterConnectionTimeout time.Duration readRequestTimeout time.Duration @@ -597,3 +604,13 @@ func (o *options) SetWriteNewSeriesAsync(value bool) testOptions { func (o *options) WriteNewSeriesAsync() bool { return o.writeNewSeriesAsync } + +func (o *options) SetFilePathPrefix(value string) testOptions { + opts := *o + opts.filePathPrefix = value + return &opts +} + +func (o *options) FilePathPrefix() string { + return o.filePathPrefix +} diff --git a/src/dbnode/integration/setup.go b/src/dbnode/integration/setup.go index f78e8cccc6..02906c3707 100644 --- a/src/dbnode/integration/setup.go +++ b/src/dbnode/integration/setup.go @@ -233,9 +233,13 @@ func newTestSetup(t *testing.T, opts testOptions, fsOpts fs.Options) (*testSetup // Set up file path prefix idx := atomic.AddUint64(&created, 1) - 1 - filePathPrefix, err := ioutil.TempDir("", fmt.Sprintf("integration-test-%d", idx)) - if err != nil { - return nil, err + filePathPrefix := opts.FilePathPrefix() + if filePathPrefix == "" { + var err error + filePathPrefix, err = ioutil.TempDir("", fmt.Sprintf("integration-test-%d", idx)) + if err != nil { + return nil, err + } } if fsOpts == nil { diff --git a/src/dbnode/persist/fs/commitlog/info.go b/src/dbnode/persist/fs/commitlog/files.go similarity index 67% rename from src/dbnode/persist/fs/commitlog/info.go rename to src/dbnode/persist/fs/commitlog/files.go index 9973e13a72..02d15e8a67 100644 --- a/src/dbnode/persist/fs/commitlog/info.go +++ b/src/dbnode/persist/fs/commitlog/files.go @@ -23,11 +23,21 @@ package commitlog import ( "encoding/binary" "os" + "sort" "time" + "github.com/m3db/m3db/src/dbnode/persist/fs" "github.com/m3db/m3db/src/dbnode/persist/fs/msgpack" ) +// File represents a commit log file and its associated metadata. +type File struct { + FilePath string + Start time.Time + Duration time.Duration + Index int64 +} + // ReadLogInfo reads the commit log info out of a commitlog file func ReadLogInfo(filePath string, opts Options) (time.Time, time.Duration, int64, error) { var fd *os.File @@ -67,3 +77,35 @@ func ReadLogInfo(filePath string, opts Options) (time.Time, time.Duration, int64 return time.Unix(0, logInfo.Start), time.Duration(logInfo.Duration), logInfo.Index, decoderErr } + +// Files returns a slice of all available commit log files on disk along with +// their associated metadata. 
+func Files(opts Options) ([]File, error) { + commitLogsDir := fs.CommitLogsDirPath( + opts.FilesystemOptions().FilePathPrefix()) + filePaths, err := fs.SortedCommitLogFiles(commitLogsDir) + if err != nil { + return nil, err + } + + commitLogFiles := make([]File, 0, len(filePaths)) + for _, filePath := range filePaths { + start, duration, index, err := ReadLogInfo(filePath, opts) + if err != nil { + return nil, err + } + + commitLogFiles = append(commitLogFiles, File{ + FilePath: filePath, + Start: start, + Duration: duration, + Index: index, + }) + } + + sort.Slice(commitLogFiles, func(i, j int) bool { + return commitLogFiles[i].Start.Before(commitLogFiles[j].Start) + }) + + return commitLogFiles, nil +} diff --git a/src/dbnode/persist/fs/commitlog/files_test.go b/src/dbnode/persist/fs/commitlog/files_test.go new file mode 100644 index 0000000000..8a37738b25 --- /dev/null +++ b/src/dbnode/persist/fs/commitlog/files_test.go @@ -0,0 +1,127 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package commitlog + +import ( + "io/ioutil" + "os" + "strings" + "sync" + "testing" + "time" + + "github.com/m3db/m3db/src/dbnode/persist/fs" + "github.com/m3db/m3db/src/dbnode/ts" + "github.com/m3db/m3x/context" + "github.com/m3db/m3x/ident" + xtime "github.com/m3db/m3x/time" + + "github.com/stretchr/testify/require" +) + +func TestFiles(t *testing.T) { + dir, err := ioutil.TempDir("", "commitlogs") + require.NoError(t, err) + defer os.RemoveAll(dir) + + createTestCommitLogFiles(t, dir, 10*time.Minute, 5) + + opts := NewOptions() + opts = opts.SetFilesystemOptions( + opts.FilesystemOptions(). + SetFilePathPrefix(dir), + ) + files, err := Files(opts) + require.NoError(t, err) + require.Equal(t, 5, len(files)) + + // Make sure its sorted + var lastFileStart time.Time + for _, file := range files { + require.Equal(t, 10*time.Minute, file.Duration) + require.Equal(t, int64(0), file.Index) + require.True(t, strings.Contains(file.FilePath, dir)) + if lastFileStart.IsZero() { + lastFileStart = file.Start + continue + } + + require.True(t, file.Start.After(lastFileStart)) + } +} + +// createTestCommitLogFiles creates the specified number of commit log files +// on disk with the appropriate block size. Commit log files will be valid +// and contain readable metadata. 
+func createTestCommitLogFiles( + t *testing.T, filePathPrefix string, blockSize time.Duration, numBlocks int) { + require.True(t, numBlocks >= 2) + + var ( + nowLock = sync.RWMutex{} + now = time.Now().Truncate(blockSize) + nowFn = func() time.Time { + nowLock.RLock() + n := now + nowLock.RUnlock() + return n + } + setNowFn = func(t time.Time) { + nowLock.Lock() + now = t + nowLock.Unlock() + } + opts = NewOptions(). + SetBlockSize(blockSize). + SetClockOptions(NewOptions().ClockOptions().SetNowFn(nowFn)). + SetFilesystemOptions(fs.NewOptions().SetFilePathPrefix(filePathPrefix)) + commitLogsDir = fs.CommitLogsDirPath(filePathPrefix) + ) + + commitLog, err := NewCommitLog(opts) + require.NoError(t, err) + require.NoError(t, commitLog.Open()) + series := Series{ + UniqueIndex: 0, + Namespace: ident.StringID("some-namespace"), + ID: ident.StringID("some-id"), + } + // Commit log writer is asynchronous and performs batching so getting the exact number + // of files that we want is tricky. The implementation below loops infinitely, writing + // a single datapoint and increasing the time after each iteration until numBlocks -1 + // files are on disk. After that, it terminates, and the final batch flush from calling + // commitlog.Close() will generate the last file. + for { + files, err := fs.SortedCommitLogFiles(commitLogsDir) + require.NoError(t, err) + if len(files) == numBlocks-1 { + break + } + err = commitLog.Write(context.NewContext(), series, ts.Datapoint{}, xtime.Second, nil) + require.NoError(t, err) + setNowFn(nowFn().Add(blockSize)) + } + + require.NoError(t, commitLog.Close()) + files, err := fs.SortedCommitLogFiles(commitLogsDir) + require.NoError(t, err) + require.Equal(t, numBlocks, len(files)) +} diff --git a/src/dbnode/persist/fs/commitlog/iterator.go b/src/dbnode/persist/fs/commitlog/iterator.go index 0807f5ec99..d6ac370d2f 100644 --- a/src/dbnode/persist/fs/commitlog/iterator.go +++ b/src/dbnode/persist/fs/commitlog/iterator.go @@ -25,9 +25,7 @@ import ( "io" "time" - "github.com/m3db/m3db/src/dbnode/persist/fs" "github.com/m3db/m3db/src/dbnode/ts" - xerrors "github.com/m3db/m3x/errors" xlog "github.com/m3db/m3x/log" xtime "github.com/m3db/m3x/time" @@ -49,7 +47,7 @@ type iterator struct { scope tally.Scope metrics iteratorMetrics log xlog.Logger - files []string + files []File reader commitLogReader read iteratorRead err error @@ -77,15 +75,11 @@ func NewIterator(iterOpts IteratorOpts) (Iterator, error) { iops := opts.InstrumentOptions() iops = iops.SetMetricsScope(iops.MetricsScope().SubScope("iterator")) - path := fs.CommitLogsDirPath(opts.FilesystemOptions().FilePathPrefix()) - files, err := fs.SortedCommitLogFiles(path) - if err != nil { - return nil, err - } - filteredFiles, err := filterFiles(opts, files, iterOpts.FileFilterPredicate) + files, err := Files(opts) if err != nil { return nil, err } + filteredFiles := filterFiles(opts, files, iterOpts.FileFilterPredicate) scope := iops.MetricsScope() return &iterator{ @@ -173,14 +167,9 @@ func (i *iterator) nextReader() bool { file := i.files[0] i.files = i.files[1:] - t, idx, err := fs.TimeAndIndexFromCommitlogFilename(file) - if err != nil { - i.err = err - return false - } - + t, idx := file.Start, file.Index reader := newCommitLogReader(i.opts, i.seriesPred) - start, duration, index, err := reader.Open(file) + start, duration, index, err := reader.Open(file.FilePath) if err != nil { i.err = err return false @@ -202,20 +191,14 @@ func (i *iterator) nextReader() bool { return true } -func filterFiles(opts 
Options, files []string, predicate FileFilterPredicate) ([]string, error) { - filteredFiles := make([]string, 0, len(files)) - var multiErr xerrors.MultiError +func filterFiles(opts Options, files []File, predicate FileFilterPredicate) []File { + filteredFiles := make([]File, 0, len(files)) for _, file := range files { - start, duration, _, err := ReadLogInfo(file, opts) - if err != nil { - multiErr = multiErr.Add(err) - continue - } - if predicate(file, start, duration) { + if predicate(file.FilePath, file.Start, file.Duration) { filteredFiles = append(filteredFiles, file) } } - return filteredFiles, multiErr.FinalError() + return filteredFiles } func (i *iterator) closeAndResetReader() error { diff --git a/src/dbnode/persist/fs/commitlog/reader.go b/src/dbnode/persist/fs/commitlog/reader.go index c602a069a2..8c2b59f3d0 100644 --- a/src/dbnode/persist/fs/commitlog/reader.go +++ b/src/dbnode/persist/fs/commitlog/reader.go @@ -66,7 +66,7 @@ type seriesMetadata struct { type commitLogReader interface { // Open opens the commit log for reading - Open(filePath string) (time.Time, time.Duration, int, error) + Open(filePath string) (time.Time, time.Duration, int64, error) // Read returns the next id and data pair or error, will return io.EOF at end of volume Read() (Series, ts.Datapoint, xtime.Unit, ts.Annotation, error) @@ -156,7 +156,7 @@ func newCommitLogReader(opts Options, seriesPredicate SeriesFilterPredicate) com return reader } -func (r *reader) Open(filePath string) (time.Time, time.Duration, int, error) { +func (r *reader) Open(filePath string) (time.Time, time.Duration, int64, error) { // Commitlog reader does not currently support being reused if r.hasBeenOpened { return timeZero, 0, 0, errCommitLogReaderIsNotReusable @@ -176,7 +176,7 @@ func (r *reader) Open(filePath string) (time.Time, time.Duration, int, error) { } start := time.Unix(0, info.Start) duration := time.Duration(info.Duration) - index := int(info.Index) + index := info.Index return start, duration, index, nil } diff --git a/src/dbnode/persist/fs/files.go b/src/dbnode/persist/fs/files.go index e1ecb9daf4..49c22a5084 100644 --- a/src/dbnode/persist/fs/files.go +++ b/src/dbnode/persist/fs/files.go @@ -776,21 +776,6 @@ func SortedCommitLogFiles(commitLogsDir string) ([]string, error) { return sortedCommitlogFiles(commitLogsDir, commitLogFilePattern) } -// CommitLogFilesForTime returns all the commit log files for a given time. -func CommitLogFilesForTime(commitLogsDir string, t time.Time) ([]string, error) { - commitLogFileForTimePattern := fmt.Sprintf(commitLogFileForTimeTemplate, t.UnixNano()) - return sortedCommitlogFiles(commitLogsDir, commitLogFileForTimePattern) -} - -// SortedCommitLogFilesBefore returns all the commit log files whose timestamps are earlier than a given time. 
-func SortedCommitLogFilesBefore(commitLogsDir string, t time.Time) ([]string, error) { - commitLogs, err := SortedCommitLogFiles(commitLogsDir) - if err != nil { - return nil, err - } - return FilesBefore(commitLogs, t) -} - type toSortableFn func(files []string) sort.Interface func findFiles(fileDir string, pattern string, fn toSortableFn) ([]string, error) { diff --git a/src/dbnode/persist/fs/files_test.go b/src/dbnode/persist/fs/files_test.go index badb787cd6..19d9d53107 100644 --- a/src/dbnode/persist/fs/files_test.go +++ b/src/dbnode/persist/fs/files_test.go @@ -595,42 +595,6 @@ func TestSnapshotFileSetExistsAt(t *testing.T) { require.True(t, exists) } -func TestSortedCommitLogFilesBefore(t *testing.T) { - iter := 20 - perSlot := 3 - dir := createCommitLogFiles(t, iter, perSlot) - defer os.RemoveAll(dir) - - cutoffIter := 8 - cutoff := time.Unix(0, int64(cutoffIter)) - commitLogsDir := CommitLogsDirPath(dir) - files, err := SortedCommitLogFilesBefore(commitLogsDir, cutoff) - require.NoError(t, err) - require.Equal(t, cutoffIter*perSlot, len(files)) - for i := 0; i < cutoffIter; i++ { - for j := 0; j < perSlot; j++ { - validateCommitLogFiles(t, i, j, perSlot, i, dir, files) - } - } -} - -func TestCommitLogFilesForTime(t *testing.T) { - iter := 20 - perSlot := 3 - dir := createCommitLogFiles(t, iter, perSlot) - defer os.RemoveAll(dir) - - cutoffIter := 8 - cutoff := time.Unix(0, int64(cutoffIter)) - commitLogsDir := CommitLogsDirPath(dir) - files, err := CommitLogFilesForTime(commitLogsDir, cutoff) - require.NoError(t, err) - - for j := 0; j < perSlot; j++ { - validateCommitLogFiles(t, cutoffIter, j, perSlot, 0, dir, files) - } -} - func TestSortedCommitLogFiles(t *testing.T) { iter := 20 perSlot := 3 diff --git a/src/dbnode/persist/fs/fs.go b/src/dbnode/persist/fs/fs.go index 412826b36b..7ee1d2be3d 100644 --- a/src/dbnode/persist/fs/fs.go +++ b/src/dbnode/persist/fs/fs.go @@ -37,9 +37,8 @@ const ( anyNumbersPattern = "[0-9]*" anyLowerCaseCharsNumbersPattern = "[a-z0-9]*" - separator = "-" - infoFilePattern = filesetFilePrefix + separator + anyNumbersPattern + separator + infoFileSuffix + fileSuffix - filesetFilePattern = filesetFilePrefix + separator + anyNumbersPattern + separator + anyLowerCaseCharsPattern + fileSuffix - commitLogFilePattern = commitLogFilePrefix + separator + anyNumbersPattern + separator + anyNumbersPattern + fileSuffix - commitLogFileForTimeTemplate = commitLogFilePrefix + separator + "%d" + separator + anyNumbersPattern + fileSuffix + separator = "-" + infoFilePattern = filesetFilePrefix + separator + anyNumbersPattern + separator + infoFileSuffix + fileSuffix + filesetFilePattern = filesetFilePrefix + separator + anyNumbersPattern + separator + anyLowerCaseCharsPattern + fileSuffix + commitLogFilePattern = commitLogFilePrefix + separator + anyNumbersPattern + separator + anyNumbersPattern + fileSuffix ) diff --git a/src/dbnode/persist/fs/read_write_test.go b/src/dbnode/persist/fs/read_write_test.go index edc19dcca3..a8bc491c9c 100644 --- a/src/dbnode/persist/fs/read_write_test.go +++ b/src/dbnode/persist/fs/read_write_test.go @@ -358,7 +358,8 @@ func TestReusingReaderWriter(t *testing.T) { } w := newTestWriter(t, filePathPrefix) for i := range allEntries { - writeTestData(t, w, 0, testWriterStart.Add(time.Duration(i)*time.Hour), allEntries[i], persist.FileSetFlushType) + writeTestData( + t, w, 0, testWriterStart.Add(time.Duration(i)*time.Hour), allEntries[i], persist.FileSetFlushType) } r := newTestReader(t, filePathPrefix) diff --git 
a/src/dbnode/storage/cleanup.go b/src/dbnode/storage/cleanup.go index 128c2309f7..21be96d41a 100644 --- a/src/dbnode/storage/cleanup.go +++ b/src/dbnode/storage/cleanup.go @@ -27,6 +27,7 @@ import ( "github.com/m3db/m3db/src/dbnode/clock" "github.com/m3db/m3db/src/dbnode/persist/fs" + "github.com/m3db/m3db/src/dbnode/persist/fs/commitlog" "github.com/m3db/m3db/src/dbnode/retention" xerrors "github.com/m3db/m3x/errors" "github.com/m3db/m3x/ident" @@ -34,9 +35,7 @@ import ( "github.com/uber-go/tally" ) -type commitLogFilesBeforeFn func(commitLogsDir string, t time.Time) ([]string, error) - -type commitLogFilesForTimeFn func(commitLogsDir string, t time.Time) ([]string, error) +type commitLogFilesFn func(commitlog.Options) ([]commitlog.File, error) type deleteFilesFn func(files []string) error @@ -50,8 +49,7 @@ type cleanupManager struct { nowFn clock.NowFn filePathPrefix string commitLogsDir string - commitLogFilesBeforeFn commitLogFilesBeforeFn - commitLogFilesForTimeFn commitLogFilesForTimeFn + commitLogFilesFn commitLogFilesFn deleteFilesFn deleteFilesFn deleteInactiveDirectoriesFn deleteInactiveDirectoriesFn cleanupInProgress bool @@ -69,8 +67,7 @@ func newCleanupManager(database database, scope tally.Scope) databaseCleanupMana nowFn: opts.ClockOptions().NowFn(), filePathPrefix: filePathPrefix, commitLogsDir: commitLogsDir, - commitLogFilesBeforeFn: fs.SortedCommitLogFilesBefore, - commitLogFilesForTimeFn: fs.CommitLogFilesForTime, + commitLogFilesFn: commitlog.Files, deleteFilesFn: fs.DeleteFiles, deleteInactiveDirectoriesFn: fs.DeleteInactiveDirectories, status: scope.Gauge("cleanup"), @@ -119,17 +116,17 @@ func (m *cleanupManager) Cleanup(t time.Time) error { "encountered errors when deleting inactive namespace files for %v: %v", t, err)) } - commitLogStart, commitLogTimes, err := m.commitLogTimes(t) + filesToCleanup, err := m.commitLogTimes(t) if err != nil { multiErr = multiErr.Add(fmt.Errorf( "encountered errors when cleaning up commit logs: %v", err)) return multiErr.FinalError() } - if err := m.cleanupCommitLogs(commitLogStart, commitLogTimes); err != nil { + if err := m.cleanupCommitLogs(filesToCleanup); err != nil { multiErr = multiErr.Add(fmt.Errorf( - "encountered errors when cleaning up commit logs for commitLogStart %v commitLogTimes %v: %v", - commitLogStart, commitLogTimes, err)) + "encountered errors when cleaning up commit logs for commitLogFiles %v: %v", + filesToCleanup, err)) } return multiErr.FinalError() @@ -286,10 +283,9 @@ func (m *cleanupManager) commitLogTimeRange(t time.Time) (time.Time, time.Time) // commitLogTimes returns the earliest time before which the commit logs are expired, // as well as a list of times we need to clean up commit log files for. -func (m *cleanupManager) commitLogTimes(t time.Time) (time.Time, []time.Time, error) { +func (m *cleanupManager) commitLogTimes(t time.Time) ([]commitlog.File, error) { var ( - blockSize = m.opts.CommitLogOptions().BlockSize() - earliest, latest = m.commitLogTimeRange(t) + earliest, _ = m.commitLogTimeRange(t) ) // NB(prateek): this logic of polling the namespaces across the commit log's entire // retention history could get expensive if commit logs are retained for long periods. @@ -301,26 +297,75 @@ func (m *cleanupManager) commitLogTimes(t time.Time) (time.Time, []time.Time, er // are only retained for a period of 1-2 days (at most), after we which we'd live we with the // data loss. 
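filterCommitLogFiles, called by the new commitLogTimes in the hunk below, is not shown in this diff; the following is only a minimal sketch of the shape implied by its call site (the package placement and exact body are assumptions, not the actual implementation):

package storage

import (
	"time"

	"github.com/m3db/m3db/src/dbnode/persist/fs/commitlog"
)

// filterCommitLogFiles returns the subset of commit log files for which the
// predicate returns true (i.e. the files considered safe to clean up).
func filterCommitLogFiles(
	files []commitlog.File,
	predicate func(start time.Time, duration time.Duration) bool,
) []commitlog.File {
	filtered := make([]commitlog.File, 0, len(files))
	for _, f := range files {
		// The predicate decides per file, based on its start time and duration.
		if predicate(f.Start, f.Duration) {
			filtered = append(filtered, f)
		}
	}
	return filtered
}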
- candidateTimes := timesInRange(earliest, latest, blockSize) + files, err := m.commitLogFilesFn(m.opts.CommitLogOptions()) + if err != nil { + return nil, err + } namespaces, err := m.database.GetOwnedNamespaces() if err != nil { - return time.Time{}, nil, err + return nil, err } - cleanupTimes := filterTimes(candidateTimes, func(t time.Time) bool { + + var outerErr error + filesToCleanup := filterCommitLogFiles(files, func(start time.Time, duration time.Duration) bool { + if outerErr != nil { + return false + } + + if start.Before(earliest) { + // Safe to clean up expired files. + // TODO(rartoul): Now that we have commit log compaction via snapshot files we would like + // to remove the concept of commit log retention so that users cannot accidentally + // configure M3DB in such a way that it loses data (commit log retention < block size). + return true + } + for _, ns := range namespaces { - ropts := ns.Options().RetentionOptions() - start, end := commitLogNamespaceBlockTimes(t, blockSize, ropts) - if ns.NeedsFlush(start, end) { + var ( + ropts = ns.Options().RetentionOptions() + nsBlocksStart, nsBlocksEnd = commitLogNamespaceBlockTimes(start, duration, ropts) + needsFlush = ns.NeedsFlush(nsBlocksStart, nsBlocksEnd) + ) + + if !needsFlush { + // Data has been flushed to disk so the commit log file is + // safe to clean up. + continue + } + + // Add commit log blockSize to the startTime because that is the latest + // system time that the commit log file could contain data for. Note that + // this is different than the latest datapoint timestamp that the commit + // log file could contain data for (because of bufferPast/bufferFuture), + // but the commit log files and snapshot files both deal with system time. + isCapturedBySnapshot, err := ns.IsCapturedBySnapshot( + nsBlocksStart, nsBlocksEnd, start.Add(duration)) + if err != nil { + outerErr = err return false } + + if !isCapturedBySnapshot { + // The data has not been flushed and has also not been captured by + // a snapshot, so it is not safe to clean up the commit log file. + return false + } + + // All the data in the commit log file is captured by the snapshot files + // so its safe to clean up. } + return true }) - return earliest, cleanupTimes, nil + if outerErr != nil { + return nil, outerErr + } + + return filesToCleanup, nil } -// commitLogNamespaceBlockTimes returns the range of namespace block starts which for which the +// commitLogNamespaceBlockTimes returns the range of namespace block starts for which the // given commit log block may contain data for. // // consider the situation where we have a single namespace, and a commit log with the following @@ -350,24 +395,10 @@ func commitLogNamespaceBlockTimes( return earliest, latest } -func (m *cleanupManager) cleanupCommitLogs(earliestToRetain time.Time, cleanupTimes []time.Time) error { - multiErr := xerrors.NewMultiError() - toCleanup, err := m.commitLogFilesBeforeFn(m.commitLogsDir, earliestToRetain) - if err != nil { - multiErr = multiErr.Add(err) - } - - for _, t := range cleanupTimes { - files, err := m.commitLogFilesForTimeFn(m.commitLogsDir, t) - if err != nil { - multiErr = multiErr.Add(err) - } - toCleanup = append(toCleanup, files...) 
+func (m *cleanupManager) cleanupCommitLogs(filesToCleanup []commitlog.File) error { + filesToDelete := make([]string, 0, len(filesToCleanup)) + for _, f := range filesToCleanup { + filesToDelete = append(filesToDelete, f.FilePath) } - - if err := m.deleteFilesFn(toCleanup); err != nil { - multiErr = multiErr.Add(err) - } - - return multiErr.FinalError() + return m.deleteFilesFn(filesToDelete) } diff --git a/src/dbnode/storage/cleanup_prop_test.go b/src/dbnode/storage/cleanup_prop_test.go index 2a6c663706..e7a5e83850 100644 --- a/src/dbnode/storage/cleanup_prop_test.go +++ b/src/dbnode/storage/cleanup_prop_test.go @@ -35,6 +35,7 @@ import ( "github.com/leanovate/gopter" "github.com/leanovate/gopter/gen" "github.com/leanovate/gopter/prop" + "github.com/stretchr/testify/require" "github.com/uber-go/tally" ) @@ -73,17 +74,21 @@ func TestPropertyCommitLogNotCleanedForUnflushedData(t *testing.T) { timeWindow := time.Hour * 24 * 15 properties.Property("Commit log is retained if one namespace needs to flush", prop.ForAll( - func(t time.Time, cRopts retention.Options, ns *generatedNamespace) (bool, error) { + func(cleanupTime time.Time, cRopts retention.Options, ns *generatedNamespace) (bool, error) { cm := newPropTestCleanupMgr(ctrl, cRopts, ns) - _, cleanupTimes, err := cm.commitLogTimes(t) + filesToCleanup, err := cm.commitLogTimes(cleanupTime) if err != nil { return false, err } - for _, ct := range cleanupTimes { - s, e := commitLogNamespaceBlockTimes(ct, cRopts.BlockSize(), ns.ropts) - if ns.NeedsFlush(s, e) { + for _, f := range filesToCleanup { + s, e := commitLogNamespaceBlockTimes(f.Start, f.Duration, ns.ropts) + earliest, _ := cm.commitLogTimeRange(cleanupTime) + needsFlush := ns.NeedsFlush(s, e) + isCapturedBySnapshot, err := ns.IsCapturedBySnapshot(s, e, f.Start.Add(f.Duration)) + require.NoError(t, err) + if needsFlush && !isCapturedBySnapshot && !f.Start.Before(earliest) { return false, fmt.Errorf("trying to cleanup commit log at %v, but ns needsFlush; (range: %v, %v)", - ct.String(), s.String(), e.String()) + f.Start.String(), s.String(), e.String()) } } return true, nil @@ -105,19 +110,23 @@ func TestPropertyCommitLogNotCleanedForUnflushedDataMultipleNs(t *testing.T) { timeWindow := time.Hour * 24 * 15 properties.Property("Commit log is retained if any namespace needs to flush", prop.ForAll( - func(t time.Time, cRopts retention.Options, nses []*generatedNamespace) (bool, error) { + func(cleanupTime time.Time, cRopts retention.Options, nses []*generatedNamespace) (bool, error) { dbNses := generatedNamespaces(nses).asDatabaseNamespace() cm := newPropTestCleanupMgr(ctrl, cRopts, dbNses...) 
- _, cleanupTimes, err := cm.commitLogTimes(t) + filesToCleanup, err := cm.commitLogTimes(cleanupTime) if err != nil { return false, err } - for _, ct := range cleanupTimes { + for _, f := range filesToCleanup { for _, ns := range nses { - s, e := commitLogNamespaceBlockTimes(ct, cRopts.BlockSize(), ns.Options().RetentionOptions()) - if ns.NeedsFlush(s, e) { + s, e := commitLogNamespaceBlockTimes(f.Start, f.Duration, ns.ropts) + earliest, _ := cm.commitLogTimeRange(cleanupTime) + needsFlush := ns.NeedsFlush(s, e) + isCapturedBySnapshot, err := ns.IsCapturedBySnapshot(s, e, f.Start.Add(f.Duration)) + require.NoError(t, err) + if needsFlush && !isCapturedBySnapshot && !f.Start.Before(earliest) { return false, fmt.Errorf("trying to cleanup commit log at %v, but ns needsFlush; (range: %v, %v)", - ct.String(), s.String(), e.String()) + f.Start.String(), s.String(), e.String()) } } } @@ -145,12 +154,13 @@ func (n generatedNamespaces) asDatabaseNamespace() []databaseNamespace { type generatedNamespace struct { databaseNamespace - opts namespace.Options - ropts *generatedRetention - blockSize time.Duration - oldestBlock time.Time - newestBlock time.Time - needsFlushMarkers []bool + opts namespace.Options + ropts *generatedRetention + blockSize time.Duration + oldestBlock time.Time + newestBlock time.Time + needsFlushMarkers []bool + isCapturedBySnapshotMarkers []bool } func (ns *generatedNamespace) String() string { @@ -201,29 +211,50 @@ func (ns *generatedNamespace) NeedsFlush(start, end time.Time) bool { return false } +func (ns *generatedNamespace) IsCapturedBySnapshot(startInclusive, endInclusive, _ time.Time) (bool, error) { + if startInclusive.Before(ns.oldestBlock) && endInclusive.Before(ns.oldestBlock) { + return false, nil + } + if startInclusive.After(ns.newestBlock) && endInclusive.After(ns.newestBlock) { + return false, nil + } + + sIdx, eIdx := ns.blockIdx(startInclusive), ns.blockIdx(endInclusive) + for i := sIdx; i <= eIdx; i++ { + if ns.needsFlushMarkers[i] { + return true, nil + } + } + return false, nil +} + // generator for generatedNamespace func genNamespace(t time.Time) gopter.Gen { return func(genParams *gopter.GenParameters) *gopter.GenResult { - rng := genParams.Rng - ropts := newRandomRetention(rng) - oldest := retention.FlushTimeStart(ropts, t) - newest := retention.FlushTimeEnd(ropts, t) + var ( + rng = genParams.Rng + ropts = newRandomRetention(rng) + oldest = retention.FlushTimeStart(ropts, t) + newest = retention.FlushTimeEnd(ropts, t) + n = numIntervals(oldest, newest, ropts.BlockSize()) + flushStates = make([]bool, n) + snapshotStates = make([]bool, n) + nopts = namespace.NewOptions().SetRetentionOptions(ropts) + ) - n := numIntervals(oldest, newest, ropts.BlockSize()) - flushStates := make([]bool, n) for i := range flushStates { flushStates[i] = rng.Float32() > 0.6 // flip a coin to get a bool + snapshotStates[i] = rng.Float32() > 0.6 } - opts := namespace.NewOptions().SetRetentionOptions(ropts) - ns := &generatedNamespace{ - opts: opts, - ropts: ropts, - blockSize: ropts.BlockSize(), - oldestBlock: oldest, - newestBlock: newest, - needsFlushMarkers: flushStates, + opts: nopts, + ropts: ropts, + blockSize: ropts.BlockSize(), + oldestBlock: oldest, + newestBlock: newest, + needsFlushMarkers: flushStates, + isCapturedBySnapshotMarkers: snapshotStates, } genResult := gopter.NewGenResult(ns, gopter.NoShrinker) diff --git a/src/dbnode/storage/cleanup_test.go b/src/dbnode/storage/cleanup_test.go index 622c58d596..b9e771061d 100644 --- 
a/src/dbnode/storage/cleanup_test.go +++ b/src/dbnode/storage/cleanup_test.go @@ -23,11 +23,11 @@ package storage import ( "errors" "fmt" - "strconv" "strings" "testing" "time" + "github.com/m3db/m3db/src/dbnode/persist/fs/commitlog" "github.com/m3db/m3db/src/dbnode/retention" "github.com/m3db/m3db/src/dbnode/storage/namespace" "github.com/m3db/m3x/ident" @@ -38,6 +38,15 @@ import ( "github.com/uber-go/tally" ) +var ( + currentTime = timeFor(50) + time10 = timeFor(10) + time20 = timeFor(20) + time30 = timeFor(30) + time40 = timeFor(40) + commitLogBlockSize = 10 * time.Second +) + func testCleanupManager(ctrl *gomock.Controller) (*Mockdatabase, *cleanupManager) { db := newMockdatabase(ctrl) return db, newCleanupManager(db, tally.NoopScope).(*cleanupManager) @@ -70,14 +79,10 @@ func TestCleanupManagerCleanup(t *testing.T) { SetRetentionPeriod(rOpts.RetentionPeriod()). SetBlockSize(rOpts.BlockSize())) - mgr.commitLogFilesBeforeFn = func(_ string, t time.Time) ([]string, error) { - return []string{"foo", "bar"}, errors.New("error1") - } - mgr.commitLogFilesForTimeFn = func(_ string, t time.Time) ([]string, error) { - if t.Equal(timeFor(14400)) { - return []string{"baz"}, nil - } - return nil, errors.New("error" + strconv.Itoa(int(t.Unix()))) + mgr.commitLogFilesFn = func(_ commitlog.Options) ([]commitlog.File, error) { + return []commitlog.File{ + commitlog.File{FilePath: "foo", Start: timeFor(14400)}, + }, nil } var deletedFiles []string mgr.deleteFilesFn = func(files []string) error { @@ -85,8 +90,8 @@ func TestCleanupManagerCleanup(t *testing.T) { return nil } - require.Error(t, mgr.Cleanup(ts)) - require.Equal(t, []string{"foo", "bar", "baz"}, deletedFiles) + require.NoError(t, mgr.Cleanup(ts)) + require.Equal(t, []string{"foo"}, deletedFiles) } func TestCleanupManagerNamespaceCleanup(t *testing.T) { @@ -147,12 +152,6 @@ func TestCleanupManagerDoesntNeedCleanup(t *testing.T) { SetRetentionPeriod(rOpts.RetentionPeriod()). SetBlockSize(rOpts.BlockSize())) - mgr.commitLogFilesBeforeFn = func(_ string, t time.Time) ([]string, error) { - return []string{"foo", "bar"}, nil - } - mgr.commitLogFilesForTimeFn = func(_ string, t time.Time) ([]string, error) { - return nil, nil - } var deletedFiles []string mgr.deleteFilesFn = func(files []string) error { deletedFiles = append(deletedFiles, files...) @@ -475,26 +474,61 @@ func newCleanupManagerCommitLogTimesTest(t *testing.T, ctrl *gomock.Controller) return ns, mgr } +func newCleanupManagerCommitLogTimesTestMultiNS( + t *testing.T, + ctrl *gomock.Controller, +) (*MockdatabaseNamespace, *MockdatabaseNamespace, *cleanupManager) { + var ( + rOpts = retention.NewOptions(). + SetRetentionPeriod(30 * time.Second). + SetBufferPast(0 * time.Second). + SetBufferFuture(0 * time.Second). + SetBlockSize(10 * time.Second) + ) + no := namespace.NewMockOptions(ctrl) + no.EXPECT().RetentionOptions().Return(rOpts).AnyTimes() + + ns1 := NewMockdatabaseNamespace(ctrl) + ns1.EXPECT().Options().Return(no).AnyTimes() + + ns2 := NewMockdatabaseNamespace(ctrl) + ns2.EXPECT().Options().Return(no).AnyTimes() + + db := newMockdatabase(ctrl, ns1, ns2) + mgr := newCleanupManager(db, tally.NoopScope).(*cleanupManager) + + mgr.opts = mgr.opts.SetCommitLogOptions( + mgr.opts.CommitLogOptions(). + SetRetentionPeriod(rOpts.RetentionPeriod()). 
+ SetBlockSize(rOpts.BlockSize())) + return ns1, ns2, mgr +} + func TestCleanupManagerCommitLogTimesAllFlushed(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() ns, mgr := newCleanupManagerCommitLogTimesTest(t, ctrl) - currentTime := timeFor(50) + mgr.commitLogFilesFn = func(_ commitlog.Options) ([]commitlog.File, error) { + return []commitlog.File{ + commitlog.File{Start: time10, Duration: commitLogBlockSize}, + commitlog.File{Start: time20, Duration: commitLogBlockSize}, + commitlog.File{Start: time30, Duration: commitLogBlockSize}, + }, nil + } gomock.InOrder( - ns.EXPECT().NeedsFlush(timeFor(30), timeFor(40)).Return(false), - ns.EXPECT().NeedsFlush(timeFor(20), timeFor(30)).Return(false), - ns.EXPECT().NeedsFlush(timeFor(10), timeFor(20)).Return(false), + ns.EXPECT().NeedsFlush(time10, time20).Return(false), + ns.EXPECT().NeedsFlush(time20, time30).Return(false), + ns.EXPECT().NeedsFlush(time30, time40).Return(false), ) - earliest, times, err := mgr.commitLogTimes(currentTime) + filesToCleanup, err := mgr.commitLogTimes(currentTime) require.NoError(t, err) - require.Equal(t, timeFor(10), earliest) - require.Equal(t, 3, len(times)) - require.True(t, contains(times, timeFor(10))) - require.True(t, contains(times, timeFor(20))) - require.True(t, contains(times, timeFor(30))) + require.Equal(t, 3, len(filesToCleanup)) + require.True(t, contains(filesToCleanup, time10)) + require.True(t, contains(filesToCleanup, time20)) + require.True(t, contains(filesToCleanup, time30)) } func TestCleanupManagerCommitLogTimesMiddlePendingFlush(t *testing.T) { @@ -502,20 +536,27 @@ func TestCleanupManagerCommitLogTimesMiddlePendingFlush(t *testing.T) { defer ctrl.Finish() ns, mgr := newCleanupManagerCommitLogTimesTest(t, ctrl) - currentTime := timeFor(50) + mgr.commitLogFilesFn = func(_ commitlog.Options) ([]commitlog.File, error) { + return []commitlog.File{ + commitlog.File{Start: time10, Duration: commitLogBlockSize}, + commitlog.File{Start: time20, Duration: commitLogBlockSize}, + commitlog.File{Start: time30, Duration: commitLogBlockSize}, + }, nil + } + ns.EXPECT().IsCapturedBySnapshot( + gomock.Any(), gomock.Any(), gomock.Any()).Return(false, nil).AnyTimes() gomock.InOrder( - ns.EXPECT().NeedsFlush(timeFor(30), timeFor(40)).Return(false), - ns.EXPECT().NeedsFlush(timeFor(20), timeFor(30)).Return(true), - ns.EXPECT().NeedsFlush(timeFor(10), timeFor(20)).Return(false), + ns.EXPECT().NeedsFlush(time10, time20).Return(false), + ns.EXPECT().NeedsFlush(time20, time30).Return(true), + ns.EXPECT().NeedsFlush(time30, time40).Return(false), ) - earliest, times, err := mgr.commitLogTimes(currentTime) + filesToCleanup, err := mgr.commitLogTimes(currentTime) require.NoError(t, err) - require.Equal(t, timeFor(10), earliest) - require.Equal(t, 2, len(times)) - require.True(t, contains(times, timeFor(10))) - require.True(t, contains(times, timeFor(30))) + require.Equal(t, 2, len(filesToCleanup)) + require.True(t, contains(filesToCleanup, time10)) + require.True(t, contains(filesToCleanup, time30)) } func TestCleanupManagerCommitLogTimesStartPendingFlush(t *testing.T) { @@ -523,20 +564,28 @@ func TestCleanupManagerCommitLogTimesStartPendingFlush(t *testing.T) { defer ctrl.Finish() ns, mgr := newCleanupManagerCommitLogTimesTest(t, ctrl) - currentTime := timeFor(50) + mgr.commitLogFilesFn = func(_ commitlog.Options) ([]commitlog.File, error) { + return []commitlog.File{ + commitlog.File{Start: time10, Duration: commitLogBlockSize}, + commitlog.File{Start: time20, Duration: commitLogBlockSize}, + 
commitlog.File{Start: time30, Duration: commitLogBlockSize}, + }, nil + } + ns.EXPECT().IsCapturedBySnapshot( + gomock.Any(), gomock.Any(), gomock.Any(), + ).Return(false, nil).AnyTimes() gomock.InOrder( - ns.EXPECT().NeedsFlush(timeFor(30), timeFor(40)).Return(true), - ns.EXPECT().NeedsFlush(timeFor(20), timeFor(30)).Return(false), - ns.EXPECT().NeedsFlush(timeFor(10), timeFor(20)).Return(false), + ns.EXPECT().NeedsFlush(time10, time20).Return(false), + ns.EXPECT().NeedsFlush(time20, time30).Return(false), + ns.EXPECT().NeedsFlush(time30, time40).Return(true), ) - earliest, times, err := mgr.commitLogTimes(currentTime) + filesToCleanup, err := mgr.commitLogTimes(currentTime) require.NoError(t, err) - require.Equal(t, timeFor(10), earliest) - require.Equal(t, 2, len(times)) - require.True(t, contains(times, timeFor(20))) - require.True(t, contains(times, timeFor(10))) + require.Equal(t, 2, len(filesToCleanup)) + require.True(t, contains(filesToCleanup, time20)) + require.True(t, contains(filesToCleanup, time10)) } func TestCleanupManagerCommitLogTimesAllPendingFlush(t *testing.T) { @@ -544,29 +593,151 @@ func TestCleanupManagerCommitLogTimesAllPendingFlush(t *testing.T) { defer ctrl.Finish() ns, mgr := newCleanupManagerCommitLogTimesTest(t, ctrl) - currentTime := timeFor(50) + mgr.commitLogFilesFn = func(_ commitlog.Options) ([]commitlog.File, error) { + return []commitlog.File{ + commitlog.File{Start: time10, Duration: commitLogBlockSize}, + commitlog.File{Start: time20, Duration: commitLogBlockSize}, + commitlog.File{Start: time30, Duration: commitLogBlockSize}, + }, nil + } + ns.EXPECT().IsCapturedBySnapshot( + gomock.Any(), gomock.Any(), gomock.Any()).Return(false, nil).AnyTimes() gomock.InOrder( - ns.EXPECT().NeedsFlush(timeFor(30), timeFor(40)).Return(true), - ns.EXPECT().NeedsFlush(timeFor(20), timeFor(30)).Return(true), - ns.EXPECT().NeedsFlush(timeFor(10), timeFor(20)).Return(true), + ns.EXPECT().NeedsFlush(time10, time20).Return(true), + ns.EXPECT().NeedsFlush(time20, time30).Return(true), + ns.EXPECT().NeedsFlush(time30, time40).Return(true), ) - earliest, times, err := mgr.commitLogTimes(currentTime) + filesToCleanup, err := mgr.commitLogTimes(currentTime) require.NoError(t, err) - require.Equal(t, timeFor(10), earliest) - require.Equal(t, 0, len(times)) + require.Equal(t, 0, len(filesToCleanup)) } func timeFor(s int64) time.Time { return time.Unix(s, 0) } -func contains(arr []time.Time, t time.Time) bool { +func contains(arr []commitlog.File, t time.Time) bool { for _, at := range arr { - if at.Equal(t) { + if at.Start.Equal(t) { return true } } return false } + +func TestCleanupManagerCommitLogTimesAllPendingFlushButHaveSnapshot(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + var ( + ns, mgr = newCleanupManagerCommitLogTimesTest(t, ctrl) + currentTime = timeFor(50) + commitLogBlockSize = 10 * time.Second + ) + mgr.commitLogFilesFn = func(_ commitlog.Options) ([]commitlog.File, error) { + return []commitlog.File{ + commitlog.File{Start: time10, Duration: commitLogBlockSize}, + commitlog.File{Start: time20, Duration: commitLogBlockSize}, + commitlog.File{Start: time30, Duration: commitLogBlockSize}, + }, nil + } + + gomock.InOrder( + // Commit log with start time10 captured by snapshot, + // should be able to delete. + ns.EXPECT().NeedsFlush(time10, time20).Return(true), + ns.EXPECT().IsCapturedBySnapshot( + gomock.Any(), gomock.Any(), time20).Return(true, nil), + // Commit log with start time20 captured by snapshot, + // should be able to delete. 
+		ns.EXPECT().NeedsFlush(time20, time30).Return(true),
+		ns.EXPECT().IsCapturedBySnapshot(
+			gomock.Any(), gomock.Any(), time30).Return(true, nil),
+		// Commit log with start time30 not captured by snapshot,
+		// will need to retain.
+		ns.EXPECT().NeedsFlush(time30, time40).Return(true),
+		ns.EXPECT().IsCapturedBySnapshot(
+			gomock.Any(), gomock.Any(), time40).Return(false, nil),
+	)
+
+	filesToCleanup, err := mgr.commitLogTimes(currentTime)
+	require.NoError(t, err)
+
+	// Only commit log files with start times time10 and time20 were
+	// captured by snapshot files, so those are the only ones
+	// we can delete.
+	require.Equal(t, 2, len(filesToCleanup))
+	require.True(t, contains(filesToCleanup, time10))
+	require.True(t, contains(filesToCleanup, time20))
+}
+
+func TestCleanupManagerCommitLogTimesHandlesIsCapturedBySnapshotError(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	ns, mgr := newCleanupManagerCommitLogTimesTest(t, ctrl)
+	mgr.commitLogFilesFn = func(_ commitlog.Options) ([]commitlog.File, error) {
+		return []commitlog.File{
+			commitlog.File{Start: time30, Duration: commitLogBlockSize},
+		}, nil
+	}
+
+	gomock.InOrder(
+		ns.EXPECT().NeedsFlush(time30, time40).Return(true),
+		ns.EXPECT().IsCapturedBySnapshot(
+			gomock.Any(), gomock.Any(), time40).Return(false, errors.New("err")),
+	)
+
+	_, err := mgr.commitLogTimes(currentTime)
+	require.Error(t, err)
+}
+
+func TestCleanupManagerCommitLogTimesMultiNS(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	ns1, ns2, mgr := newCleanupManagerCommitLogTimesTestMultiNS(t, ctrl)
+	mgr.commitLogFilesFn = func(_ commitlog.Options) ([]commitlog.File, error) {
+		return []commitlog.File{
+			commitlog.File{Start: time10, Duration: commitLogBlockSize},
+			commitlog.File{Start: time20, Duration: commitLogBlockSize},
+			commitlog.File{Start: time30, Duration: commitLogBlockSize},
+		}, nil
+	}
+
+	// ns1 is flushed for time10->time20 and time20->time30.
+	// It is not flushed for time30->time40, but it does have
+	// a snapshot that covers that range.
+	//
+	// ns2 is flushed for time10->time20. It is not flushed for
+	// time20->time30 but it does have a snapshot that covers
+	// that range. It does not have a flush or snapshot for
+	// time30->time40.
+	gomock.InOrder(
+		ns1.EXPECT().NeedsFlush(time10, time20).Return(false),
+		ns2.EXPECT().NeedsFlush(time10, time20).Return(false),
+
+		ns1.EXPECT().NeedsFlush(time20, time30).Return(false),
+		ns2.EXPECT().NeedsFlush(time20, time30).Return(true),
+		ns2.EXPECT().IsCapturedBySnapshot(
+			gomock.Any(), gomock.Any(), time30).Return(true, nil),
+
+		ns1.EXPECT().NeedsFlush(time30, time40).Return(true),
+		ns1.EXPECT().IsCapturedBySnapshot(
+			gomock.Any(), gomock.Any(), time40).Return(true, nil),
+		ns2.EXPECT().NeedsFlush(time30, time40).Return(true),
+		ns2.EXPECT().IsCapturedBySnapshot(
+			gomock.Any(), gomock.Any(), time40).Return(false, nil),
	)
+
+	filesToCleanup, err := mgr.commitLogTimes(currentTime)
+	require.NoError(t, err)
+
+	// time10 and time20 were covered by either a flush or snapshot
+	// for both namespaces, but time30 was only covered for ns1 by
+	// a snapshot, and ns2 didn't have a snapshot or flush for that
+	// time, so the file needs to be retained.
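+	// As in the single-namespace tests above, we can also assert the exact
+	// count: given the mocked flush/snapshot state only two files should be
+	// eligible for cleanup.
+	require.Equal(t, 2, len(filesToCleanup))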
+	require.True(t, contains(filesToCleanup, time10))
+	require.True(t, contains(filesToCleanup, time20))
+}
diff --git a/src/dbnode/storage/namespace.go b/src/dbnode/storage/namespace.go
index 977939bbf0..c6c16d5ee1 100644
--- a/src/dbnode/storage/namespace.go
+++ b/src/dbnode/storage/namespace.go
@@ -30,6 +30,7 @@ import (
 	"github.com/m3db/m3db/src/dbnode/clock"
 	"github.com/m3db/m3db/src/dbnode/persist"
+	"github.com/m3db/m3db/src/dbnode/persist/fs"
 	"github.com/m3db/m3db/src/dbnode/persist/fs/commitlog"
 	"github.com/m3db/m3db/src/dbnode/sharding"
 	"github.com/m3db/m3db/src/dbnode/storage/block"
@@ -108,8 +109,10 @@ type dbNamespace struct {
 	nopts      namespace.Options
 	seriesOpts series.Options
 	nowFn      clock.NowFn
-	log            xlog.Logger
-	bootstrapState BootstrapState
+	// TODO: Just rely on shard?
+	snapshotFilesFn snapshotFilesFn
+	log             xlog.Logger
+	bootstrapState  BootstrapState
 
 	// Contains an entry to all shards for fast shard lookup, an
 	// entry will be nil when this shard does not belong to current database
@@ -320,6 +323,7 @@ func newDatabaseNamespace(
 		nopts:           nopts,
 		seriesOpts:      seriesOpts,
 		nowFn:           opts.ClockOptions().NowFn(),
+		snapshotFilesFn: fs.SnapshotFiles,
 		log:             logger,
 		increasingIndex: increasingIndex,
 		commitLogWriter: commitLogWriter,
@@ -933,22 +937,76 @@ func (n *dbNamespace) Snapshot(blockStart, snapshotTime time.Time, flush persist
 	return res
 }
 
-func (n *dbNamespace) NeedsFlush(alignedInclusiveStart time.Time, alignedInclusiveEnd time.Time) bool {
-	var (
-		blockSize   = n.nopts.RetentionOptions().BlockSize()
-		blockStarts = timesInRange(alignedInclusiveStart, alignedInclusiveEnd, blockSize)
-	)
-
+func (n *dbNamespace) NeedsFlush(
+	alignedInclusiveStart time.Time, alignedInclusiveEnd time.Time) bool {
 	// NB(r): Essentially if all are success, we don't need to flush, if any
 	// are failed with the minimum num failures less than max retries then
 	// we need to flush - otherwise if any in progress we can't flush and if
 	// any not started then we need to flush.
 	n.RLock()
 	defer n.RUnlock()
+	return n.needsFlushWithLock(alignedInclusiveStart, alignedInclusiveEnd)
+}
+
+func (n *dbNamespace) IsCapturedBySnapshot(
+	alignedInclusiveStart, alignedInclusiveEnd, capturedUpTo time.Time) (bool, error) {
+	var (
+		blockSize      = n.nopts.RetentionOptions().BlockSize()
+		blockStarts    = timesInRange(alignedInclusiveStart, alignedInclusiveEnd, blockSize)
+		filePathPrefix = n.opts.CommitLogOptions().FilesystemOptions().FilePathPrefix()
+	)
+
+	n.RLock()
+	defer n.RUnlock()
+
+	for _, shard := range n.shards {
+		if shard == nil {
+			continue
+		}
+
+		for _, blockStart := range blockStarts {
+			snapshotFiles, err := n.snapshotFilesFn(filePathPrefix, n.ID(), shard.ID())
+			if err != nil {
+				return false, err
+			}
+
+			if snapshotFiles == nil {
+				return false, nil
+			}
+
+			snapshot, ok := snapshotFiles.LatestVolumeForBlock(blockStart)
+			if !ok {
+				// If a single shard is missing a snapshot for the blockStart then
+				// the entire namespace is not covered by snapshots up to time t.
+				return false, nil
+			}
+
+			snapshotTime, err := snapshot.SnapshotTime()
+			if err != nil {
+				return false, err
+			}
+
+			if snapshotTime.Before(capturedUpTo) {
+				// If a single shard's most recent snapshot has a snapshot time before
+				// capturedUpTo then we can't be sure that the entire namespace is
+				// covered by snapshots.
+				return false, nil
+			}
+		}
+
+	}
+
+	return true, nil
+}
+
+// needsFlushWithLock returns whether any block in the given [start, end]
+// range still needs to be flushed. Callers must hold the namespace read lock.
+func (n *dbNamespace) needsFlushWithLock(alignedInclusiveStart time.Time, alignedInclusiveEnd time.Time) bool {
+	var (
+		blockSize   = n.nopts.RetentionOptions().BlockSize()
+		blockStarts = timesInRange(alignedInclusiveStart, alignedInclusiveEnd, blockSize)
+	)
 
 	// NB(prateek): we do not check if any other flush is in progress in this method,
 	// instead relying on the databaseFlushManager to ensure atomicity of flushes.
-	maxRetries := n.opts.MaxFlushRetries()
 
 	// Check for not started or failed that might need a flush
 	for _, shard := range n.shards {
diff --git a/src/dbnode/storage/namespace_test.go b/src/dbnode/storage/namespace_test.go
index efc0b0d6c5..5e20f7bdd2 100644
--- a/src/dbnode/storage/namespace_test.go
+++ b/src/dbnode/storage/namespace_test.go
@@ -28,6 +28,7 @@ import (
 	"time"
 
 	"github.com/m3db/m3cluster/shard"
+	"github.com/m3db/m3db/src/dbnode/persist/fs"
 	"github.com/m3db/m3db/src/dbnode/retention"
 	"github.com/m3db/m3db/src/dbnode/runtime"
 	"github.com/m3db/m3db/src/dbnode/sharding"
@@ -1178,6 +1179,180 @@ func TestNamespaceBootstrapState(t *testing.T) {
 	}, ns.BootstrapState())
 }
 
+func TestNamespaceIsCapturedBySnapshot(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	ns, closer := newTestNamespace(t)
+	defer closer()
+
+	var (
+		testTime   = time.Now()
+		blockSize  = ns.nopts.RetentionOptions().BlockSize()
+		blockStart = time.Now().Truncate(blockSize)
+		testCases  = []struct {
+			title                 string
+			alignedInclusiveStart time.Time
+			alignedInclusiveEnd   time.Time
+			shards                []uint32
+			snapshotFilesByShard  map[uint32][]fs.FileSetFile
+			expectedResult        bool
+			expectedErr           error
+		}{
+			{
+				title:                 "Returns true if no shards",
+				shards:                nil,
+				alignedInclusiveStart: blockStart,
+				alignedInclusiveEnd:   blockStart,
+				snapshotFilesByShard:  nil,
+				expectedResult:        true,
+				expectedErr:           nil,
+			},
+			{
+				title:                 "Handles nil files",
+				shards:                []uint32{0},
+				alignedInclusiveStart: blockStart,
+				alignedInclusiveEnd:   blockStart,
+				snapshotFilesByShard: map[uint32][]fs.FileSetFile{
+					0: nil,
+				},
+				expectedResult: false,
+				expectedErr:    nil,
+			},
+			{
+				title:                 "Handles no latest volume for block",
+				shards:                []uint32{0},
+				alignedInclusiveStart: blockStart,
+				alignedInclusiveEnd:   blockStart,
+				snapshotFilesByShard: map[uint32][]fs.FileSetFile{
+					// Use an empty (non-nil) slice so the lookup reaches
+					// LatestVolumeForBlock and exercises the missing-volume path.
+					0: []fs.FileSetFile{},
+				},
+				expectedResult: false,
+				expectedErr:    nil,
+			},
+			{
+				title:                 "Handles latest snapshot time before",
+				shards:                []uint32{0},
+				alignedInclusiveStart: blockStart,
+				alignedInclusiveEnd:   blockStart,
+				snapshotFilesByShard: map[uint32][]fs.FileSetFile{
+					0: []fs.FileSetFile{
+						fs.FileSetFile{
+							ID: fs.FileSetFileIdentifier{
+								BlockStart: testTime.Truncate(blockSize),
+							},
+							// Must contain checkpoint file to be "valid".
+							AbsoluteFilepaths:  []string{"checkpoint"},
+							CachedSnapshotTime: testTime.Add(-1 * time.Second),
+						},
+					},
+				},
+				expectedResult: false,
+				expectedErr:    nil,
+			},
+			{
+				title:                 "Handles latest snapshot time after",
+				shards:                []uint32{0},
+				alignedInclusiveStart: blockStart,
+				alignedInclusiveEnd:   blockStart,
+				snapshotFilesByShard: map[uint32][]fs.FileSetFile{
+					0: []fs.FileSetFile{
+						fs.FileSetFile{
+							ID: fs.FileSetFileIdentifier{
+								BlockStart: blockStart,
+							},
+							// Must contain checkpoint file to be "valid".
+							AbsoluteFilepaths:  []string{"checkpoint"},
+							CachedSnapshotTime: testTime.Add(1 * time.Second),
+						},
+					},
+				},
+				expectedResult: true,
+				expectedErr:    nil,
+			},
+			{
+				title:                 "Handles multiple blocks - One block with snapshot and one without",
+				shards:                []uint32{0},
+				alignedInclusiveStart: blockStart,
+				// Will iterate over two blocks, but only one will have a snapshot
+				// file.
+				alignedInclusiveEnd: blockStart.Add(blockSize),
+				snapshotFilesByShard: map[uint32][]fs.FileSetFile{
+					0: []fs.FileSetFile{
+						fs.FileSetFile{
+							ID: fs.FileSetFileIdentifier{
+								BlockStart: blockStart,
+							},
+							// Must contain checkpoint file to be "valid".
+							AbsoluteFilepaths:  []string{"checkpoint"},
+							CachedSnapshotTime: testTime.Add(1 * time.Second),
+						},
+					},
+				},
+				expectedResult: false,
+				expectedErr:    nil,
+			},
+			{
+				title:                 "Handles multiple blocks - Both have snapshot",
+				shards:                []uint32{0},
+				alignedInclusiveStart: blockStart,
+				// Will iterate over two blocks and both will have a snapshot.
+				alignedInclusiveEnd: blockStart.Add(blockSize),
+				snapshotFilesByShard: map[uint32][]fs.FileSetFile{
+					0: []fs.FileSetFile{
+						fs.FileSetFile{
+							ID: fs.FileSetFileIdentifier{
+								BlockStart: blockStart,
+							},
+							// Must contain checkpoint file to be "valid".
+							AbsoluteFilepaths:  []string{"checkpoint"},
+							CachedSnapshotTime: testTime.Add(1 * time.Second),
+						},
+						fs.FileSetFile{
+							ID: fs.FileSetFileIdentifier{
+								BlockStart: blockStart.Add(blockSize),
+							},
+							// Must contain checkpoint file to be "valid".
+							AbsoluteFilepaths:  []string{"checkpoint"},
+							CachedSnapshotTime: testTime.Add(1 * time.Second),
+						},
+					},
+				},
+				expectedResult: true,
+				expectedErr:    nil,
+			},
+		}
+	)
+
+	for _, tc := range testCases {
+		t.Run(tc.title, func(t *testing.T) {
+			ns, closer := newTestNamespace(t)
+			defer closer()
+
+			ns.snapshotFilesFn = func(_ string, _ ident.ID, shard uint32) (fs.FileSetFilesSlice, error) {
+				files, ok := tc.snapshotFilesByShard[shard]
+				if !ok {
+					return nil, nil
+				}
+				return files, nil
+			}
+			// Make sure to nil out all other shards beforehand.
+			for i := range ns.shards {
+				ns.shards[i] = nil
+			}
+			for _, shard := range tc.shards {
+				mockShard := NewMockdatabaseShard(ctrl)
+				mockShard.EXPECT().ID().Return(shard).AnyTimes()
+				ns.shards[shard] = mockShard
+			}
+
+			// Call once after all mock shards are installed so that the
+			// "no shards" case is exercised as well.
+			result, err := ns.IsCapturedBySnapshot(
+				tc.alignedInclusiveStart, tc.alignedInclusiveEnd, testTime)
+			require.Equal(t, tc.expectedErr, err)
+			require.Equal(t, tc.expectedResult, result)
+		})
+	}
+}
+
 func waitForStats(
 	reporter xmetrics.TestStatsReporter,
 	check func(xmetrics.TestStatsReporter) bool,
diff --git a/src/dbnode/storage/storage_mock.go b/src/dbnode/storage/storage_mock.go
index 8e1797f8ff..00c6ba1d8a 100644
--- a/src/dbnode/storage/storage_mock.go
+++ b/src/dbnode/storage/storage_mock.go
@@ -995,6 +995,19 @@ func (mr *MockdatabaseNamespaceMockRecorder) NeedsFlush(alignedInclusiveStart, a
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NeedsFlush", reflect.TypeOf((*MockdatabaseNamespace)(nil).NeedsFlush), alignedInclusiveStart, alignedInclusiveEnd)
 }
 
+// IsCapturedBySnapshot mocks base method
+func (m *MockdatabaseNamespace) IsCapturedBySnapshot(alignedInclusiveStart, alignedInclusiveEnd, t time.Time) (bool, error) {
+	ret := m.ctrl.Call(m, "IsCapturedBySnapshot", alignedInclusiveStart, alignedInclusiveEnd, t)
+	ret0, _ := ret[0].(bool)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// IsCapturedBySnapshot indicates an expected call of IsCapturedBySnapshot
+func (mr *MockdatabaseNamespaceMockRecorder) IsCapturedBySnapshot(alignedInclusiveStart, alignedInclusiveEnd, t interface{}) *gomock.Call {
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsCapturedBySnapshot", reflect.TypeOf((*MockdatabaseNamespace)(nil).IsCapturedBySnapshot), alignedInclusiveStart, alignedInclusiveEnd, t)
+}
+
 // Truncate mocks base method
 func (m *MockdatabaseNamespace) Truncate() (int64, error) {
 	ret := m.ctrl.Call(m, "Truncate")
diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go
index 3f273f88c5..2d2605774b 100644
--- a/src/dbnode/storage/types.go
+++ b/src/dbnode/storage/types.go
@@ -312,6 +312,16 @@ type databaseNamespace interface {
 	// NB: The start/end times are assumed to be aligned to block size boundary.
 	NeedsFlush(alignedInclusiveStart time.Time, alignedInclusiveEnd time.Time) bool
 
+	// IsCapturedBySnapshot accepts a time t (system time, not datapoint timestamp time)
+	// as well as a [start, end] range (inclusive on both sides) and determines whether
+	// all of the data in the namespace's blocks contained within that range, across all
+	// of its shards, is captured by snapshot files at least up until time t.
+	// This function does not take into account whether or not the data has already been
+	// flushed, i.e. even if the data has been flushed successfully to disk, this function
+	// will return false if the data is not also present in a snapshot file.
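+	//
+	// For example, the cleanup manager can use this to decide whether a commit
+	// log file whose data has not been flushed yet can still be removed safely,
+	// by checking that the blocks the file covers are captured by snapshots up
+	// to the end of that commit log block.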
+	IsCapturedBySnapshot(
+		alignedInclusiveStart, alignedInclusiveEnd, t time.Time) (bool, error)
+
 	// Truncate truncates the in-memory data for this namespace
 	Truncate() (int64, error)
diff --git a/src/dbnode/storage/util.go b/src/dbnode/storage/util.go
index a4512c2432..eb9c5ea77b 100644
--- a/src/dbnode/storage/util.go
+++ b/src/dbnode/storage/util.go
@@ -22,6 +22,8 @@ package storage
 
 import (
 	"time"
+
+	"github.com/m3db/m3db/src/dbnode/persist/fs/commitlog"
 )
 
 // numIntervals returns the number of intervals between [start, end] for a given
@@ -53,13 +55,30 @@ func timesInRange(startInclusive, endInclusive time.Time, windowSize time.Durati
 	return times
 }
 
-// filterTimes returns the values in the slice `times` which satisfy the provided predicate
+// filterCommitLogFiles returns the values in the slice `files` which
+// satisfy the provided predicate.
+func filterCommitLogFiles(
+	files []commitlog.File,
+	predicate func(start time.Time, duration time.Duration) bool,
+) []commitlog.File {
+	filtered := make([]commitlog.File, 0, len(files))
+	for _, f := range files {
+		if predicate(f.Start, f.Duration) {
+			filtered = append(filtered, f)
+		}
+	}
+	return filtered
+}
+
+// filterTimes returns the values in the slice `times` which satisfy
+// the provided predicate.
 func filterTimes(times []time.Time, predicate func(t time.Time) bool) []time.Time {
-	retTimes := make([]time.Time, 0, len(times))
+	filtered := make([]time.Time, 0, len(times))
 	for _, t := range times {
 		if predicate(t) {
-			retTimes = append(retTimes, t)
+			filtered = append(filtered, t)
 		}
 	}
-	return retTimes
+
+	return filtered
 }