Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dbnode] Skip entries for unowned shards in commit log bootstrapper #2145

Merged
merged 9 commits into from
Feb 24, 2020
28 changes: 27 additions & 1 deletion src/dbnode/integration/admin_session_fetch_blocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import (
"time"

"github.com/m3db/m3/src/dbnode/integration/generate"
"github.com/m3db/m3/src/dbnode/storage/block"
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/storage/block"
"github.com/m3db/m3/src/dbnode/topology"
"github.com/m3db/m3/src/x/context"
"github.com/m3db/m3/src/x/ident"
Expand Down Expand Up @@ -123,6 +123,32 @@ func testSetupMetadatas(
return metadatasByShard
}

// filterSeriesByShard returns a copy of seriesMap that keeps only the series
// whose shard (as resolved by testSetup's shard set) appears in desiredShards.
// Block starts left with no matching series are omitted from the result.
func filterSeriesByShard(
	testSetup *testSetup,
	seriesMap map[xtime.UnixNano]generate.SeriesBlock,
	desiredShards []uint32,
) map[xtime.UnixNano]generate.SeriesBlock {
	// Build a set for O(1) shard membership checks.
	wanted := make(map[uint32]struct{}, len(desiredShards))
	for _, s := range desiredShards {
		wanted[s] = struct{}{}
	}

	result := make(map[xtime.UnixNano]generate.SeriesBlock)
	for blockStart, block := range seriesMap {
		var kept []generate.Series
		for _, s := range block {
			if _, ok := wanted[testSetup.shardSet.Lookup(s.ID)]; ok {
				kept = append(kept, s)
			}
		}
		// Drop block starts that end up empty after filtering, matching the
		// shape verifySeriesMapsEqual expects.
		if len(kept) > 0 {
			result[blockStart] = kept
		}
	}

	return result
}

func verifySeriesMapsEqual(
t *testing.T,
expectedSeriesMap map[xtime.UnixNano]generate.SeriesBlock,
Expand Down
145 changes: 145 additions & 0 deletions src/dbnode/integration/commitlog_bootstrap_unowned_shard_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// +build integration

// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package integration

import (
"fmt"
"testing"
"time"

"github.com/m3db/m3/src/cluster/services"
"github.com/m3db/m3/src/cluster/shard"
"github.com/m3db/m3/src/dbnode/integration/fake"
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/retention"
"github.com/m3db/m3/src/dbnode/topology"
xtest "github.com/m3db/m3/src/x/test"

"github.com/stretchr/testify/require"
)

// TestCommitLogBootstrapUnownedShard verifies that a node bootstrapping from
// a commit log containing entries for shards it does not own (as happens
// after a topology change) still bootstraps successfully, surfacing only the
// data for shards it owns.
func TestCommitLogBootstrapUnownedShard(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
	}

	log := xtest.NewLogger(t)
	// Short retention keeps the generated data volume small for the test.
	retentionOpts := retention.NewOptions().
		SetRetentionPeriod(20 * time.Hour).
		SetBlockSize(2 * time.Hour).
		SetBufferPast(10 * time.Minute).
		SetBufferFuture(10 * time.Minute)
	blockSize := retentionOpts.BlockSize()

	ns1, err := namespace.NewMetadata(testNamespaces[0], namespace.NewOptions().
		SetRetentionOptions(retentionOpts))
	require.NoError(t, err)
	numShards := 6

	// Helper function to create node instances for fake cluster service.
	node := func(index int, shards []uint32) services.ServiceInstance {
		id := fmt.Sprintf("testhost%d", index)
		endpoint := fmt.Sprintf("127.0.0.1:%d", multiAddrPortStart+(index*multiAddrPortEach))

		result := services.NewServiceInstance().
			SetInstanceID(id).
			SetEndpoint(endpoint)
		resultShards := make([]shard.Shard, len(shards))
		for i, id := range shards {
			// All shards are marked Available so both nodes are considered
			// fully owning their assigned shards.
			resultShards[i] = shard.NewShard(id).SetState(shard.Available)
		}
		return result.SetShards(shard.NewShards(resultShards))
	}

	// Pretend there are two nodes sharing 6 shards (RF1).
	node0OwnedShards := []uint32{0, 1, 2}
	svc := fake.NewM3ClusterService().
		SetInstances([]services.ServiceInstance{
			node(0, node0OwnedShards),
			node(1, []uint32{3, 4, 5}),
		}).
		SetReplication(services.NewServiceReplication().SetReplicas(1)).
		SetSharding(services.NewServiceSharding().SetNumShards(numShards))
	svcs := fake.NewM3ClusterServices()
	svcs.RegisterService("m3db", svc)
	// Dynamic topology backed by the fake cluster service above, so each
	// node only owns half of the shards.
	topoOpts := topology.NewDynamicOptions().
		SetConfigServiceClient(fake.NewM3ClusterClient(svcs, nil))
	topoInit := topology.NewDynamicInitializer(topoOpts)

	opts := newTestOptions(t).
		SetNamespaces([]namespace.Metadata{ns1}).
		SetNumShards(numShards)
	// Peers bootstrapper is disabled so nodes bootstrap purely from
	// filesystem/commit log sources.
	setupOpts := []bootstrappableTestSetupOptions{
		{disablePeersBootstrapper: true, topologyInitializer: topoInit},
		{disablePeersBootstrapper: true, topologyInitializer: topoInit},
	}

	setups, closeFn := newDefaultBootstrappableTestSetups(t, opts, setupOpts)
	defer closeFn()

	// Only set this up for the first setup because we're only writing commit
	// logs for the first server.
	setup := setups[0]
	commitLogOpts := setup.storageOpts.CommitLogOptions().
		SetFlushInterval(defaultIntegrationTestFlushInterval)
	setup.storageOpts = setup.storageOpts.SetCommitLogOptions(commitLogOpts)

	log.Info("generating data")
	now := setup.getNowFn()
	// Two block-sized windows of data ending one block before "now".
	seriesMaps := generateSeriesMaps(30, nil, now.Add(-2*blockSize), now.Add(-blockSize))
	log.Info("writing data")
	// Write commit log with generated data that spreads across all shards
	// (including shards that this node should not own). This node should still
	// be able to bootstrap successfully with commit log entries from shards
	// that it does not own.
	writeCommitLogData(t, setup, commitLogOpts, seriesMaps, ns1, false)
	log.Info("finished writing data")

	// Setup bootstrapper after writing data so filesystem inspection can find it.
	setupCommitLogBootstrapperWithFSInspection(t, setup, commitLogOpts)

	// Start the servers.
	for _, setup := range setups {
		require.NoError(t, setup.startServer())
	}

	// Defer stop the servers.
	defer func() {
		setups.parallel(func(s *testSetup) {
			require.NoError(t, s.stopServer())
		})
		log.Debug("servers are now down")
	}()

	// Only fetch blocks for shards owned by node 0.
	metadatasByShard, err := m3dbClientFetchBlocksMetadata(
		setup.m3dbVerificationAdminClient, testNamespaces[0], node0OwnedShards,
		now.Add(-2*blockSize), now, topology.ReadConsistencyLevelMajority)
	require.NoError(t, err)

	observedSeriesMaps := testSetupToSeriesMaps(t, setup, ns1, metadatasByShard)
	// Filter out the written series that node 0 does not own.
	filteredSeriesMaps := filterSeriesByShard(setup, seriesMaps, node0OwnedShards)
	// Expect to only see data that node 0 owns.
	verifySeriesMapsEqual(t, filteredSeriesMaps, observedSeriesMaps)
}
16 changes: 9 additions & 7 deletions src/dbnode/storage/bootstrap/bootstrap_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 29 additions & 8 deletions src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,9 @@ type seriesMapKey struct {
}

type seriesMapEntry struct {
namespace *bootstrapNamespace
series bootstrap.CheckoutSeriesResult
shardNoLongerOwned bool
namespace *bootstrapNamespace
series bootstrap.CheckoutSeriesResult
}

// accumulateArg contains all the information a worker go-routine needs to
Expand Down Expand Up @@ -289,6 +290,7 @@ func (s *commitLogSource) Read(
}
datapointsSkippedNotBootstrappingNamespace = 0
datapointsSkippedNotBootstrappingShard = 0
datapointsSkippedShardNoLongerOwned = 0
startCommitLogsRead = s.nowFn()
)
s.log.Info("read commit logs start")
Expand All @@ -301,7 +303,8 @@ func (s *commitLogSource) Read(
zap.Stringer("took", s.nowFn().Sub(startCommitLogsRead)),
zap.Int("datapointsRead", datapointsRead),
zap.Int("datapointsSkippedNotBootstrappingNamespace", datapointsSkippedNotBootstrappingNamespace),
zap.Int("datapointsSkippedNotBootstrappingShard", datapointsSkippedNotBootstrappingShard))
zap.Int("datapointsSkippedNotBootstrappingShard", datapointsSkippedNotBootstrappingShard),
zap.Int("datapointsSkippedShardNoLongerOwned", datapointsSkippedShardNoLongerOwned))
}()

iter, corruptFiles, err := s.newIteratorFn(iterOpts)
Expand Down Expand Up @@ -450,13 +453,19 @@ func (s *commitLogSource) Read(
// Check out the series for writing, no need for concurrency
// as commit log bootstrapper does not perform parallel
// checking out of series.
series, err := accumulator.CheckoutSeriesWithoutLock(
series, owned, err := accumulator.CheckoutSeriesWithoutLock(
entry.Series.Shard,
entry.Series.ID,
tagIter,
)

tagIter)
if err != nil {
if !owned {
// If we encounter a log entry for a shard that we're
// not responsible for, skip this entry. This can occur
// when a topology change happens and we bootstrap from
// a commit log which contains this data.
commitLogSeries[seriesKey] = seriesMapEntry{shardNoLongerOwned: true}
continue
}
return bootstrap.NamespaceResults{}, err
}

Expand All @@ -469,11 +478,19 @@ func (s *commitLogSource) Read(
commitLogSeries[seriesKey] = seriesEntry
}

// If series is no longer owned, then we can safely skip trying to
// bootstrap the result.
if seriesEntry.shardNoLongerOwned {
datapointsSkippedShardNoLongerOwned++
continue
}

// If not bootstrapping this namespace then skip this result.
if !seriesEntry.namespace.bootstrapping {
datapointsSkippedNotBootstrappingNamespace++
continue
}

// If not bootstrapping shard for this series then also skip.
// NB(r): This can occur when a topology change happens then we
// bootstrap from the commit log data that the node no longer owns.
Expand Down Expand Up @@ -783,8 +800,12 @@ func (s *commitLogSource) bootstrapShardBlockSnapshot(
}

// NB(r): No parallelization required to checkout the series.
ref, err := accumulator.CheckoutSeriesWithoutLock(shard, id, tags)
ref, owned, err := accumulator.CheckoutSeriesWithoutLock(shard, id, tags)
if err != nil {
if !owned {
// Skip bootstrapping this series if we don't own it.
continue
}
return err
}

Expand Down
6 changes: 5 additions & 1 deletion src/dbnode/storage/bootstrap/bootstrapper/fs/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,8 +584,12 @@ func (s *fileSystemSource) readNextEntryAndRecordBlock(
return fmt.Errorf("error reading data file: %v", err)
}

ref, err := accumulator.CheckoutSeriesWithLock(shardID, id, tagsIter)
ref, owned, err := accumulator.CheckoutSeriesWithLock(shardID, id, tagsIter)
if err != nil {
if !owned {
// Ignore if we no longer own the shard for this series.
return nil
}
return fmt.Errorf("unable to checkout series: %v", err)
}

Expand Down
7 changes: 6 additions & 1 deletion src/dbnode/storage/bootstrap/bootstrapper/peers/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,8 +396,13 @@ func (s *peersSource) fetchBootstrapBlocksFromPeers(
for _, elem := range shardResult.AllSeries().Iter() {
entry := elem.Value()
tagsIter.Reset(entry.Tags)
ref, err := accumulator.CheckoutSeriesWithLock(shard, entry.ID, tagsIter)
ref, owned, err := accumulator.CheckoutSeriesWithLock(shard, entry.ID, tagsIter)
if err != nil {
if !owned {
// Only if we own this shard do we consider this an
// error in bootstrapping.
continue
}
unfulfill(currRange)
s.log.Error("could not checkout series", zap.Error(err))
continue
Expand Down
19 changes: 11 additions & 8 deletions src/dbnode/storage/bootstrap/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,14 @@ type NamespaceDataAccumulator interface {
// CheckoutSeriesWithoutLock retrieves a series for writing to
// and when the accumulator is closed it will ensure that the
// series is released.
//
// If indexing is not enabled, tags is still required, simply pass
// ident.EmptyTagIterator.
//
// Returns the result, whether the node owns the specified shard, along with
// an error if any. This allows callers to handle unowned shards differently
// than other errors. If owned == false, err should not be nil.
//
// Note: Without lock variant does not perform any locking and callers
// must ensure non-parallel access themselves, this helps avoid
// overhead of the lock for the commit log bootstrapper which reads
Expand All @@ -236,20 +242,17 @@ type NamespaceDataAccumulator interface {
shardID uint32,
id ident.ID,
tags ident.TagIterator,
) (CheckoutSeriesResult, error)
) (result CheckoutSeriesResult, owned bool, err error)

// CheckoutSeriesWithLock retrieves a series for writing to
// and when the accumulator is closed it will ensure that the
// series is released.
// If indexing is not enabled, tags is still required, simply pass
// ident.EmptyTagIterator.
// Note: With lock variant perform locking and callers do not need
// CheckoutSeriesWithLock is the "with lock" version of
// CheckoutSeriesWithoutLock.
// Note: With lock variant performs locking and callers do not need
// to be concerned about parallel access.
CheckoutSeriesWithLock(
shardID uint32,
id ident.ID,
tags ident.TagIterator,
) (CheckoutSeriesResult, error)
) (result CheckoutSeriesResult, owned bool, err error)

// Close will close the data accumulator and will release
// all series read/write refs.
Expand Down
Loading