Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring concurrency in scaler's pending HTTP queue fetcher logic #291

Merged
merged 14 commits into from
Oct 25, 2021
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion operator/controllers/ping_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ func TestPingInterceptors(t *testing.T) {
r.NoError(err)
defer srv.Close()
ctx := context.Background()
endpoints := k8s.FakeEndpointsForURL(url, ns, svcName, 2)
endpoints, err := k8s.FakeEndpointsForURL(url, ns, svcName, 2)
r.NoError(err)
cl := fake.NewClientBuilder().WithObjects(endpoints).Build()
r.NoError(pingInterceptors(
ctx,
Expand Down
31 changes: 28 additions & 3 deletions pkg/k8s/fake_endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package k8s

import (
"net/url"
"strconv"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -16,13 +17,36 @@ func FakeEndpointsForURL(
namespace,
name string,
num int,
) *v1.Endpoints {
addrs := make([]v1.EndpointAddress, num)
) (*v1.Endpoints, error) {
urls := make([]*url.URL, num)
for i := 0; i < num; i++ {
urls[i] = u
}
return FakeEndpointsForURLs(urls, namespace, name)
}

// FakeEndpointsForURLs creates and returns a new
// *v1.Endpoints with a single v1.EndpointSubset in it
// that has each url in the urls parameter in it.
func FakeEndpointsForURLs(
urls []*url.URL,
namespace,
name string,
) (*v1.Endpoints, error) {
addrs := make([]v1.EndpointAddress, len(urls))
ports := make([]v1.EndpointPort, len(urls))
for i, u := range urls {
addrs[i] = v1.EndpointAddress{
Hostname: u.Hostname(),
IP: u.Hostname(),
}
portInt, err := strconv.Atoi(u.Port())
if err != nil {
return nil, err
}
ports[i] = v1.EndpointPort{
Port: int32(portInt),
}
}
return &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -32,7 +56,8 @@ func FakeEndpointsForURL(
Subsets: []v1.EndpointSubset{
{
Addresses: addrs,
Ports: ports,
},
},
}
}, nil
}
9 changes: 9 additions & 0 deletions pkg/queue/queue_counts.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@ func NewCounts() *Counts {
}
}

// Aggregate returns the total count across all hosts
func (q *Counts) Aggregate() int {
agg := 0
for _, count := range q.Counts {
agg += count
}
return agg
}

// MarshalJSON implements json.Marshaler
func (q *Counts) MarshalJSON() ([]byte, error) {
return json.Marshal(q.Counts)
Expand Down
23 changes: 23 additions & 0 deletions pkg/queue/queue_counts_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package queue

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestAggregate(t *testing.T) {
r := require.New(t)
counts := NewCounts()
counts.Counts = map[string]int{
"host1": 123,
"host2": 234,
"host3": 456,
"host4": 567,
}
expectedAgg := 0
for _, v := range counts.Counts {
expectedAgg += v
}
r.Equal(expectedAgg, counts.Aggregate())
}
2 changes: 2 additions & 0 deletions scaler/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ type config struct {
// UpdateRoutingTableDur is the duration between manual
// updates to the routing table.
UpdateRoutingTableDur time.Duration `envconfig:"KEDA_HTTP_SCALER_ROUTING_TABLE_UPDATE_DUR" default:"100ms"`
// QueueTickDuration is the duration between queue requests
QueueTickDuration time.Duration `envconfig:"KEDA_HTTP_QUEUE_TICK_DURATION" default:"500ms"`
// This will be the 'Target Pending Requests' for the interceptor
TargetPendingRequestsInterceptor int `envconfig:"KEDA_HTTP_SCALER_TARGET_PENDING_REQUESTS_INTERCEPTOR" default:"100"`
}
Expand Down
25 changes: 19 additions & 6 deletions scaler/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ func TestIsActive(t *testing.T) {
ctx := context.Background()
lggr := logr.Discard()
table := routing.NewTable()
ticker, pinger := newFakeQueuePinger(ctx, lggr)
ticker, pinger, err := newFakeQueuePinger(ctx, lggr)
r.NoError(err)
defer ticker.Stop()
pinger.pingMut.Lock()
pinger.allCounts[host] = 0
Expand Down Expand Up @@ -80,7 +81,8 @@ func TestGetMetricSpec(t *testing.T) {
"testdepl",
int32(target),
))
ticker, pinger := newFakeQueuePinger(ctx, lggr)
ticker, pinger, err := newFakeQueuePinger(ctx, lggr)
r.NoError(err)
defer ticker.Stop()
hdl := newImpl(lggr, pinger, table, 123, 200)
meta := map[string]string{
Expand Down Expand Up @@ -109,7 +111,8 @@ func TestGetMetricsMissingHostInMetadata(t *testing.T) {
ScaledObjectRef: &externalscaler.ScaledObjectRef{},
}
table := routing.NewTable()
ticker, pinger := newFakeQueuePinger(ctx, lggr)
ticker, pinger, err := newFakeQueuePinger(ctx, lggr)
r.NoError(err)
defer ticker.Stop()
hdl := newImpl(lggr, pinger, table, 123, 200)

Expand All @@ -136,7 +139,9 @@ func TestGetMetricsMissingHostInQueue(t *testing.T) {
}

table := routing.NewTable()
ticker, pinger := newFakeQueuePinger(ctx, lggr)
ticker, pinger, err := newFakeQueuePinger(ctx, lggr)
r.NoError(err)

defer ticker.Stop()
hdl := newImpl(lggr, pinger, table, 123, 200)

Expand Down Expand Up @@ -201,14 +206,21 @@ func TestGetMetricsHostFoundInQueueCounts(t *testing.T) {
table := routing.NewTable()
// create a fake queue pinger. this is the simulated
// scaler that pings the above fake interceptor
ticker, pinger := newFakeQueuePinger(
ticker, pinger, err := newFakeQueuePinger(
ctx,
lggr,
func(opts *fakeQueuePingerOpts) { opts.endpoints = endpoints },
func(opts *fakeQueuePingerOpts) { opts.tickDur = 1 * time.Millisecond },
func(opts *fakeQueuePingerOpts) { opts.port = fakeSrvURL.Port() },
)
r.NoError(err)
defer ticker.Stop()
// start the pinger watch loop
go func() {

pinger.start(ctx, ticker)
}()

// sleep for more than enough time for the pinger to do its
// first tick
time.Sleep(50 * time.Millisecond)
Expand Down Expand Up @@ -272,13 +284,14 @@ func TestGetMetricsInterceptorReturnsAggregate(t *testing.T) {
// create a fake queue pinger. this is the simulated
// scaler that pings the above fake interceptor
const tickDur = 5 * time.Millisecond
ticker, pinger := newFakeQueuePinger(
ticker, pinger, err := newFakeQueuePinger(
ctx,
lggr,
func(opts *fakeQueuePingerOpts) { opts.endpoints = endpoints },
func(opts *fakeQueuePingerOpts) { opts.tickDur = tickDur },
func(opts *fakeQueuePingerOpts) { opts.port = fakeSrvURL.Port() },
)
r.NoError(err)
defer ticker.Stop()

// sleep for more than enough time for the pinger to do its
Expand Down
18 changes: 15 additions & 3 deletions scaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,31 @@ func main() {
lggr.Error(err, "getting a Kubernetes client")
os.Exit(1)
}
pinger := newQueuePinger(
pinger, err := newQueuePinger(
context.Background(),
lggr,
k8s.EndpointsFuncForK8sClientset(k8sCl),
namespace,
svcName,
targetPortStr,
time.NewTicker(500*time.Millisecond),
)
if err != nil {
lggr.Error(err, "creating a queue pinger")
os.Exit(1)
}

table := routing.NewTable()

grp, ctx := errgroup.WithContext(ctx)

grp.Go(func() error {
defer done()
return pinger.start(
ctx,
time.NewTicker(cfg.QueueTickDuration),
)
})

grp.Go(func() error {
defer done()
return startGrpcServer(
Expand Down Expand Up @@ -165,7 +177,7 @@ func startHealthcheckServer(
mux.HandleFunc("/queue_ping", func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
lggr := lggr.WithName("route.counts_ping")
if err := pinger.requestCounts(ctx); err != nil {
if err := pinger.fetchAndSaveCounts(ctx); err != nil {
lggr.Error(err, "requesting counts failed")
w.WriteHeader(500)
w.Write([]byte("error requesting counts from interceptors"))
Expand Down
3 changes: 2 additions & 1 deletion scaler/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ func TestHealthChecks(t *testing.T) {

errgrp, ctx := errgroup.WithContext(ctx)

ticker, pinger := newFakeQueuePinger(ctx, lggr)
ticker, pinger, err := newFakeQueuePinger(ctx, lggr)
r.NoError(err)
defer ticker.Stop()
srvFunc := func() error {
return startHealthcheckServer(ctx, lggr, port, pinger)
Expand Down
Loading