This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Current level gauge for FlyteWorkflows #132

Merged · 12 commits · May 18, 2020
Changes from 1 commit
2 changes: 1 addition & 1 deletion config.yaml
@@ -59,7 +59,7 @@ storage:
access-key: minio
auth-type: accesskey
disable-ssl: true
- endpoint: http://localhost:9000
+ endpoint: http://localhost:30084
region: us-east-1
secret-key: miniostorage
type: minio
103 changes: 103 additions & 0 deletions pkg/controller/controller.go
@@ -2,6 +2,11 @@ package controller

import (
"context"
"fmt"
"github.com/lyft/flytestdlib/contextutils"
"k8s.io/apimachinery/pkg/labels"
"runtime/pprof"
"time"

stdErrs "github.com/lyft/flytestdlib/errors"

@@ -33,6 +38,7 @@ import (
clientset "github.com/lyft/flytepropeller/pkg/client/clientset/versioned"
flyteScheme "github.com/lyft/flytepropeller/pkg/client/clientset/versioned/scheme"
informers "github.com/lyft/flytepropeller/pkg/client/informers/externalversions"
lister "github.com/lyft/flytepropeller/pkg/client/listers/flyteworkflow/v1alpha1"
"github.com/lyft/flytepropeller/pkg/controller/nodes"
"github.com/lyft/flytepropeller/pkg/controller/nodes/subworkflow/launchplan"
"github.com/lyft/flytepropeller/pkg/controller/workflow"
@@ -57,6 +63,7 @@ type Controller struct {
recorder record.EventRecorder
metrics *metrics
leaderElector *leaderelection.LeaderElector
levelMonitor *ResourceLevelMonitor
}

// Runs either as a leader, if configured, or as a standalone process.
@@ -86,6 +93,9 @@ func (c *Controller) run(ctx context.Context) error {
return err
}

// Start the collector process
c.levelMonitor.RunCollector(ctx)

// Start the informer factories to begin populating the informer caches
logger.Info(ctx, "Starting FlyteWorkflow controller")
return c.workerPool.Run(ctx, c.numWorkers, c.flyteworkflowSynced)
@@ -169,6 +179,97 @@ func (c *Controller) getWorkflowUpdatesHandler() cache.ResourceEventHandler {
}
}

// ResourceLevelMonitor is responsible for emitting metrics that show the current number of FlyteWorkflows,
// cut by project and domain. It contains a labeled gauge for the levels and a stopwatch that times each
// collection cycle. It needs to be kicked off explicitly via RunCollector; the periodicity is not currently
// configurable because that seems unnecessary.
type ResourceLevelMonitor struct {
Scope promutils.Scope

// Meta timer - this times each collection cycle
CollectorTimer promutils.StopWatch

// A gauge for the workflow levels, labeled by project and domain
levels *prometheus.GaugeVec

// The lister used to fetch the workflows whose current levels we want to measure
lister lister.FlyteWorkflowLister
}

func (r *ResourceLevelMonitor) countList(ctx context.Context, workflows []*v1alpha1.FlyteWorkflow) map[string]map[string]int {
	// Map of projects to domains to counts
	counts := map[string]map[string]int{}

	// Collect all workflow metrics
	for _, wf := range workflows {
		execID := wf.GetExecutionID()
		if execID.WorkflowExecutionIdentifier == nil {
			logger.Warningf(ctx, "Workflow does not have an execution identifier! [%v]", wf)
			continue
		}
		project := wf.ExecutionID.Project
		domain := wf.ExecutionID.Domain
		if _, ok := counts[project]; !ok {
			counts[project] = map[string]int{}
		}
		counts[project][domain]++
	}

	return counts
}
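
For illustration, a minimal sketch of the shape countList returns, assuming two workflows in flytesnacks/development and one in flytesnacks/staging (project and domain names hypothetical):

counts := r.countList(ctx, workflows)
// counts == map[string]map[string]int{
//     "flytesnacks": {"development": 2, "staging": 1},
// }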

func (r *ResourceLevelMonitor) collect(ctx context.Context) {
	// List all workflows; gauges are emitted per project/domain combination only, since aggregate
	// levels can be derived from the labeled series with Prometheus queries.
	workflows, err := r.lister.List(labels.Everything())
	if err != nil {
		logger.Errorf(ctx, "Error listing workflows when attempting to collect data for gauges %s", err)
		return
	}

	counts := r.countList(ctx, workflows)

	// Emit labeled metrics for each project/domain combination.
	metricKeys := []contextutils.Key{contextutils.ProjectKey, contextutils.DomainKey}
	for project, val := range counts {
		for domain, num := range val {
			tempContext := contextutils.WithProjectDomain(ctx, project, domain)
			gauge, err := r.levels.GetMetricWith(contextutils.Values(tempContext, metricKeys...))
			if err != nil {
				// A bad label set is not worth crashing the controller over
				logger.Errorf(ctx, "Error getting gauge for [%s/%s]: %s", project, domain, err)
				continue
			}
			gauge.Set(float64(num))
		}
	}
}
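
With the "collector" sub-scope wired up in New() below, the emitted series would look roughly like this in Prometheus exposition format (the exact metric prefix depends on the root scope passed to the controller; "flyte:propeller" here is an assumption):

flyte:propeller:collector:flyteworkflow{domain="development",project="flytesnacks"} 2
flyte:propeller:collector:flyteworkflow{domain="staging",project="flytesnacks"} 1

Because the series is labeled, an aggregate view is a query away, e.g. sum by (project) (flyte:propeller:collector:flyteworkflow), which is why no separate aggregate gauge is emitted.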

func (r *ResourceLevelMonitor) RunCollector(ctx context.Context) {
	ticker := time.NewTicker(5 * time.Second)
	collectorCtx := contextutils.WithGoroutineLabel(ctx, "resource-level-monitor")

	go func() {
		defer ticker.Stop()
		// Label the collector goroutine itself, so it is identifiable in pprof dumps
		pprof.SetGoroutineLabels(collectorCtx)
		for {
			select {
			case <-collectorCtx.Done():
				return
			case <-ticker.C:
				t := r.CollectorTimer.Start()
				r.collect(collectorCtx)
				t.Stop()
			}
		}
	}()
}
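
The goroutine label is what makes the collector stand out in pprof output. A minimal sketch of how that surfaces (requires importing "os"; the exact label key is set by contextutils and the "routine" key shown here is an assumption):

// Dump all goroutines, with labels, to stdout:
_ = pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
// The collector's stacks then carry an annotation like:
//   labels: {"routine":"resource-level-monitor"}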

func NewResourceLevelMonitor(scope promutils.Scope, lister lister.FlyteWorkflowLister) *ResourceLevelMonitor {
	gauge := scope.MustNewGaugeVec("flyteworkflow", "Current FlyteWorkflow levels", contextutils.ProjectKey.String(),
		contextutils.DomainKey.String())

	return &ResourceLevelMonitor{
		Scope:          scope,
		CollectorTimer: scope.MustNewStopWatch("collection_cycle", "Measures the duration of one collection cycle", time.Millisecond),
		levels:         gauge,
		lister:         lister,
	}
}
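
For context, constructing the monitor standalone (e.g. in a test) might look like the sketch below; promutils.NewTestScope is flytestdlib's throwaway test scope, and wfInformer and ctx are hypothetical:

m := NewResourceLevelMonitor(promutils.NewTestScope(), wfInformer.Lister())
m.RunCollector(ctx)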

func newControllerMetrics(scope promutils.Scope) *metrics {
c := scope.MustNewCounterVec("wf_enqueue", "workflow enqueue count.", "type")
return &metrics{
@@ -298,6 +399,8 @@ func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Inter
return nil, stdErrs.Wrapf(errors3.CausedByError, err, "failed to initialize workflow store")
}

controller.levelMonitor = NewResourceLevelMonitor(scope.NewSubScope("collector"), flyteworkflowInformer.Lister())

nodeExecutor, err := nodes.NewExecutor(ctx, cfg.NodeConfig, store, controller.enqueueWorkflowForNodeUpdates, eventSink,
launchPlanActor, launchPlanActor, cfg.MaxDatasetSizeBytes,
storage.DataReference(cfg.DefaultRawOutputPrefix), kubeClient, catalogClient, scope)