Skip to content

Commit

Permalink
receive: fail early if ketama hashring is ill-configured
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Hoffmann <[email protected]>
  • Loading branch information
MichaHoffmann committed Feb 27, 2023
1 parent fdeea39 commit 5bb16dc
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
### Fixed

### Changed
- [#6168](https://github.com/thanos-io/thanos/pull/6168) Receiver: make ketama hashring fail early when configured in a bad way.

### Removed

Expand Down
24 changes: 18 additions & 6 deletions pkg/receive/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (f *fakeAppender) Rollback() error {
return f.rollbackErr()
}

func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64, hashringAlgo HashringAlgorithm) ([]*Handler, Hashring) {
func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64, hashringAlgo HashringAlgorithm) ([]*Handler, Hashring, error) {
var (
cfg = []HashringConfig{{Hashring: "test"}}
handlers []*Handler
Expand Down Expand Up @@ -202,11 +202,14 @@ func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uin
hashringAlgo = AlgorithmHashmod
}

hashring := newMultiHashring(hashringAlgo, replicationFactor, cfg)
hashring, err := newMultiHashring(hashringAlgo, replicationFactor, cfg)
if err != nil {
return nil, nil, err
}
for _, h := range handlers {
h.Hashring(hashring)
}
return handlers, hashring
return handlers, hashring, nil
}

func testReceiveQuorum(t *testing.T, hashringAlgo HashringAlgorithm, withConsistencyDelay bool) {
Expand Down Expand Up @@ -576,7 +579,10 @@ func testReceiveQuorum(t *testing.T, hashringAlgo HashringAlgorithm, withConsist
},
} {
t.Run(tc.name, func(t *testing.T) {
handlers, hashring := newTestHandlerHashring(tc.appendables, tc.replicationFactor, hashringAlgo)
handlers, hashring, err := newTestHandlerHashring(tc.appendables, tc.replicationFactor, hashringAlgo)
if err != nil {
t.Fatalf("unable to create test handler: %v", err)
}
tenant := "test"
// Test from the point of view of every node
// so that we know status code does not depend
Expand Down Expand Up @@ -706,7 +712,10 @@ func TestReceiveWriteRequestLimits(t *testing.T) {
appender: newFakeAppender(nil, nil, nil),
},
}
handlers, _ := newTestHandlerHashring(appendables, 3, AlgorithmHashmod)
handlers, _, err := newTestHandlerHashring(appendables, 3, AlgorithmHashmod)
if err != nil {
t.Fatalf("unable to create test handler: %v", err)
}
handler := handlers[0]

tenant := "test"
Expand Down Expand Up @@ -915,7 +924,10 @@ func makeSeriesWithValues(numSeries int) []prompb.TimeSeries {
func benchmarkHandlerMultiTSDBReceiveRemoteWrite(b testutil.TB) {
dir := b.TempDir()

handlers, _ := newTestHandlerHashring([]*fakeAppendable{nil}, 1, AlgorithmHashmod)
handlers, _, err := newTestHandlerHashring([]*fakeAppendable{nil}, 1, AlgorithmHashmod)
if err != nil {
b.Fatalf("unable to create test handler: %v", err)
}
handler := handlers[0]

reg := prometheus.NewRegistry()
Expand Down
35 changes: 24 additions & 11 deletions pkg/receive/hashring.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,14 @@ type ketamaHashring struct {
numEndpoints uint64
}

func newKetamaHashring(endpoints []string, sectionsPerNode int, replicationFactor uint64) *ketamaHashring {
func newKetamaHashring(endpoints []string, sectionsPerNode int, replicationFactor uint64) (*ketamaHashring, error) {
numSections := len(endpoints) * sectionsPerNode

if len(endpoints) < int(replicationFactor) {
return nil, fmt.Errorf("ketama: amount of endpoints needs to be larger than replication factor")

}

hash := xxhash.New()
ringSections := make(sections, 0, numSections)
for endpointIndex, endpoint := range endpoints {
Expand All @@ -135,7 +140,7 @@ func newKetamaHashring(endpoints []string, sectionsPerNode int, replicationFacto
endpoints: endpoints,
sections: ringSections,
numEndpoints: uint64(len(endpoints)),
}
}, nil
}

// calculateSectionReplicas pre-calculates replicas for each section,
Expand Down Expand Up @@ -234,17 +239,21 @@ func (m *multiHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st
// groups.
// Which hashring to use for a tenant is determined
// by the tenants field of the hashring configuration.
func newMultiHashring(algorithm HashringAlgorithm, replicationFactor uint64, cfg []HashringConfig) Hashring {
func newMultiHashring(algorithm HashringAlgorithm, replicationFactor uint64, cfg []HashringConfig) (Hashring, error) {
m := &multiHashring{
cache: make(map[string]Hashring),
}

for _, h := range cfg {
var hashring Hashring
var err error
if h.Algorithm != "" {
hashring = newHashring(h.Algorithm, h.Endpoints, replicationFactor, h.Hashring, h.Tenants)
hashring, err = newHashring(h.Algorithm, h.Endpoints, replicationFactor, h.Hashring, h.Tenants)
} else {
hashring = newHashring(algorithm, h.Endpoints, replicationFactor, h.Hashring, h.Tenants)
hashring, err = newHashring(algorithm, h.Endpoints, replicationFactor, h.Hashring, h.Tenants)
}
if err != nil {
return nil, err
}
m.hashrings = append(m.hashrings, hashring)
var t map[string]struct{}
Expand All @@ -256,7 +265,7 @@ func newMultiHashring(algorithm HashringAlgorithm, replicationFactor uint64, cfg
}
m.tenantSets = append(m.tenantSets, t)
}
return m
return m, nil
}

// HashringFromConfigWatcher creates multi-tenant hashrings from a
Expand All @@ -276,7 +285,11 @@ func HashringFromConfigWatcher(ctx context.Context, algorithm HashringAlgorithm,
if !ok {
return errors.New("hashring config watcher stopped unexpectedly")
}
updates <- newMultiHashring(algorithm, replicationFactor, cfg)
h, err := newMultiHashring(algorithm, replicationFactor, cfg)
if !ok {
return errors.Wrap(err, "unable to create new hashring from config")
}
updates <- h
case <-ctx.Done():
return ctx.Err()
}
Expand All @@ -295,20 +308,20 @@ func HashringFromConfig(algorithm HashringAlgorithm, replicationFactor uint64, c
return nil, errors.Wrapf(err, "failed to load configuration")
}

return newMultiHashring(algorithm, replicationFactor, config), err
return newMultiHashring(algorithm, replicationFactor, config)
}

func newHashring(algorithm HashringAlgorithm, endpoints []string, replicationFactor uint64, hashring string, tenants []string) Hashring {
func newHashring(algorithm HashringAlgorithm, endpoints []string, replicationFactor uint64, hashring string, tenants []string) (Hashring, error) {
switch algorithm {
case AlgorithmHashmod:
return simpleHashring(endpoints)
return simpleHashring(endpoints), nil
case AlgorithmKetama:
return newKetamaHashring(endpoints, SectionsPerNode, replicationFactor)
default:
l := log.NewNopLogger()
level.Warn(l).Log("msg", "Unrecognizable hashring algorithm. Fall back to hashmod algorithm.",
"hashring", hashring,
"tenants", tenants)
return simpleHashring(endpoints)
return simpleHashring(endpoints), nil
}
}
17 changes: 14 additions & 3 deletions pkg/receive/hashring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,9 @@ func TestHashringGet(t *testing.T) {
},
},
} {
hs := newMultiHashring(AlgorithmHashmod, 3, tc.cfg)
hs, err := newMultiHashring(AlgorithmHashmod, 3, tc.cfg)
require.NoError(t, err)

h, err := hs.Get(tc.tenant, ts)
if tc.nodes != nil {
if err != nil {
Expand Down Expand Up @@ -226,7 +228,8 @@ func TestKetamaHashringGet(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
hashRing := newKetamaHashring(test.nodes, 10, test.n+1)
hashRing, err := newKetamaHashring(test.nodes, 10, test.n+1)
require.NoError(t, err)

result, err := hashRing.GetN("tenant", test.ts, test.n)
require.NoError(t, err)
Expand All @@ -235,6 +238,11 @@ func TestKetamaHashringGet(t *testing.T) {
}
}

func TestKetamaHashringBadConfigIsRejected(t *testing.T) {
_, err := newKetamaHashring([]string{"node-1"}, 1, 2)
require.Error(t, err)
}

func TestKetamaHashringConsistency(t *testing.T) {
series := makeSeries()

Expand Down Expand Up @@ -348,7 +356,10 @@ func assignSeries(series []prompb.TimeSeries, nodes []string) (map[string][]prom
}

func assignReplicatedSeries(series []prompb.TimeSeries, nodes []string, replicas uint64) (map[string][]prompb.TimeSeries, error) {
hashRing := newKetamaHashring(nodes, SectionsPerNode, replicas)
hashRing, err := newKetamaHashring(nodes, SectionsPerNode, replicas)
if err != nil {
return nil, err
}
assignments := make(map[string][]prompb.TimeSeries)
for i := uint64(0); i < replicas; i++ {
for _, ts := range series {
Expand Down

0 comments on commit 5bb16dc

Please sign in to comment.