Fix rounding error in htex block scale in (#3721)
# Description

PR #2196 made the htex strategy calculate a number of blocks to scale in,
rather than scaling in one block per strategy iteration. However, it
rounds the wrong way: it scales in a rounded-up, rather than
rounded-down, number of blocks.

Issue #3696 shows this resulting in oscillating behaviour: with 14
tasks and 48 workers per block, alternating strategy runs either scale
up to the rounded-up number of needed blocks (14/48 => 1) or scale
down to the rounded-down number of needed blocks (14/48 => 0).

This PR changes the rounding introduced in #2196 to be consistent:
rounding up the number of blocks to scale up, and rounding down the
number of blocks to scale down.

# Changed Behaviour

HTEX scale-down should oscillate less.

# Fixes

Fixes #3696 

## Type of change

- Bug fix
benclifford authored Jan 6, 2025 · 1 parent ab5e247 · commit ed80dad
Showing 2 changed files with 105 additions and 2 deletions.
4 changes: 2 additions & 2 deletions parsl/jobs/strategy.py
```diff
@@ -298,8 +298,8 @@ def _general_strategy(self, executors: List[BlockProviderExecutor], *, strategy_
                 # Scale in for htex
                 if isinstance(executor, HighThroughputExecutor):
                     if active_blocks > min_blocks:
-                        excess_slots = math.ceil(active_slots - (active_tasks * parallelism))
-                        excess_blocks = math.ceil(float(excess_slots) / (tasks_per_node * nodes_per_block))
+                        excess_slots = math.floor(active_slots - (active_tasks * parallelism))
+                        excess_blocks = math.floor(float(excess_slots) / (tasks_per_node * nodes_per_block))
                         excess_blocks = min(excess_blocks, active_blocks - min_blocks)
                         logger.debug(f"Requesting scaling in by {excess_blocks} blocks with idle time {self.max_idletime}s")
                         executor.scale_in_facade(excess_blocks, max_idletime=self.max_idletime)
```
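As a rough illustration of the effect of this change, the following sketch (a simplified model of the strategy loop, not the actual Parsl code; it ignores `min_blocks`, `max_blocks`, idle time, and `parallelism`) shows the block count oscillating under the old rounding and converging under the new one:

```python
import math

def one_iteration(blocks, tasks, slots_per_block, round_in):
    """One simplified strategy pass: scale out when short of slots,
    otherwise scale in the (rounded) number of excess blocks."""
    slots = blocks * slots_per_block
    if tasks > slots:
        return blocks + math.ceil((tasks - slots) / slots_per_block)
    return blocks - round_in((slots - tasks) / slots_per_block)

for label, round_in in [("old (ceil)", math.ceil), ("new (floor)", math.floor)]:
    blocks = 0
    history = []
    for _ in range(5):
        blocks = one_iteration(blocks, 14, 48, round_in)
        history.append(blocks)
    print(label, history)  # old: [1, 0, 1, 0, 1] -- new: [1, 1, 1, 1, 1]
```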
103 changes: 103 additions & 0 deletions parsl/tests/test_scaling/test_regression_3696_oscillation.py
@@ -0,0 +1,103 @@
```python
import math
from unittest.mock import MagicMock

import pytest

from parsl.executors.high_throughput.executor import HighThroughputExecutor
from parsl.jobs.states import JobState, JobStatus
from parsl.jobs.strategy import Strategy


# the parametrize tuple consists of:
# Input:
# * number of tasks to mock the load as
# * number of workers per node
# Expected output:
# * the number of blocks we should expect to be launched
# in this situation
#
# This test will configure an executor, then run strategize
# a few times, asserting that it converges to the correct
# number of blocks without oscillating.
@pytest.mark.local
@pytest.mark.parametrize("ns", [(14, 48, 1), # values from issue #3696
(1, 1, 1), # one task needs one block
(100, 1, 20), # many one-task blocks, hitting hard-coded max blocks
(47, 48, 1), # some edge cases around #3696 values
(48, 48, 1), # "
(49, 48, 2), # "
(149, 50, 3)]) # "
def test_htex_strategy_does_not_oscillate(ns):
"""Check for oscillations in htex scaling.
In issue 3696, with a large number of workers per block
and a smaller number of active tasks, the htex scaling
strategy oscillates between 0 and 1 active block, rather
than converging to 1 active block.
"""

n_tasks, n_workers, n_blocks = ns

s = Strategy(strategy='htex_auto_scale', max_idletime=0)

provider = MagicMock()
executor = MagicMock(spec=HighThroughputExecutor)

statuses = {}

executor.provider = provider
executor.outstanding = n_tasks
executor.status_facade = statuses
executor.workers_per_node = n_workers

provider.parallelism = 1
provider.init_blocks = 0
provider.min_blocks = 0
provider.max_blocks = 20
provider.nodes_per_block = 1

def scale_out(n):
for _ in range(n):
statuses[len(statuses)] = JobStatus(state=JobState.PENDING)

executor.scale_out_facade.side_effect = scale_out

def scale_in(n, max_idletime=None):
# find n PENDING jobs and set them to CANCELLED
for k in statuses:
if n == 0:
return
if statuses[k].state == JobState.PENDING:
statuses[k].state = JobState.CANCELLED
n -= 1

executor.scale_in_facade.side_effect = scale_in

s.add_executors([executor])

    # In issue #3696, this first strategize call does initial and load-based
    # scale outs, because n_tasks > n_workers*0
s.strategize([executor])

executor.scale_out_facade.assert_called()
assert len(statuses) == n_blocks, "Should have launched n_blocks"
assert len([k for k in statuses if statuses[k].state == JobState.PENDING]) == n_blocks
# there might be several calls to scale_out_facade inside strategy,
# but the end effect should be that exactly one block is scaled out.

executor.scale_in_facade.assert_not_called()

    # In issue #3696, this second strategize call does a scale in, because n_tasks < n_workers*1
s.strategize([executor])

# assert that there should still be n_blocks pending blocks
assert len([k for k in statuses if statuses[k].state == JobState.PENDING]) == n_blocks
    # without the rounding fix, this assert fails due to issue #3696

# Now check scale in happens with 0 load
executor.outstanding = 0
s.strategize([executor])
executor.scale_in_facade.assert_called()
    assert len([k for k in statuses if statuses[k].state == JobState.PENDING]) == 0
```
