Skip to content

Commit

Permalink
Use endpointregistry in the hotpath (#2823)
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Zavodskikh <[email protected]>
Co-authored-by: Roman Zavodskikh <[email protected]>
  • Loading branch information
RomanZavodskikh and Roman Zavodskikh authored Jan 10, 2024
1 parent df65830 commit a2bb7e8
Show file tree
Hide file tree
Showing 15 changed files with 76 additions and 83 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v0.18
v0.19
2 changes: 1 addition & 1 deletion docs/kubernetes/ingress-controller.md
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,7 @@ the communication work with TCP and UDP to the specified `swarm-port`:
## Upgrades

Please always read the announcements of the vX.Y.**0**
[release page](https://github.com/zalando/skipper/releases/tag/v0.18.0),
[release page](https://github.com/zalando/skipper/releases/tag/v0.19.0),
because these will document in case we break something in a backwards non
compatible way. Most of the time it will be safe to deploy minor
version updates, but better to know in advance if something could
Expand Down
1 change: 0 additions & 1 deletion filters/fadein/fadein.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,6 @@ func (p *postProcessor) Do(r []*routing.Route) []*routing.Route {
detected = now
}

ep.Detected = detected
if p.endpointRegisty != nil {
metrics := p.endpointRegisty.GetMetrics(ep.Host)
if endpointsCreated[key].After(metrics.DetectedTime()) {
Expand Down
17 changes: 1 addition & 16 deletions filters/fadein/fadein_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,9 +249,6 @@ func TestPostProcessor(t *testing.T) {
}

for _, ep := range bar.LBEndpoints {
if ep.Detected.IsZero() {
t.Fatal("failed to set detection time")
}
if endpointRegistry.GetMetrics(ep.Host).DetectedTime().IsZero() {
t.Fatal("failed to set detection time")
}
Expand Down Expand Up @@ -284,7 +281,7 @@ func TestPostProcessor(t *testing.T) {
endpointRegisty := routing.NewEndpointRegistry(routing.RegistryOptions{})
rt, _ := createRouting(t, routes, endpointRegisty)
r := route(rt, "/")
if r == nil || len(r.LBEndpoints) == 0 || !r.LBEndpoints[0].Detected.IsZero() {
if r == nil || len(r.LBEndpoints) == 0 {
t.Fatal("failed to ignore negative duration")
}
if endpointRegisty.GetMetrics(r.LBEndpoints[0].Host).DetectedTime().IsZero() {
Expand All @@ -311,9 +308,6 @@ func TestPostProcessor(t *testing.T) {
var found bool
for _, ep := range r.LBEndpoints {
if ep.Host == "10.0.0.1:8080" {
if ep.Detected.After(firstDetected) {
t.Fatal("Failed to keep detection time.")
}
if endpointRegistry.GetMetrics(ep.Host).DetectedTime().After(firstDetected) {
t.Fatal("Failed to keep detection time.")
}
Expand Down Expand Up @@ -348,9 +342,6 @@ func TestPostProcessor(t *testing.T) {
var found bool
for _, ep := range r.LBEndpoints {
if ep.Host == "10.0.0.1:8080" {
if ep.Detected.After(firstDetected) {
t.Fatal("Failed to keep detection time.")
}
if endpointRegistry.GetMetrics(ep.Host).DetectedTime().After(firstDetected) {
t.Fatal("Failed to keep detection time.")
}
Expand Down Expand Up @@ -387,9 +378,6 @@ func TestPostProcessor(t *testing.T) {
var found bool
for _, ep := range r.LBEndpoints {
if ep.Host == "10.0.0.1:8080" {
if !ep.Detected.After(firstDetected) {
t.Fatal("Failed to clear detection time.")
}
if !endpointRegistry.GetMetrics(ep.Host).DetectedTime().After(firstDetected) {
t.Fatal("Failed to clear detection time.")
}
Expand Down Expand Up @@ -428,9 +416,6 @@ func TestPostProcessor(t *testing.T) {
var found bool
for _, ep := range r.LBEndpoints {
if ep.Host == "10.0.0.1:8080" {
if !ep.Detected.After(firstDetected) {
t.Fatal("Failed to reset detection time.")
}
if !endpointRegistry.GetMetrics(ep.Host).DetectedTime().After(firstDetected) {
t.Fatal("Failed to reset detection time.")
}
Expand Down
21 changes: 10 additions & 11 deletions loadbalancer/algorithm.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,15 @@ func shiftWeighted(rnd *rand.Rand, ctx *routing.LBContext, now time.Time) routin
rt := ctx.Route
ep := ctx.LBEndpoints
for _, epi := range ep {
wi := fadeIn(now, rt.LBFadeInDuration, rt.LBFadeInExponent, epi.Detected)
wi := fadeIn(now, rt.LBFadeInDuration, rt.LBFadeInExponent, epi.Metrics.DetectedTime())
sum += wi
}

choice := ep[len(ep)-1]
r := rnd.Float64() * sum
var upto float64
for i, epi := range ep {
upto += fadeIn(now, rt.LBFadeInDuration, rt.LBFadeInExponent, epi.Detected)
upto += fadeIn(now, rt.LBFadeInDuration, rt.LBFadeInExponent, epi.Metrics.DetectedTime())
if upto > r {
choice = ep[i]
break
Expand Down Expand Up @@ -113,15 +113,15 @@ func withFadeIn(rnd *rand.Rand, ctx *routing.LBContext, choice int, algo routing
now,
ctx.Route.LBFadeInDuration,
ctx.Route.LBFadeInExponent,
ctx.LBEndpoints[choice].Detected,
ctx.LBEndpoints[choice].Metrics.DetectedTime(),
)

if rnd.Float64() < f {
return ep[choice]
}
notFadingIndexes := make([]int, 0, len(ep))
for i := 0; i < len(ep); i++ {
if _, fadingIn := fadeInState(now, ctx.Route.LBFadeInDuration, ep[i].Detected); !fadingIn {
if _, fadingIn := fadeInState(now, ctx.Route.LBFadeInDuration, ep[i].Metrics.DetectedTime()); !fadingIn {
notFadingIndexes = append(notFadingIndexes, i)
}
}
Expand Down Expand Up @@ -260,7 +260,7 @@ func computeLoadAverage(ctx *routing.LBContext) float64 {
sum := 1.0 // add 1 to include the request that just arrived
endpoints := ctx.LBEndpoints
for _, v := range endpoints {
sum += float64(v.Metrics.GetInflightRequests())
sum += float64(v.Metrics.InflightRequests())
}
return sum / float64(len(endpoints))
}
Expand All @@ -277,10 +277,10 @@ func (ch *consistentHash) boundedLoadSearch(key string, balanceFactor float64, c
if skipEndpoint(endpointIndex) {
continue
}
load := ctx.LBEndpoints[endpointIndex].Metrics.GetInflightRequests()
load := ctx.LBEndpoints[endpointIndex].Metrics.InflightRequests()
// We know there must be an endpoint whose load <= average load.
// Since targetLoad >= average load (balancerFactor >= 1), there must also be an endpoint with load <= targetLoad.
if load <= int(targetLoad) {
if float64(load) <= targetLoad {
break
}
ringIndex = (ringIndex + 1) % ch.Len()
Expand Down Expand Up @@ -372,7 +372,7 @@ func (p *powerOfRandomNChoices) Apply(ctx *routing.LBContext) routing.LBEndpoint
// getScore returns negative value of inflightrequests count.
func (p *powerOfRandomNChoices) getScore(e routing.LBEndpoint) int64 {
// endpoints with higher inflight request should have lower score
return -int64(e.Metrics.GetInflightRequests())
return -int64(e.Metrics.InflightRequests())
}

type (
Expand Down Expand Up @@ -431,9 +431,8 @@ func parseEndpoints(r *routing.Route) error {
}

r.LBEndpoints[i] = routing.LBEndpoint{
Scheme: scheme,
Host: host,
Metrics: &routing.LBMetrics{},
Scheme: scheme,
Host: host,
}
}

Expand Down
24 changes: 19 additions & 5 deletions loadbalancer/algorithm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func TestSelectAlgorithm(t *testing.T) {

t.Run("LB route with default algorithm", func(t *testing.T) {
p := NewAlgorithmProvider()
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
r := &routing.Route{
Route: eskip.Route{
BackendType: eskip.LBBackend,
Expand All @@ -38,6 +39,7 @@ func TestSelectAlgorithm(t *testing.T) {
}

rr := p.Do([]*routing.Route{r})
endpointRegistry.Do([]*routing.Route{r})
if len(rr) != 1 {
t.Fatal("failed to process LB route")
}
Expand All @@ -56,6 +58,7 @@ func TestSelectAlgorithm(t *testing.T) {

t.Run("LB route with explicit round-robin algorithm", func(t *testing.T) {
p := NewAlgorithmProvider()
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
r := &routing.Route{
Route: eskip.Route{
BackendType: eskip.LBBackend,
Expand All @@ -65,6 +68,7 @@ func TestSelectAlgorithm(t *testing.T) {
}

rr := p.Do([]*routing.Route{r})
endpointRegistry.Do([]*routing.Route{r})
if len(rr) != 1 {
t.Fatal("failed to process LB route")
}
Expand All @@ -83,6 +87,7 @@ func TestSelectAlgorithm(t *testing.T) {

t.Run("LB route with explicit consistentHash algorithm", func(t *testing.T) {
p := NewAlgorithmProvider()
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
r := &routing.Route{
Route: eskip.Route{
BackendType: eskip.LBBackend,
Expand All @@ -92,6 +97,7 @@ func TestSelectAlgorithm(t *testing.T) {
}

rr := p.Do([]*routing.Route{r})
endpointRegistry.Do([]*routing.Route{r})
if len(rr) != 1 {
t.Fatal("failed to process LB route")
}
Expand All @@ -110,6 +116,7 @@ func TestSelectAlgorithm(t *testing.T) {

t.Run("LB route with explicit random algorithm", func(t *testing.T) {
p := NewAlgorithmProvider()
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
r := &routing.Route{
Route: eskip.Route{
BackendType: eskip.LBBackend,
Expand All @@ -119,6 +126,7 @@ func TestSelectAlgorithm(t *testing.T) {
}

rr := p.Do([]*routing.Route{r})
endpointRegistry.Do([]*routing.Route{r})
if len(rr) != 1 {
t.Fatal("failed to process LB route")
}
Expand All @@ -137,6 +145,7 @@ func TestSelectAlgorithm(t *testing.T) {

t.Run("LB route with explicit powerOfRandomNChoices algorithm", func(t *testing.T) {
p := NewAlgorithmProvider()
endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
r := &routing.Route{
Route: eskip.Route{
BackendType: eskip.LBBackend,
Expand All @@ -146,6 +155,7 @@ func TestSelectAlgorithm(t *testing.T) {
}

rr := p.Do([]*routing.Route{r})
endpointRegistry.Do([]*routing.Route{r})
if len(rr) != 1 {
t.Fatal("failed to process LB route")
}
Expand Down Expand Up @@ -264,6 +274,7 @@ func TestApply(t *testing.T) {
LBEndpoints: rt[0].LBEndpoints,
Registry: routing.NewEndpointRegistry(routing.RegistryOptions{}),
}
lbctx.Registry.Do([]*routing.Route{r})

h := make(map[string]int)
for i := 0; i < R; i++ {
Expand Down Expand Up @@ -316,6 +327,7 @@ func TestConsistentHashBoundedLoadSearch(t *testing.T) {
LBEndpoints: endpoints,
},
}})[0]

ch := route.LBAlgorithm.(*consistentHash)
ctx := &routing.LBContext{
Request: r,
Expand All @@ -324,6 +336,7 @@ func TestConsistentHashBoundedLoadSearch(t *testing.T) {
Params: map[string]interface{}{ConsistentHashBalanceFactor: 1.25},
Registry: routing.NewEndpointRegistry(routing.RegistryOptions{}),
}
ctx.Registry.Do([]*routing.Route{route})
noLoad := ch.Apply(ctx)
nonBounded := ch.Apply(&routing.LBContext{Request: r, Route: route, LBEndpoints: route.LBEndpoints, Params: map[string]interface{}{}})

Expand Down Expand Up @@ -393,6 +406,7 @@ func TestConsistentHashBoundedLoadDistribution(t *testing.T) {
LBEndpoints: endpoints,
},
}})[0]

ch := route.LBAlgorithm.(*consistentHash)
balanceFactor := 1.25
ctx := &routing.LBContext{
Expand All @@ -402,23 +416,23 @@ func TestConsistentHashBoundedLoadDistribution(t *testing.T) {
Params: map[string]interface{}{ConsistentHashBalanceFactor: balanceFactor},
Registry: routing.NewEndpointRegistry(routing.RegistryOptions{}),
}
ctx.Registry.Do([]*routing.Route{route})

for i := 0; i < 100; i++ {
ep := ch.Apply(ctx)
ifr0 := route.LBEndpoints[0].Metrics.GetInflightRequests()
ifr1 := route.LBEndpoints[1].Metrics.GetInflightRequests()
ifr2 := route.LBEndpoints[2].Metrics.GetInflightRequests()
ifr0 := route.LBEndpoints[0].Metrics.InflightRequests()
ifr1 := route.LBEndpoints[1].Metrics.InflightRequests()
ifr2 := route.LBEndpoints[2].Metrics.InflightRequests()

assert.Equal(t, int64(ifr0), ctx.Registry.GetMetrics(route.LBEndpoints[0].Host).InflightRequests())
assert.Equal(t, int64(ifr1), ctx.Registry.GetMetrics(route.LBEndpoints[1].Host).InflightRequests())
assert.Equal(t, int64(ifr2), ctx.Registry.GetMetrics(route.LBEndpoints[2].Host).InflightRequests())

avg := float64(ifr0+ifr1+ifr2) / 3.0
limit := int(avg*balanceFactor) + 1
limit := int64(avg*balanceFactor) + 1
if ifr0 > limit || ifr1 > limit || ifr2 > limit {
t.Errorf("Expected in-flight requests for each endpoint to be less than %d. In-flight request counts: %d, %d, %d", limit, ifr0, ifr1, ifr2)
}
ep.Metrics.IncInflightRequest()
ctx.Registry.GetMetrics(ep.Host).IncInflightRequest()
}
}
Expand Down
8 changes: 4 additions & 4 deletions loadbalancer/fadein_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ func initializeEndpoints(endpointAges []time.Duration, fadeInDuration time.Durat

for i := range eps {
ctx.Route.LBEndpoints = append(ctx.Route.LBEndpoints, routing.LBEndpoint{
Host: eps[i],
Detected: detectionTimes[i],
Host: eps[i],
Metrics: ctx.Registry.GetMetrics(eps[i]),
})
ctx.Registry.GetMetrics(eps[i]).SetDetected(detectionTimes[i])
}
Expand Down Expand Up @@ -329,8 +329,8 @@ func benchmarkFadeIn(
registry := routing.NewEndpointRegistry(routing.RegistryOptions{})
for i := range eps {
route.LBEndpoints = append(route.LBEndpoints, routing.LBEndpoint{
Host: eps[i],
Detected: detectionTimes[i],
Host: eps[i],
Metrics: registry.GetMetrics(eps[i]),
})
registry.GetMetrics(eps[i]).SetDetected(detectionTimes[i])
}
Expand Down
22 changes: 13 additions & 9 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,8 +484,8 @@ func selectEndpoint(ctx *context, registry *routing.EndpointRegistry) *routing.L

// creates an outgoing http request to be forwarded to the route endpoint
// based on the augmented incoming request
func mapRequest(ctx *context, requestContext stdlibcontext.Context, removeHopHeaders bool, registry *routing.EndpointRegistry) (*http.Request, *routing.LBEndpoint, error) {
var endpoint *routing.LBEndpoint
func mapRequest(ctx *context, requestContext stdlibcontext.Context, removeHopHeaders bool, registry *routing.EndpointRegistry) (*http.Request, routing.Metrics, error) {
var endpointMetrics routing.Metrics
r := ctx.request
rt := ctx.route
host := ctx.outgoingHost
Expand All @@ -497,9 +497,13 @@ func mapRequest(ctx *context, requestContext stdlibcontext.Context, removeHopHea
setRequestURLFromRequest(u, r)
setRequestURLForDynamicBackend(u, stateBag)
case eskip.LBBackend:
endpoint = selectEndpoint(ctx, registry)
endpoint := selectEndpoint(ctx, registry)
endpointMetrics = endpoint.Metrics
u.Scheme = endpoint.Scheme
u.Host = endpoint.Host
case eskip.NetworkBackend:
endpointMetrics = registry.GetMetrics(rt.Host)
fallthrough
default:
u.Scheme = rt.Scheme
u.Host = rt.Host
Expand All @@ -512,7 +516,7 @@ func mapRequest(ctx *context, requestContext stdlibcontext.Context, removeHopHea

rr, err := http.NewRequestWithContext(requestContext, r.Method, u.String(), body)
if err != nil {
return nil, endpoint, err
return nil, nil, err
}

rr.ContentLength = r.ContentLength
Expand Down Expand Up @@ -543,7 +547,7 @@ func mapRequest(ctx *context, requestContext stdlibcontext.Context, removeHopHea
rr = forwardToProxy(r, rr)
}

return rr, endpoint, nil
return rr, endpointMetrics, nil
}

type proxyUrlContextKey struct{}
Expand Down Expand Up @@ -836,7 +840,7 @@ func (p *Proxy) makeUpgradeRequest(ctx *context, req *http.Request) {
}

func (p *Proxy) makeBackendRequest(ctx *context, requestContext stdlibcontext.Context) (*http.Response, *proxyError) {
req, endpoint, err := mapRequest(ctx, requestContext, p.flags.HopHeadersRemoval(), p.registry)
req, endpointMetrics, err := mapRequest(ctx, requestContext, p.flags.HopHeadersRemoval(), p.registry)
if err != nil {
return nil, &proxyError{err: fmt.Errorf("could not map backend request: %w", err)}
}
Expand All @@ -845,9 +849,9 @@ func (p *Proxy) makeBackendRequest(ctx *context, requestContext stdlibcontext.Co
return res, nil
}

if endpoint != nil {
endpoint.Metrics.IncInflightRequest()
defer endpoint.Metrics.DecInflightRequest()
if endpointMetrics != nil {
endpointMetrics.IncInflightRequest()
defer endpointMetrics.DecInflightRequest()
}

if p.experimentalUpgrade && isUpgradeRequest(req) {
Expand Down
Loading

0 comments on commit a2bb7e8

Please sign in to comment.