Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support ring handler on lifecycler #112

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 34 additions & 12 deletions ring/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"strings"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
)

Expand Down Expand Up @@ -90,7 +91,7 @@ func init() {
pageTemplate = template.Must(t.Parse(pageContent))
}

func (r *Ring) forget(ctx context.Context, id string) error {
func (h *ringPageHandler) forget(ctx context.Context, id string) error {
simonswine marked this conversation as resolved.
Show resolved Hide resolved
unregister := func(in interface{}) (out interface{}, retry bool, err error) {
if in == nil {
return nil, false, fmt.Errorf("found empty ring when trying to unregister")
Expand All @@ -100,7 +101,7 @@ func (r *Ring) forget(ctx context.Context, id string) error {
ringDesc.RemoveIngester(id)
return ringDesc, true, nil
}
return r.KVClient.CAS(ctx, r.key, unregister)
return h.r.casRing(ctx, unregister)
}

type ingesterDesc struct {
Expand All @@ -121,11 +122,30 @@ type httpResponse struct {
ShowTokens bool `json:"-"`
}

func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) {
type ringObserver interface {
simonswine marked this conversation as resolved.
Show resolved Hide resolved
casRing(ctx context.Context, f func(in interface{}) (out interface{}, retry bool, err error)) error
getRing(context.Context) (*Desc, map[string]uint32, error)
}

type ringPageHandler struct {
logger log.Logger
r ringObserver
heartbeatPeriod time.Duration
}

func newRingPageHandler(logger log.Logger, r ringObserver, heartbeatPeriod time.Duration) *ringPageHandler {
return &ringPageHandler{
logger: logger,
r: r,
heartbeatPeriod: heartbeatPeriod,
}
}

func (h *ringPageHandler) handle(w http.ResponseWriter, req *http.Request) {
if req.Method == http.MethodPost {
ingesterID := req.FormValue("forget")
if err := r.forget(req.Context(), ingesterID); err != nil {
level.Error(r.logger).Log("msg", "error forgetting instance", "err", err)
if err := h.forget(req.Context(), ingesterID); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: it would be nice if this error was reported to the user, instead of (or in addition to) being logged. (If we removed logging here, we could remove logger completely)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I have removed it in 13169e4.

I guess there is no riks of exposing additional information to the HTTP caller, as the caller has to have quite a lot of access to the cluster anyhow, if they are able to remove ingesters.

level.Error(h.logger).Log("msg", "error forgetting instance", "err", err)
}

// Implement PRG pattern to prevent double-POST and work with CSRF middleware.
Expand All @@ -140,23 +160,25 @@ func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) {
return
}

r.mtx.RLock()
defer r.mtx.RUnlock()
ringDesc, ownedTokens, err := h.r.getRing(req.Context())
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

ingesterIDs := []string{}
for id := range r.ringDesc.Ingesters {
for id := range ringDesc.Ingesters {
ingesterIDs = append(ingesterIDs, id)
}
sort.Strings(ingesterIDs)

now := time.Now()
var ingesters []ingesterDesc
_, owned := r.countTokens()
for _, id := range ingesterIDs {
ing := r.ringDesc.Ingesters[id]
ing := ringDesc.Ingesters[id]
heartbeatTimestamp := time.Unix(ing.Timestamp, 0)
state := ing.State.String()
if !r.IsHealthy(&ing, Reporting, now) {
if !ing.IsHealthy(Reporting, h.heartbeatPeriod, now) {
state = unhealthy
}

Expand All @@ -175,7 +197,7 @@ func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) {
Tokens: ing.Tokens,
Zone: ing.Zone,
NumTokens: len(ing.Tokens),
Ownership: (float64(owned[id]) / float64(math.MaxUint32)) * 100,
Ownership: (float64(ownedTokens[id]) / float64(math.MaxUint32)) * 100,
})
}

Expand Down
22 changes: 22 additions & 0 deletions ring/lifecycler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"flag"
"fmt"
"net/http"
"os"
"sort"
"sync"
Expand Down Expand Up @@ -849,6 +850,27 @@ func (i *Lifecycler) processShutdown(ctx context.Context) {
time.Sleep(i.cfg.FinalSleep)
}

func (i *Lifecycler) casRing(ctx context.Context, f func(in interface{}) (out interface{}, retry bool, err error)) error {
return i.KVStore.CAS(ctx, i.RingKey, f)
}

func (i *Lifecycler) getRing(ctx context.Context) (*Desc, map[string]uint32, error) {
obj, err := i.KVStore.Get(ctx, i.RingKey)
if err != nil {
return nil, nil, err
}

ringDesc := obj.(*Desc)

_, ownedTokens := countTokens(ringDesc, ringDesc.GetTokens(), ringDesc.getTokensInfo())

return ringDesc, ownedTokens, nil
}

func (i *Lifecycler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
newRingPageHandler(i.logger, i, i.cfg.HeartbeatPeriod).handle(w, req)
}

// unregister removes our entry from consul.
func (i *Lifecycler) unregister(ctx context.Context) error {
level.Debug(i.logger).Log("msg", "unregistering instance from ring", "ring", i.RingName)
Expand Down
40 changes: 32 additions & 8 deletions ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ import (
"fmt"
"math"
"math/rand"
"net/http"
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand Down Expand Up @@ -513,27 +515,26 @@ func (r *Ring) GetReplicationSetForOperation(op Operation) (ReplicationSet, erro
}

// countTokens returns the number of tokens and tokens within the range for each instance.
// The ring read lock must be already taken when calling this function.
func (r *Ring) countTokens() (map[string]uint32, map[string]uint32) {
func countTokens(ringDesc *Desc, ringTokens []uint32, ringInstanceByToken map[uint32]instanceInfo) (map[string]uint32, map[string]uint32) {
simonswine marked this conversation as resolved.
Show resolved Hide resolved
owned := map[string]uint32{}
numTokens := map[string]uint32{}
for i, token := range r.ringTokens {
for i, token := range ringTokens {
var diff uint32

// Compute how many tokens are within the range.
if i+1 == len(r.ringTokens) {
diff = (math.MaxUint32 - token) + r.ringTokens[0]
if i+1 == len(ringTokens) {
diff = (math.MaxUint32 - token) + ringTokens[0]
} else {
diff = r.ringTokens[i+1] - token
diff = ringTokens[i+1] - token
}

info := r.ringInstanceByToken[token]
info := ringInstanceByToken[token]
numTokens[info.InstanceID] = numTokens[info.InstanceID] + 1
owned[info.InstanceID] = owned[info.InstanceID] + diff
}

// Set to 0 the number of owned tokens by instances which don't have tokens yet.
for id := range r.ringDesc.Ingesters {
for id := range ringDesc.Ingesters {
if _, ok := owned[id]; !ok {
owned[id] = 0
numTokens[id] = 0
Expand All @@ -543,6 +544,11 @@ func (r *Ring) countTokens() (map[string]uint32, map[string]uint32) {
return numTokens, owned
}

// The ring read lock must be already taken when calling this function.
func (r *Ring) countTokens() (map[string]uint32, map[string]uint32) {
return countTokens(r.ringDesc, r.ringTokens, r.ringInstanceByToken)
}

// updateRingMetrics updates ring metrics. Caller must be holding the Write lock!
func (r *Ring) updateRingMetrics(compareResult CompareResult) {
if compareResult == Equal {
Expand Down Expand Up @@ -840,6 +846,24 @@ func (r *Ring) CleanupShuffleShardCache(identifier string) {
}
}

func (r *Ring) casRing(ctx context.Context, f func(in interface{}) (out interface{}, retry bool, err error)) error {
return r.KVClient.CAS(ctx, r.key, f)
}

func (r *Ring) getRing(ctx context.Context) (*Desc, map[string]uint32, error) {
r.mtx.RLock()
defer r.mtx.RUnlock()

ringDesc := proto.Clone(r.ringDesc).(*Desc)
_, ownedTokens := r.countTokens()

return ringDesc, ownedTokens, nil
}

func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) {
newRingPageHandler(r.logger, r, r.cfg.HeartbeatTimeout).handle(w, req)
}

// Operation describes which instances can be included in the replica set, based on their state.
//
// Implemented as bitmap, with upper 16-bits used for encoding extendReplicaSet, and lower 16-bits used for encoding healthy states.
Expand Down