Ensure distinct replication (#5604)
* Ensure distinct replication

Signed-off-by: Filip Petkovski <[email protected]>

* Precalculate hashring replicas

The ketama hashring algorithm can cause requests to be replicated
to the same node, since one node can claim two consecutive sections
of the ring.

In addition to this, the replication algorithm in the receive handler
assumes that a single batch will always be replicated to one node.
While this is a good optimization for the hashmod algorithm,
it cannot be generalized for every other type of hashring.

This commit fixes both issues.

Signed-off-by: Filip Petkovski <[email protected]>
fpetkovski authored Aug 23, 2022
1 parent 72e9156 commit ee512ae
Showing 5 changed files with 149 additions and 78 deletions.
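To make the first issue concrete: in a ketama ring, each node owns several sections, and replica n for a series is found by walking clockwise from the series' section. If one node owns two adjacent sections, a naive walk returns that node twice. Below is a minimal, self-contained sketch of the precalculation idea; the simplified ring and names are illustrative, not the actual Thanos ketama implementation.

package main

import "fmt"

// section is one point on a ketama-style ring; a node typically owns
// several sections, so adjacent sections can belong to the same node.
type section struct {
	hash     uint64
	endpoint string
}

// precalculateReplicas walks the ring clockwise from every section and
// records the first replicationFactor distinct endpoints. Skipping
// duplicates is what guarantees that GetN(ts, 0) and GetN(ts, 1) never
// resolve to the same node.
func precalculateReplicas(ring []section, replicationFactor int) map[uint64][]string {
	replicas := make(map[uint64][]string, len(ring))
	for i := range ring {
		seen := make(map[string]struct{}, replicationFactor)
		owners := make([]string, 0, replicationFactor)
		for j := 0; len(owners) < replicationFactor && j < len(ring); j++ {
			ep := ring[(i+j)%len(ring)].endpoint
			if _, ok := seen[ep]; ok {
				continue // same node owns this section too; keep walking
			}
			seen[ep] = struct{}{}
			owners = append(owners, ep)
		}
		replicas[ring[i].hash] = owners
	}
	return replicas
}

func main() {
	// node-a owns two adjacent sections; with replication factor 2 the
	// walk must still pick two distinct nodes.
	ring := []section{{10, "node-a"}, {20, "node-a"}, {30, "node-b"}}
	fmt.Println(precalculateReplicas(ring, 2)[10]) // [node-a node-b]
}

This is also why the diff below threads conf.replicationFactor into HashringFromConfig and HashringFromConfigWatcher: the ring must know the replication factor at construction time to precompute that many distinct owners per section.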
47 changes: 31 additions & 16 deletions cmd/thanos/receive.go
@@ -262,7 +262,7 @@ func runReceive(

level.Debug(logger).Log("msg", "setting up tsdb")
{
if err := startTSDBAndUpload(g, logger, reg, dbs, reloadGRPCServer, uploadC, hashringChangedChan, upload, uploadDone, statusProber, bkt); err != nil {
if err := startTSDBAndUpload(g, logger, reg, dbs, reloadGRPCServer, uploadC, hashringChangedChan, upload, uploadDone, statusProber, bkt, receive.HashringAlgorithm(conf.hashringsAlgorithm)); err != nil {
return err
}
}
@@ -465,6 +465,7 @@ func setupHashring(g *run.Group,
// In the single-node case, which has no configuration
// watcher, we close the chan ourselves.
updates := make(chan receive.Hashring, 1)
algorithm := receive.HashringAlgorithm(conf.hashringsAlgorithm)

// The hashrings config file path is given; initialize the config watcher.
if conf.hashringsFilePath != "" {
@@ -483,7 +484,7 @@
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
level.Info(logger).Log("msg", "the hashring initialized with config watcher.")
return receive.HashringFromConfigWatcher(ctx, receive.HashringAlgorithm(conf.hashringsAlgorithm), updates, cw)
return receive.HashringFromConfigWatcher(ctx, algorithm, conf.replicationFactor, updates, cw)
}, func(error) {
cancel()
})
@@ -494,7 +495,7 @@
)
// The hashrings config file content is given; initialize the hashring from it.
if len(conf.hashringsFileContent) > 0 {
ring, err = receive.HashringFromConfig(receive.HashringAlgorithm(conf.hashringsAlgorithm), conf.hashringsFileContent)
ring, err = receive.HashringFromConfig(algorithm, conf.replicationFactor, conf.hashringsFileContent)
if err != nil {
close(updates)
return errors.Wrap(err, "failed to validate hashring configuration file")
@@ -567,7 +568,7 @@ func startTSDBAndUpload(g *run.Group,
uploadDone chan struct{},
statusProber prober.Probe,
bkt objstore.Bucket,

hashringAlgorithm receive.HashringAlgorithm,
) error {

log.With(logger, "component", "storage")
@@ -605,6 +606,7 @@ func startTSDBAndUpload(g *run.Group,
level.Info(logger).Log("msg", "storage is closed")
}()

var initialized bool
for {
select {
case <-cancel:
@@ -613,22 +615,35 @@
if !ok {
return nil
}
dbUpdatesStarted.Inc()
level.Info(logger).Log("msg", "updating storage")

if err := dbs.Flush(); err != nil {
return errors.Wrap(err, "flushing storage")
}
if err := dbs.Open(); err != nil {
return errors.Wrap(err, "opening storage")
}
if upload {
uploadC <- struct{}{}
<-uploadDone
// When using Ketama as the hashring algorithm, there is no need to flush the TSDB head.
// If new receivers were added to the hashring, existing receivers will not need to
// ingest additional series.
// If receivers are removed from the hashring, existing receivers will only need
// to ingest a subset of the series that were assigned to the removed receivers.
// As a result, changing the hashring produces no churn, hence no need to force
// head compaction and upload.
flushHead := !initialized || hashringAlgorithm != receive.AlgorithmKetama
if flushHead {
level.Info(logger).Log("msg", "updating storage")
dbUpdatesStarted.Inc()
if err := dbs.Flush(); err != nil {
return errors.Wrap(err, "flushing storage")
}
if err := dbs.Open(); err != nil {
return errors.Wrap(err, "opening storage")
}
if upload {
uploadC <- struct{}{}
<-uploadDone
}
dbUpdatesCompleted.Inc()
}
initialized = true

statusProber.Ready()
level.Info(logger).Log("msg", "storage started, and server is ready to receive web requests")
dbUpdatesCompleted.Inc()

reloadGRPCServer <- struct{}{}
}
}
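The branch introduced above can be read as a small predicate. Here is a sketch; the HashringAlgorithm type and the "ketama" constant value are assumptions based on the identifiers appearing in this diff.

package main

import "fmt"

type HashringAlgorithm string

const AlgorithmKetama HashringAlgorithm = "ketama"

// shouldFlushHead restates the flushHead condition from the hunk above:
// flush the TSDB head on the very first hashring update, and on later
// updates only when the algorithm is not ketama, since ketama hashring
// changes do not reshuffle series already owned by a receiver.
func shouldFlushHead(initialized bool, algo HashringAlgorithm) bool {
	return !initialized || algo != AlgorithmKetama
}

func main() {
	fmt.Println(shouldFlushHead(false, AlgorithmKetama)) // true: first update always flushes
	fmt.Println(shouldFlushHead(true, AlgorithmKetama))  // false: ketama updates are churn-free
	fmt.Println(shouldFlushHead(true, "hashmod"))        // true: hashmod reshuffles series
}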
73 changes: 45 additions & 28 deletions pkg/receive/handler.go
@@ -388,6 +388,12 @@ type replica struct {
replicated bool
}

// endpointReplica is a pair of a receive endpoint and a write request replica.
type endpointReplica struct {
endpoint string
replica replica
}

func (h *Handler) handleRequest(ctx context.Context, rep uint64, tenant string, wreq *prompb.WriteRequest) error {
tLogger := log.With(h.logger, "tenant", tenant)

@@ -713,9 +719,6 @@ func (h *Handler) forward(ctx context.Context, tenant string, r replica, wreq *p
span, ctx := tracing.StartSpan(ctx, "receive_fanout_forward")
defer span.Finish()

wreqs := make(map[string]*prompb.WriteRequest)
replicas := make(map[string]replica)

// It is possible that the hashring is ready in testReady() but unready now,
// so we need to lock here.
h.mtx.RLock()
@@ -731,22 +734,23 @@ func (h *Handler) forward(ctx context.Context, tenant string, r replica, wreq *p
// at most one outgoing write request will be made
// to every other node in the hashring, rather than
// one request per time series.
wreqs := make(map[endpointReplica]*prompb.WriteRequest)
for i := range wreq.Timeseries {
endpoint, err := h.hashring.GetN(tenant, &wreq.Timeseries[i], r.n)
if err != nil {
h.mtx.RUnlock()
return err
}
if _, ok := wreqs[endpoint]; !ok {
wreqs[endpoint] = &prompb.WriteRequest{}
replicas[endpoint] = r
key := endpointReplica{endpoint: endpoint, replica: r}
if _, ok := wreqs[key]; !ok {
wreqs[key] = &prompb.WriteRequest{}
}
wr := wreqs[endpoint]
wr := wreqs[key]
wr.Timeseries = append(wr.Timeseries, wreq.Timeseries[i])
}
h.mtx.RUnlock()

return h.fanoutForward(ctx, tenant, replicas, wreqs, len(wreqs))
return h.fanoutForward(ctx, tenant, wreqs, len(wreqs))
}

// writeQuorum returns the minimum number of replicas that have to confirm write success before claiming replication success.
@@ -756,7 +760,7 @@ func (h *Handler) writeQuorum() int {

// fanoutForward concurrently fans out the given set of write requests. It returns a status immediately when a quorum
// of requests succeeds or fails, or if the context is canceled.
func (h *Handler) fanoutForward(pctx context.Context, tenant string, replicas map[string]replica, wreqs map[string]*prompb.WriteRequest, successThreshold int) error {
func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[endpointReplica]*prompb.WriteRequest, successThreshold int) error {
var errs errutil.MultiError

fctx, cancel := context.WithTimeout(tracing.CopyTraceContext(context.Background(), pctx), h.options.ForwardTimeout)
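The successThreshold passed to fanoutForward comes from writeQuorum. Its body is not part of this diff, so the sketch below assumes the usual majority formula floor(RF/2) + 1.

package main

import "fmt"

// writeQuorum computes a majority quorum: the number of replicas that must
// acknowledge a write before it is considered successful.
func writeQuorum(replicationFactor uint64) int {
	return int(replicationFactor/2) + 1
}

func main() {
	for _, rf := range []uint64{1, 2, 3, 5} {
		fmt.Printf("replication factor %d -> quorum %d\n", rf, writeQuorum(rf))
	}
	// replication factor 1 -> quorum 1
	// replication factor 2 -> quorum 2
	// replication factor 3 -> quorum 2
	// replication factor 5 -> quorum 3
}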
@@ -781,19 +785,22 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, replicas ma
ec := make(chan error)

var wg sync.WaitGroup
for endpoint := range wreqs {
wg.Add(1)
for er := range wreqs {
er := er
r := er.replica
endpoint := er.endpoint

wg.Add(1)
// If the request is not yet replicated, let's replicate it.
// If the replication factor isn't greater than 1, let's
// just forward the requests.
if !replicas[endpoint].replicated && h.options.ReplicationFactor > 1 {
if !r.replicated && h.options.ReplicationFactor > 1 {
go func(endpoint string) {
defer wg.Done()

var err error
tracing.DoInSpan(fctx, "receive_replicate", func(ctx context.Context) {
err = h.replicate(ctx, tenant, wreqs[endpoint])
err = h.replicate(ctx, tenant, wreqs[er])
})
if err != nil {
h.replications.WithLabelValues(labelError).Inc()
@@ -820,7 +827,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, replicas ma

var err error
tracing.DoInSpan(fctx, "receive_tsdb_write", func(_ context.Context) {
err = h.writer.Write(fctx, tenant, wreqs[endpoint])
err = h.writer.Write(fctx, tenant, wreqs[er])
})
if err != nil {
// When a MultiError is added to another MultiError, the error slices are concatenated, not nested.
@@ -873,10 +880,10 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, replicas ma
tracing.DoInSpan(fctx, "receive_forward", func(ctx context.Context) {
// Actually make the request against the endpoint we determined should handle these time series.
_, err = cl.RemoteWrite(ctx, &storepb.WriteRequest{
Timeseries: wreqs[endpoint].Timeseries,
Timeseries: wreqs[er].Timeseries,
Tenant: tenant,
// Increment replica since on-the-wire format is 1-indexed and 0 indicates un-replicated.
Replica: int64(replicas[endpoint].n + 1),
Replica: int64(r.n + 1),
})
})
if err != nil {
@@ -952,10 +959,6 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, replicas ma
// The function only returns when all replication requests have finished
// or the context is canceled.
func (h *Handler) replicate(ctx context.Context, tenant string, wreq *prompb.WriteRequest) error {
wreqs := make(map[string]*prompb.WriteRequest)
replicas := make(map[string]replica)
var i uint64

// It is possible that the hashring is ready in testReady() but unready now,
// so we need to lock here.
h.mtx.RLock()
@@ -964,20 +967,34 @@
return errors.New("hashring is not ready")
}

for i = 0; i < h.options.ReplicationFactor; i++ {
endpoint, err := h.hashring.GetN(tenant, &wreq.Timeseries[0], i)
if err != nil {
h.mtx.RUnlock()
return err
replicatedRequests := make(map[endpointReplica]*prompb.WriteRequest)
for i := uint64(0); i < h.options.ReplicationFactor; i++ {
for _, ts := range wreq.Timeseries {
endpoint, err := h.hashring.GetN(tenant, &ts, i)
if err != nil {
h.mtx.RUnlock()
return err
}

er := endpointReplica{
endpoint: endpoint,
replica: replica{n: i, replicated: true},
}
replicatedRequest, ok := replicatedRequests[er]
if !ok {
replicatedRequest = &prompb.WriteRequest{
Timeseries: make([]prompb.TimeSeries, 0),
}
replicatedRequests[er] = replicatedRequest
}
replicatedRequest.Timeseries = append(replicatedRequest.Timeseries, ts)
}
wreqs[endpoint] = wreq
replicas[endpoint] = replica{i, true}
}
h.mtx.RUnlock()

quorum := h.writeQuorum()
// fanoutForward only returns an error if successThreshold (quorum) is not reached.
if err := h.fanoutForward(ctx, tenant, replicas, wreqs, quorum); err != nil {
if err := h.fanoutForward(ctx, tenant, replicatedRequests, quorum); err != nil {
return errors.Wrap(determineWriteErrorCause(err, quorum), "quorum not reached")
}
return nil
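The rewritten replicate loop no longer assumes that replica n of the whole batch lands on a single node; it hashes every series for every replica index and groups the result by (endpoint, replica). A self-contained sketch with a stubbed hashring follows; the endpoint names and assignments are hypothetical, and the key is simplified to a bare replica index rather than the replica struct used above.

package main

import "fmt"

type endpointReplica struct {
	endpoint string
	replica  uint64
}

func main() {
	const replicationFactor = 3
	// Stub for hashring.GetN: replica owners per series. With ketama,
	// different series in one batch can have different owners for the
	// same replica index.
	owners := map[string][replicationFactor]string{
		"seriesA": {"r1", "r2", "r3"},
		"seriesB": {"r2", "r3", "r1"},
	}
	groups := make(map[endpointReplica][]string)
	for series, eps := range owners {
		for n := uint64(0); n < replicationFactor; n++ {
			key := endpointReplica{endpoint: eps[n], replica: n}
			groups[key] = append(groups[key], series)
		}
	}
	// Six (endpoint, replica) groups, each forwarded as its own write
	// request, instead of one request per replica index for the batch.
	fmt.Println(len(groups)) // 6
}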
19 changes: 17 additions & 2 deletions pkg/receive/handler_test.go
@@ -377,7 +377,7 @@ func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uin
cfg[0].Endpoints = append(cfg[0].Endpoints, h.options.Endpoint)
peers.cache[addr] = &fakeRemoteWriteGRPCServer{h: h}
}
hashring := newMultiHashring(AlgorithmHashmod, cfg)
hashring := newMultiHashring(AlgorithmHashmod, replicationFactor, cfg)
for _, h := range handlers {
h.Hashring(hashring)
}
@@ -1258,7 +1258,7 @@ func (a *tsOverrideAppender) GetRef(lset labels.Labels) (storage.SeriesRef, labe
}

// serializeSeriesWithOneSample returns marshaled and compressed remote write requests like it would
// be send to Thanos receive.
// be sent to Thanos receive.
// It has one sample and allows passing multiple series, in the same manner as typical Prometheus would batch them.
func serializeSeriesWithOneSample(t testing.TB, series [][]labelpb.ZLabel) []byte {
r := &prompb.WriteRequest{Timeseries: make([]prompb.TimeSeries, 0, len(series))}
@@ -1338,6 +1338,21 @@ func benchmarkHandlerMultiTSDBReceiveRemoteWrite(b testutil.TB) {
return series
}()),
},
{
name: "typical labels under 1KB, 20000 of them",
writeRequest: serializeSeriesWithOneSample(b, func() [][]labelpb.ZLabel {
series := make([][]labelpb.ZLabel, 20000)
for s := 0; s < len(series); s++ {
lbls := make([]labelpb.ZLabel, 10)
for i := 0; i < len(lbls); i++ {
// Label ~20B name, 50B value.
lbls[i] = labelpb.ZLabel{Name: fmt.Sprintf("abcdefghijabcdefghijabcdefghij%d", i), Value: fmt.Sprintf("abcdefghijabcdefghijabcdefghijabcdefghijabcdefghij%d", i)}
}
series[s] = lbls
}
return series
}()),
},
{
name: "extremely large label value 10MB, 10 of them",
writeRequest: serializeSeriesWithOneSample(b, func() [][]labelpb.ZLabel {
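For scale: each series in the new 20000-series benchmark case carries 10 labels with roughly 31-byte names and 51-byte values, i.e. about 820 B of label data per series, which is what "typical labels under 1KB" in the case name refers to.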