Skip to content

Commit

Permalink
[DBNode] - Repairs followup (#1862)
Browse files Browse the repository at this point in the history
  • Loading branch information
Richard Artoul authored Aug 8, 2019
1 parent 10ac303 commit 6697fd7
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 13 deletions.
12 changes: 8 additions & 4 deletions src/dbnode/client/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import (
xsync "github.com/m3db/m3/src/x/sync"
xtime "github.com/m3db/m3/src/x/time"

apachethrift "github.com/apache/thrift/lib/go/thrift"
"github.com/uber-go/tally"
"github.com/uber/tchannel-go/thrift"
"go.uber.org/zap"
Expand Down Expand Up @@ -2265,15 +2266,18 @@ func (s *session) streamBlocksMetadataFromPeer(
data.IncRef()
data.AppendAll(elem.ID)
data.DecRef()

clonedID := idPool.BinaryID(data)
// Return thrift bytes to pool once the ID has been copied.
apachethrift.BytesPoolPut(elem.ID)

var encodedTags checked.Bytes
if bytes := elem.EncodedTags; len(bytes) != 0 {
encodedTags = bytesPool.Get(len(bytes))
if tagBytes := elem.EncodedTags; len(tagBytes) != 0 {
encodedTags = bytesPool.Get(len(tagBytes))
encodedTags.IncRef()
encodedTags.AppendAll(bytes)
encodedTags.AppendAll(tagBytes)
encodedTags.DecRef()
// Return thrift bytes to pool once the tags have been copied.
apachethrift.BytesPoolPut(tagBytes)
}

// Error occurred retrieving block metadata, use default values
Expand Down
16 changes: 15 additions & 1 deletion src/dbnode/storage/repair/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,11 +223,25 @@ func (m replicaMetadataComparer) Compare() MetadataComparisonResult {

for _, hm := range bm {
if !originContainsBlock {
if hm.Host.String() == m.origin.String() {
if hm.Host.ID() == m.origin.ID() {
originContainsBlock = true
}
}

if hm.Metadata.Checksum == nil {
// Skip metadata that doesn't have a checksum. This usually means that the
// metadata represents unmerged or pending data. Better to skip for now and
// repair it once it has been merged as opposed to repairing it now and
// ping-ponging the same data back and forth between all the repairing nodes.
//
// The impact of this is that recently modified data may take longer to be
// repaired, but it saves a ton of work by preventing nodes from repairing
// from each other unnecessarily even when they have identical data.
//
// TODO(rartoul): Consider skipping series with duplicate metadata as well?
continue
}

// Check size.
if firstSize {
sizeVal = hm.Metadata.Size
Expand Down
25 changes: 18 additions & 7 deletions src/dbnode/storage/repair/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,8 @@ func TestReplicaMetadataComparerCompare(t *testing.T) {
Host: hosts[1],
Metadata: block.NewMetadata(ident.StringID("bar"), ident.Tags{}, now.Add(time.Second), int64(1), &ten, time.Time{}),
},
// hosts[0] has a checksum but hosts[1] doesn't, so this block will not be repaired (it is skipped until the next
// attempt), at which point hosts[1] will have merged the blocks and an accurate comparison can be made.
block.ReplicaMetadata{
Host: hosts[0],
Metadata: block.NewMetadata(ident.StringID("baz"), ident.Tags{}, now.Add(2*time.Second), int64(2), &twenty, time.Time{}),
Expand All @@ -260,6 +262,15 @@ func TestReplicaMetadataComparerCompare(t *testing.T) {
Host: hosts[1],
Metadata: block.NewMetadata(ident.StringID("baz"), ident.Tags{}, now.Add(2*time.Second), int64(2), nil, time.Time{}),
},
// hosts[0] and hosts[1] both have a checksum, but they differ, so this should trigger a checksum mismatch.
block.ReplicaMetadata{
Host: hosts[0],
Metadata: block.NewMetadata(ident.StringID("boz"), ident.Tags{}, now.Add(2*time.Second), int64(2), &twenty, time.Time{}),
},
block.ReplicaMetadata{
Host: hosts[1],
Metadata: block.NewMetadata(ident.StringID("boz"), ident.Tags{}, now.Add(2*time.Second), int64(2), &ten, time.Time{}),
},
// Block only exists for hosts[1] but hosts[0] is the origin, so this should be considered a size/checksum mismatch.
block.ReplicaMetadata{
Host: hosts[1],
Expand All @@ -282,26 +293,26 @@ func TestReplicaMetadataComparerCompare(t *testing.T) {
inputs[3],
}},
{ident.StringID("gah"), now.Add(3 * time.Second), []block.ReplicaMetadata{
inputs[6],
inputs[8],
}},
}

checksumExpected := []testBlock{
{ident.StringID("baz"), now.Add(2 * time.Second), []block.ReplicaMetadata{
inputs[4],
inputs[5],
{ident.StringID("boz"), now.Add(2 * time.Second), []block.ReplicaMetadata{
inputs[6],
inputs[7],
}},
{ident.StringID("gah"), now.Add(3 * time.Second), []block.ReplicaMetadata{
inputs[6],
inputs[8],
}},
}

m := NewReplicaMetadataComparer(hosts[0], testRepairOptions()).(replicaMetadataComparer)
m.metadata = metadata

res := m.Compare()
require.Equal(t, int64(5), res.NumSeries)
require.Equal(t, int64(5), res.NumBlocks)
require.Equal(t, int64(6), res.NumSeries)
require.Equal(t, int64(6), res.NumBlocks)
assertEqual(t, sizeExpected, res.SizeDifferences)
assertEqual(t, checksumExpected, res.ChecksumDifferences)
}
4 changes: 3 additions & 1 deletion src/dbnode/storage/repair/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ import (
)

const (
defaultRepairConsistencyLevel = topology.ReadConsistencyLevelMajority
// Allow repairs to progress when a single peer is down (i.e. during a single node failure
// or during deployments).
defaultRepairConsistencyLevel = topology.ReadConsistencyLevelUnstrictMajority
defaultRepairCheckInterval = time.Minute
defaultRepairThrottle = 90 * time.Second
defaultRepairShardConcurrency = 1
Expand Down

0 comments on commit 6697fd7

Please sign in to comment.