rate limit number of new executions per evaluation
wdbaruni committed Jan 27, 2025
1 parent 52084a9 commit cb2d101
Showing 13 changed files with 925 additions and 44 deletions.
7 changes: 4 additions & 3 deletions pkg/models/evaluation.go
@@ -22,9 +22,10 @@ const (
EvalTriggerJobQueue = "job-queue"
EvalTriggerJobTimeout = "job-timeout"

EvalTriggerExecFailure = "exec-failure"
EvalTriggerExecUpdate = "exec-update"
EvalTriggerExecTimeout = "exec-timeout"
EvalTriggerExecFailure = "exec-failure"
EvalTriggerExecUpdate = "exec-update"
EvalTriggerExecTimeout = "exec-timeout"
EvalTriggerExecutionLimit = "exec-limit"
)

// Evaluation is just to ask the scheduler to reassess if additional job instances must be
7 changes: 7 additions & 0 deletions pkg/models/plan.go
@@ -163,3 +163,10 @@ func (p *Plan) hasRunningExecutions() bool {
}
return false
}

// IsEmpty returns true if the plan has no pending work (no new executions or evaluations).
// We don't check execution updates or job state updates, as they don't reflect more work;
// they can simply be updates that mark existing executions as failed, for example.
func (p *Plan) IsEmpty() bool {
return len(p.NewExecutions) == 0 && len(p.NewEvaluations) == 0
}
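
For orientation (not part of the commit), a minimal sketch of how the new predicate behaves; the demoIsEmpty helper is hypothetical and only exercises methods visible in this diff and its tests:

package main

import (
	"fmt"
	"time"

	"github.com/bacalhau-project/bacalhau/pkg/models"
)

// demoIsEmpty shows that events alone don't count as pending work,
// while new evaluations (and new executions) do.
func demoIsEmpty(eval *models.Evaluation, job *models.Job) {
	plan := models.NewPlan(eval, job)
	plan.AppendJobEvent(models.Event{Message: "queueing"})
	fmt.Println(plan.IsEmpty()) // true: only an event was added

	plan.AppendEvaluation(eval.NewDelayedEvaluation(time.Now()))
	fmt.Println(plan.IsEmpty()) // false: a new evaluation is pending work
}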
42 changes: 42 additions & 0 deletions pkg/models/plan_test.go
@@ -96,6 +96,48 @@ func (s *PlanTestSuite) TestMarkJobCompleted() {
s.Equal("job completed", s.plan.JobEvents[0].Message)
}

func (s *PlanTestSuite) TestIsEmpty() {
// Test completely empty plan
emptyPlan := models.NewPlan(s.eval, s.job)
s.True(emptyPlan.IsEmpty(), "New plan should be empty")

// Test plan with new executions
planWithNewExec := models.NewPlan(s.eval, s.job)
planWithNewExec.AppendExecution(mock.ExecutionForJob(s.job), models.Event{})
s.False(planWithNewExec.IsEmpty(), "Plan with new executions should not be empty")

// Test plan with updated executions
planWithUpdatedExec := models.NewPlan(s.eval, s.job)
planWithUpdatedExec.AppendStoppedExecution(
mock.ExecutionForJob(s.job),
models.Event{},
models.ExecutionStateCancelled,
)
s.True(planWithUpdatedExec.IsEmpty(), "Plan with only updated executions should be empty")

// Test plan with new evaluations
planWithNewEval := models.NewPlan(s.eval, s.job)
planWithNewEval.AppendEvaluation(mock.Eval())
s.False(planWithNewEval.IsEmpty(), "Plan with new evaluations should not be empty")

// Verify that events don't affect emptiness
planWithEvents := models.NewPlan(s.eval, s.job)
planWithEvents.AppendJobEvent(models.Event{Message: "test event"})
planWithEvents.AppendExecutionEvent("exec-1", models.Event{Message: "test event"})
s.True(planWithEvents.IsEmpty(), "Plan with only events should be empty")

// Verify that desired state doesn't affect emptiness
planWithState := models.NewPlan(s.eval, s.job)
planWithState.MarkJobCompleted(models.Event{})
s.True(planWithState.IsEmpty(), "Plan with only state changes should be empty")

// Test plan with mixed content
mixedPlan := models.NewPlan(s.eval, s.job)
mixedPlan.AppendExecution(mock.ExecutionForJob(s.job), models.Event{})
mixedPlan.AppendJobEvent(models.Event{})
s.False(mixedPlan.IsEmpty(), "Plan with executions should not be empty regardless of other content")
}

func TestRunPlanTestSuite(t *testing.T) {
suite.Run(t, new(PlanTestSuite))
}
16 changes: 16 additions & 0 deletions pkg/node/config.go
@@ -1,6 +1,8 @@
package node

import (
"time"

"github.com/bacalhau-project/bacalhau/pkg/bidstrategy"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/orchestrator"
@@ -22,6 +24,11 @@ type SystemConfig struct {
// NodeRankRandomnessRange overrides the node's rank randomness range for testing purposes
NodeRankRandomnessRange int

// MaxExecutionsPerEval limits the number of new executions that can be created in a single evaluation
MaxExecutionsPerEval int
// ExecutionLimitBackoff is the duration to wait before creating a new evaluation when hitting execution limits
ExecutionLimitBackoff time.Duration

///////////////////////////////
// Compute Specific Config
///////////////////////////////
@@ -40,6 +47,8 @@ func DefaultSystemConfig() SystemConfig {
return SystemConfig{
OverSubscriptionFactor: 1.5,
NodeRankRandomnessRange: 5,
MaxExecutionsPerEval: 20,
ExecutionLimitBackoff: 100 * time.Millisecond,
DefaultComputeJobResourceLimits: models.Resources{
CPU: 0.1, // 100m
Memory: 100 * 1024 * 1024, // 100Mi
@@ -56,6 +65,13 @@ func (c *SystemConfig) applyDefaults() {
if c.NodeRankRandomnessRange == 0 {
c.NodeRankRandomnessRange = defaults.NodeRankRandomnessRange
}
if c.MaxExecutionsPerEval == 0 {
c.MaxExecutionsPerEval = defaults.MaxExecutionsPerEval
}
if c.ExecutionLimitBackoff == 0 {
c.ExecutionLimitBackoff = defaults.ExecutionLimitBackoff
}

if c.DefaultComputeJobResourceLimits.IsZero() {
c.DefaultComputeJobResourceLimits = defaults.DefaultComputeJobResourceLimits
}
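As an aside (not part of the diff), this is roughly how an embedder of the node package could override the two new knobs. DefaultSystemConfig and the field names come from the hunks above; the override values and the customSystemConfig helper are made up for illustration:

package main

import (
	"time"

	"github.com/bacalhau-project/bacalhau/pkg/node"
)

// customSystemConfig starts from the defaults added in this commit
// (MaxExecutionsPerEval: 20, ExecutionLimitBackoff: 100ms) and relaxes them.
func customSystemConfig() node.SystemConfig {
	cfg := node.DefaultSystemConfig()
	cfg.MaxExecutionsPerEval = 50                      // allow a larger burst of new executions per evaluation
	cfg.ExecutionLimitBackoff = 250 * time.Millisecond // wait longer before the follow-up evaluation fires
	return cfg
}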
8 changes: 8 additions & 0 deletions pkg/node/requester.go
@@ -139,13 +139,19 @@ func NewRequesterNode(
},
)

executionRateLimiter := scheduler.NewBatchRateLimiter(scheduler.BatchRateLimiterParams{
MaxExecutionsPerEval: cfg.SystemConfig.MaxExecutionsPerEval,
ExecutionLimitBackoff: cfg.SystemConfig.ExecutionLimitBackoff,
})

// scheduler provider
batchServiceJobScheduler := scheduler.NewBatchServiceJobScheduler(scheduler.BatchServiceJobSchedulerParams{
JobStore: jobStore,
Planner: planners,
NodeSelector: nodeSelector,
RetryStrategy: retryStrategy,
QueueBackoff: cfg.BacalhauConfig.Orchestrator.Scheduler.QueueBackoff.AsTimeDuration(),
RateLimiter: executionRateLimiter,
})
schedulerProvider := orchestrator.NewMappedSchedulerProvider(map[string]orchestrator.Scheduler{
models.JobTypeBatch: batchServiceJobScheduler,
@@ -154,11 +160,13 @@
JobStore: jobStore,
Planner: planners,
NodeSelector: nodeSelector,
RateLimiter: executionRateLimiter,
}),
models.JobTypeDaemon: scheduler.NewDaemonJobScheduler(scheduler.DaemonJobSchedulerParams{
JobStore: jobStore,
Planner: planners,
NodeSelector: nodeSelector,
RateLimiter: executionRateLimiter,
}),
})

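The rate limiter implementation itself is among the changed files not rendered on this page, so the following is only a sketch of what the ExecutionRateLimiter contract might look like. It is inferred from the call sites above (NewBatchRateLimiter, NewNoopRateLimiter, rateLimiter.Apply) and from the test expectations further down; everything beyond those names — the field names, the comment text, the exact capping logic — is an assumption, not the commit's actual code:

package scheduler

import (
	"context"
	"time"

	"github.com/benbjohnson/clock" // assumed: the clock package the schedulers already use

	"github.com/bacalhau-project/bacalhau/pkg/models"
)

// ExecutionRateLimiter bounds how many new executions a single evaluation may create.
type ExecutionRateLimiter interface {
	// Apply returns how many of the requested executions may be created now. When it
	// trims the request, it appends a delayed follow-up evaluation to the plan.
	Apply(ctx context.Context, plan *models.Plan, requested int) int
}

// NoopRateLimiter imposes no limit; the schedulers fall back to it when no limiter is configured.
type NoopRateLimiter struct{}

func NewNoopRateLimiter() *NoopRateLimiter { return &NoopRateLimiter{} }

func (n *NoopRateLimiter) Apply(_ context.Context, _ *models.Plan, requested int) int {
	return requested
}

// BatchRateLimiterParams mirrors the fields used in requester.go and in the tests below.
type BatchRateLimiterParams struct {
	MaxExecutionsPerEval  int
	ExecutionLimitBackoff time.Duration
	Clock                 clock.Clock
}

// BatchRateLimiter caps new executions per evaluation and, when the cap is hit, schedules
// a delayed evaluation triggered by EvalTriggerExecutionLimit to create the remainder.
type BatchRateLimiter struct {
	maxExecutionsPerEval  int
	executionLimitBackoff time.Duration
	clock                 clock.Clock
}

func NewBatchRateLimiter(params BatchRateLimiterParams) *BatchRateLimiter {
	if params.Clock == nil {
		params.Clock = clock.New()
	}
	return &BatchRateLimiter{
		maxExecutionsPerEval:  params.MaxExecutionsPerEval,
		executionLimitBackoff: params.ExecutionLimitBackoff,
		clock:                 params.Clock,
	}
}

func (b *BatchRateLimiter) Apply(_ context.Context, plan *models.Plan, requested int) int {
	remaining := b.maxExecutionsPerEval - len(plan.NewExecutions) // account for executions already added to this plan
	if remaining < 0 {
		remaining = 0
	}
	if requested <= remaining {
		return requested
	}
	// Cap reached: ask the orchestrator to evaluate the job again after a short backoff.
	delayed := plan.Eval.NewDelayedEvaluation(b.clock.Now().Add(b.executionLimitBackoff)).
		WithTriggeredBy(models.EvalTriggerExecutionLimit).
		WithComment("capped new executions for this evaluation")
	plan.AppendEvaluation(delayed)
	return remaining
}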
37 changes: 26 additions & 11 deletions pkg/orchestrator/scheduler/batch_service_job.go
@@ -23,6 +23,9 @@ type BatchServiceJobSchedulerParams struct {
NodeSelector orchestrator.NodeSelector
RetryStrategy orchestrator.RetryStrategy
QueueBackoff time.Duration
// RateLimiter controls the rate at which new executions are created
// If not provided, a NoopRateLimiter is used
RateLimiter ExecutionRateLimiter
// Clock is the clock used for time-based operations.
// If not provided, the system clock is used.
Clock clock.Clock
@@ -57,19 +60,24 @@ type BatchServiceJobScheduler struct {
selector orchestrator.NodeSelector
retryStrategy orchestrator.RetryStrategy
queueBackoff time.Duration
rateLimiter ExecutionRateLimiter
clock clock.Clock
}

func NewBatchServiceJobScheduler(params BatchServiceJobSchedulerParams) *BatchServiceJobScheduler {
if params.Clock == nil {
params.Clock = clock.New()
}
if params.RateLimiter == nil {
params.RateLimiter = NewNoopRateLimiter()
}
return &BatchServiceJobScheduler{
jobStore: params.JobStore,
planner: params.Planner,
selector: params.NodeSelector,
retryStrategy: params.RetryStrategy,
queueBackoff: params.QueueBackoff,
rateLimiter: params.RateLimiter,
clock: params.Clock,
}
}
@@ -295,17 +303,10 @@ func (b *BatchServiceJobScheduler) createMissingExecs(
return nil
}

// create a delayed evaluation to retry scheduling the job if we don't have enough nodes,
// and we haven't passed the queue timeout
waitUntil := b.clock.Now().Add(b.queueBackoff)
// create a delayed evaluation to retry scheduling the job
comment := orchestrator.NewErrNotEnoughNodes(len(remainingPartitions), append(matching, rejected...)).Error()
delayedEvaluation := plan.Eval.NewDelayedEvaluation(waitUntil).
WithTriggeredBy(models.EvalTriggerJobQueue).
WithComment(comment)
plan.AppendEvaluation(delayedEvaluation)
b.createDelayedEvaluation(ctx, plan, comment)
metrics.AddAttributes(AttrOutcomeKey.String(AttrOutcomeQueueing))
log.Ctx(ctx).Debug().Msgf("Creating delayed evaluation %s to retry scheduling job %s in %s due to: %s",
delayedEvaluation.ID, plan.Job.ID, waitUntil.Sub(b.clock.Now()), comment)

// if not a single node was matched, then the job is fully queued and we should reflect that
// in the job state and events
@@ -318,9 +319,13 @@
}
}

// create new executions
// Apply rate limiting
execsToCreate := min(len(matching), len(remainingPartitions))
execsToCreate = b.rateLimiter.Apply(ctx, plan, execsToCreate)

// Create executions
var count float64
for i := 0; i < min(len(matching), len(remainingPartitions)); i++ {
for i := 0; i < execsToCreate; i++ {
execution := &models.Execution{
NodeID: matching[i].NodeInfo.ID(),
JobID: plan.Job.ID,
@@ -341,6 +346,16 @@
return nil
}

// createDelayedEvaluation creates a delayed evaluation with the queue backoff
func (b *BatchServiceJobScheduler) createDelayedEvaluation(ctx context.Context, plan *models.Plan, comment string) {
waitUntil := b.clock.Now().Add(b.queueBackoff)
delayedEvaluation := plan.Eval.NewDelayedEvaluation(waitUntil).
WithTriggeredBy(models.EvalTriggerJobQueue).
WithComment(comment)
plan.AppendEvaluation(delayedEvaluation)
log.Ctx(ctx).Debug().Msg(comment)
}

// isJobComplete determines if all partitions have completed successfully.
// For a job with Count = N, this means:
// - We have exactly N completed partitions (one per index 0 to N-1)
138 changes: 138 additions & 0 deletions pkg/orchestrator/scheduler/batch_service_test.go
@@ -261,3 +261,141 @@ func (s *BatchServiceJobSchedulerTestSuite) TestProcess_ShouldPreservePartitionOrdering() {
s.planner.EXPECT().Process(gomock.Any(), matcher).Times(1)
s.Require().NoError(s.scheduler.Process(context.Background(), scenario.evaluation))
}

func (s *BatchServiceJobSchedulerTestSuite) TestProcess_RateLimit_ShouldLimitInitialExecutions() {
// Configure rate limiter in scheduler
s.scheduler.rateLimiter = NewBatchRateLimiter(BatchRateLimiterParams{
MaxExecutionsPerEval: 2,
ExecutionLimitBackoff: 5 * time.Second,
Clock: s.clock,
})

scenario := NewScenario(
WithJobType(s.jobType),
WithCount(4), // Need 4 executions total
)
s.mockJobStore(scenario)

// Mock that we have 4 available nodes
s.mockMatchingNodes(scenario, "node0", "node1", "node2", "node3")

// Should only create 2 executions and schedule a delayed evaluation for the rest
matcher := NewPlanMatcher(s.T(), PlanMatcherParams{
Evaluation: scenario.evaluation,
NewExecutions: []*models.Execution{
{NodeID: "node0", PartitionIndex: 0},
{NodeID: "node1", PartitionIndex: 1},
},
ExpectedNewEvaluations: []ExpectedEvaluation{
{
TriggeredBy: models.EvalTriggerExecutionLimit,
WaitUntil: s.clock.Now().Add(5 * time.Second),
},
},
})
s.planner.EXPECT().Process(gomock.Any(), matcher).Times(1)
s.Require().NoError(s.scheduler.Process(context.Background(), scenario.evaluation))
}

func (s *BatchServiceJobSchedulerTestSuite) TestProcess_RateLimit_ShouldHandleFollowupEvaluations() {
// Configure rate limiter in scheduler
s.scheduler.rateLimiter = NewBatchRateLimiter(BatchRateLimiterParams{
MaxExecutionsPerEval: 2,
ExecutionLimitBackoff: 5 * time.Second,
Clock: s.clock,
})

scenario := NewScenario(
WithJobType(s.jobType),
WithCount(4),
// Already have 2 executions from first evaluation
WithPartitionedExecution("node0", models.ExecutionStateBidAccepted, 0),
WithPartitionedExecution("node1", models.ExecutionStateBidAccepted, 1),
)
s.mockJobStore(scenario)
s.mockAllNodes("node0", "node1", "node2", "node3")
s.mockMatchingNodes(scenario, "node2", "node3")

// Should create remaining 2 executions for partitions 2 and 3
matcher := NewPlanMatcher(s.T(), PlanMatcherParams{
Evaluation: scenario.evaluation,
NewExecutions: []*models.Execution{
{NodeID: "node2", PartitionIndex: 2},
{NodeID: "node3", PartitionIndex: 3},
},
})
s.planner.EXPECT().Process(gomock.Any(), matcher).Times(1)
s.Require().NoError(s.scheduler.Process(context.Background(), scenario.evaluation))
}

func (s *BatchServiceJobSchedulerTestSuite) TestProcess_RateLimit_WithUnhealthyNodes() {
s.scheduler.rateLimiter = NewBatchRateLimiter(BatchRateLimiterParams{
MaxExecutionsPerEval: 2,
ExecutionLimitBackoff: 5 * time.Second,
Clock: s.clock,
})

scenario := NewScenario(
WithJobType(s.jobType),
WithCount(4),
WithPartitionedExecution("node0", models.ExecutionStateAskForBid, 0),
WithPartitionedExecution("node1", models.ExecutionStateBidAccepted, 1),
)
s.mockJobStore(scenario)

// node0 becomes unhealthy, and we have 3 new nodes available
s.mockAllNodes("node1", "node2", "node3", "node4")
s.mockMatchingNodes(scenario, "node2", "node3", "node4")

// Should:
// 1. Mark node0's execution as failed
// 2. Create 2 new executions (due to rate limit)
// 3. Schedule delayed evaluation for the remaining execution
matcher := NewPlanMatcher(s.T(), PlanMatcherParams{
Evaluation: scenario.evaluation,
NewExecutions: []*models.Execution{
{NodeID: "node2", PartitionIndex: 0}, // Retrying failed partition
{NodeID: "node3", PartitionIndex: 2}, // New partition
},
UpdatedExecutions: []ExecutionStateUpdate{
{
ExecutionID: scenario.executions[0].ID,
DesiredState: models.ExecutionDesiredStateStopped,
ComputeState: models.ExecutionStateFailed,
},
},
ExpectedNewEvaluations: []ExpectedEvaluation{
{
TriggeredBy: models.EvalTriggerExecutionLimit,
WaitUntil: s.clock.Now().Add(5 * time.Second),
},
},
})
s.planner.EXPECT().Process(gomock.Any(), matcher).Times(1)
s.Require().NoError(s.scheduler.Process(context.Background(), scenario.evaluation))
}

func (s *BatchServiceJobSchedulerTestSuite) TestProcess_NoRateLimit_ShouldCreateAllExecutions() {
// Use NoopRateLimiter (default)
s.scheduler.rateLimiter = NewNoopRateLimiter()

scenario := NewScenario(
WithJobType(s.jobType),
WithCount(4),
)
s.mockJobStore(scenario)
s.mockMatchingNodes(scenario, "node0", "node1", "node2", "node3")

// Should create all 4 executions at once
matcher := NewPlanMatcher(s.T(), PlanMatcherParams{
Evaluation: scenario.evaluation,
NewExecutions: []*models.Execution{
{NodeID: "node0", PartitionIndex: 0},
{NodeID: "node1", PartitionIndex: 1},
{NodeID: "node2", PartitionIndex: 2},
{NodeID: "node3", PartitionIndex: 3},
},
})
s.planner.EXPECT().Process(gomock.Any(), matcher).Times(1)
s.Require().NoError(s.scheduler.Process(context.Background(), scenario.evaluation))
}