Merge branch 'habana_main' into enable-prefix-caching
huijjj committed Oct 17, 2024
2 parents 642af38 + 05bcdf5 commit 4d6417f
Showing 8 changed files with 313 additions and 95 deletions.
1 change: 1 addition & 0 deletions vllm/attention/backends/hpu_attn.py
@@ -241,6 +241,7 @@ def forward(
block_list=attn_metadata.block_list,
block_mapping=attn_metadata.block_mapping,
block_bias=attn_metadata.attn_bias,
block_scales=attn_metadata.block_scales,
scale=self.scale,
matmul_qk_op=self.matmul_qk,
matmul_av_op=self.matmul_av,
1 change: 1 addition & 0 deletions vllm/attention/ops/hpu_paged_attn.py
@@ -20,6 +20,7 @@ class HPUPagedAttentionMetadata:
block_usage: Optional[torch.Tensor]
block_indices: Optional[torch.Tensor]
block_offsets: Optional[torch.Tensor]
block_scales: Optional[torch.Tensor]


class HPUPagedAttention:
18 changes: 17 additions & 1 deletion vllm/config.py
@@ -940,6 +940,9 @@ class SchedulerConfig:
a single iteration.
max_num_seqs: Maximum number of sequences to be processed in a single
iteration.
max_num_prefill_seqs: Maximum number of prefill sequences to be
processed in a single iteration. Used only with padding-aware
scheduling.
max_model_len: Maximum length of a sequence (including prompt
and generated text).
use_v2_block_manager: Whether to use the BlockSpaceManagerV2 or not.
@@ -963,11 +966,14 @@ class SchedulerConfig:
when SPMD worker architecture is enabled. I.e.,
VLLM_USE_RAY_SPMD_WORKER=1
policy: The scheduling policy to use. "fcfs" (default) or "priority".
use_padding_aware_scheduling: If True, scheduler will consider padded
tokens in prefill.
"""

def __init__(self,
max_num_batched_tokens: Optional[int],
max_num_seqs: int,
max_num_prefill_seqs: Optional[int],
max_model_len: int,
use_v2_block_manager: bool = True,
num_lookahead_slots: int = 0,
@@ -979,7 +985,8 @@ def __init__(self,
num_scheduler_steps: int = 1,
multi_step_stream_outputs: bool = False,
send_delta_data: bool = False,
policy: str = "fcfs") -> None:
policy: str = "fcfs",
use_padding_aware_scheduling=False) -> None:
if max_num_batched_tokens is None:
if enable_chunked_prefill:
if num_scheduler_steps > 1:
@@ -1018,6 +1025,7 @@ def __init__(self,
self.max_num_batched_tokens)

self.max_num_seqs = max_num_seqs
self.max_num_prefill_seqs = max_num_prefill_seqs
self.max_model_len = max_model_len
self.use_v2_block_manager = use_v2_block_manager
self.num_lookahead_slots = num_lookahead_slots
@@ -1029,6 +1037,7 @@ def __init__(self,
self.multi_step_stream_outputs = multi_step_stream_outputs
self.send_delta_data = send_delta_data
self.policy = policy
self.use_padding_aware_scheduling = use_padding_aware_scheduling
self._verify_args()

def _verify_args(self) -> None:
@@ -1059,6 +1068,13 @@ def _verify_args(self) -> None:
"num_scheduler_steps "
f"({self.num_scheduler_steps}) must be greater than or "
"equal to 1.")
if self.max_num_prefill_seqs is not None \
and not self.use_padding_aware_scheduling:
raise ValueError("max_num_prefill_seqs can be only "
"used with padding-aware-scheduling. ")
if self.use_padding_aware_scheduling and self.chunked_prefill_enabled:
raise ValueError("Padding-aware scheduling currently "
"does not work with chunked prefill ")

if (not self.use_v2_block_manager \
and not envs.VLLM_ALLOW_DEPRECATED_BLOCK_MANAGER_V1):
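The two new SchedulerConfig options work together. A minimal construction sketch follows; the values are illustrative assumptions, not taken from the commit:

from vllm.config import SchedulerConfig

# Padding-aware scheduling enabled, with a cap on prefill sequences per step.
scheduler_config = SchedulerConfig(
    max_num_batched_tokens=8192,
    max_num_seqs=256,
    max_num_prefill_seqs=16,
    max_model_len=4096,
    use_padding_aware_scheduling=True,
)

# _verify_args() rejects max_num_prefill_seqs when
# use_padding_aware_scheduling is False, and rejects padding-aware
# scheduling combined with chunked prefill.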
122 changes: 116 additions & 6 deletions vllm/core/scheduler.py
@@ -11,6 +11,7 @@
from vllm.core.interfaces import AllocStatus, BlockSpaceManager
from vllm.logger import init_logger
from vllm.lora.request import LoRARequest
from vllm.platforms import current_platform
from vllm.prompt_adapter.request import PromptAdapterRequest
from vllm.sequence import (Sequence, SequenceData, SequenceGroup,
SequenceGroupMetadata, SequenceGroupMetadataDelta,
@@ -101,6 +102,94 @@ def num_curr_seqs(self):
return self._num_curr_seqs


@dataclass
class PaddingAwareSchedulingBudget(SchedulingBudget):
max_num_prefill_seqs: Optional[int] = None
_prefill_request_ids_max_seq_lens: Dict[str,
int] = field(default_factory=dict)
_max_seq_len: int = 0
_num_curr_prefill_seqs: int = 0

def _generic_padding_fn(self, batch_size, max_seq_len) -> int:
return batch_size * max_seq_len

def _hpu_padding_fn(self, batch_size, max_seq_len):
from vllm.worker.hpu_model_runner import (HPUBucketingGlobalState,
find_bucket)
padded_bs = batch_size
padded_seq = max_seq_len

hpu_bucketing_global_state = HPUBucketingGlobalState()

bs_cfg = hpu_bucketing_global_state.prompt_bs_bucket_cfg
if bs_cfg is not None:
padded_bs = find_bucket(batch_size, bs_cfg)
else:
logger.warning(
"prompt_bs_bucket_cfg was not set! Using unpadded batch size.")
seq_cfg = hpu_bucketing_global_state.prompt_seq_bucket_cfg
if seq_cfg is not None:
padded_seq = find_bucket(max_seq_len, seq_cfg)
else:
logger.warning("prompt_seq_bucket_cfg was not set! "
"Using unpadded sequence length.")
return padded_bs * padded_seq

def _padding_fn_selector(self):
if current_platform.is_hpu():
return self._hpu_padding_fn
return self._generic_padding_fn

def _maybe_update_max_seq_len(self,
new_seq_max_seq_len: Optional[int] = None):
if new_seq_max_seq_len is not None \
and new_seq_max_seq_len > self._max_seq_len:
self._max_seq_len = new_seq_max_seq_len
return
self._max_seq_len = max(
self._prefill_request_ids_max_seq_lens.values())

def add_prefill_seqs(self, req_id, num_curr_prefill_seqs, max_seq_len):
self._prefill_request_ids_max_seq_lens[req_id] = max_seq_len
self._num_curr_prefill_seqs += num_curr_prefill_seqs
self._maybe_update_max_seq_len(max_seq_len)

def subtract_prefill_seqs(self, req_id, num_curr_prefill_seqs):
if req_id in self._prefill_request_ids_max_seq_lens:
popped_seq_len = self._prefill_request_ids_max_seq_lens.pop(req_id)
self._num_curr_prefill_seqs -= num_curr_prefill_seqs
if popped_seq_len == self._max_seq_len:
self._maybe_update_max_seq_len()

def can_schedule(self,
*args,
num_new_tokens: int,
num_new_seqs: int,
is_prefill: bool = False,
max_seq_len: int = 0):
can_parent_schedule = super().can_schedule(
*args, num_new_tokens=num_new_tokens, num_new_seqs=num_new_seqs)
if not can_parent_schedule or not is_prefill:
return can_parent_schedule
new_batch_size = self._num_curr_prefill_seqs + num_new_seqs
new_max_seq_len = max(max(self._max_seq_len, max_seq_len), 1)
padding_fn = self._padding_fn_selector()
num_new_padded_tokens = padding_fn(new_batch_size, new_max_seq_len)
result = num_new_padded_tokens <= self.token_budget
if self.max_num_prefill_seqs is not None and result:
result = self._num_curr_prefill_seqs + num_new_seqs \
<= self.max_num_prefill_seqs
return result

@property
def max_seq_len(self):
return self._max_seq_len

@property
def num_curr_prefill_seqs(self):
return self._num_curr_prefill_seqs


@dataclass
class ScheduledSequenceGroup:
# A sequence group that's scheduled.
@@ -938,9 +1027,18 @@ def _schedule_prefills(
continue

num_new_seqs = seq_group.get_max_num_running_seqs()
max_prefill_seq_len = None
can_schedule_kwargs = {
'num_new_tokens': num_new_tokens,
'num_new_seqs': num_new_seqs
}
if self.scheduler_config.use_padding_aware_scheduling:
max_prefill_seq_len = max(
[seq.get_num_new_tokens() for seq in seq_group.get_seqs()])
can_schedule_kwargs['is_prefill'] = True
can_schedule_kwargs['max_seq_len'] = max_prefill_seq_len
if (num_new_tokens == 0
or not budget.can_schedule(num_new_tokens=num_new_tokens,
num_new_seqs=num_new_seqs)):
or not budget.can_schedule(**can_schedule_kwargs)):
break

# Can schedule this request.
@@ -971,6 +1069,10 @@ def _schedule_prefills(
token_chunk_size=num_new_tokens))
budget.add_num_batched_tokens(seq_group.request_id, num_new_tokens)
budget.add_num_seqs(seq_group.request_id, num_new_seqs)
if self.scheduler_config.use_padding_aware_scheduling:
assert isinstance(budget, PaddingAwareSchedulingBudget)
budget.add_prefill_seqs(seq_group.request_id, num_new_seqs,
max_prefill_seq_len)

# Queue requests that couldn't be scheduled.
waiting_queue.extendleft(leftover_waiting_sequences)
@@ -992,10 +1094,18 @@ def _schedule_default(self) -> SchedulerOutputs:
be swapped or preempted.
"""
# Include running requests to the budget.
budget = SchedulingBudget(
token_budget=self.scheduler_config.max_num_batched_tokens,
max_num_seqs=self.scheduler_config.max_num_seqs,
)
budget: SchedulingBudget
if self.scheduler_config.use_padding_aware_scheduling:
budget = PaddingAwareSchedulingBudget(
token_budget=self.scheduler_config.max_num_batched_tokens,
max_num_seqs=self.scheduler_config.max_num_seqs,
max_num_prefill_seqs=self.scheduler_config.max_num_prefill_seqs
)
else:
budget = SchedulingBudget(
token_budget=self.scheduler_config.max_num_batched_tokens,
max_num_seqs=self.scheduler_config.max_num_seqs,
)
# Make sure we include num running seqs before scheduling prefill,
# so that we don't schedule beyond max_num_seqs for prefill.
for seq_group in self.running:
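To make the padded-token accounting concrete, here is a minimal usage sketch of the new budget class, assuming a non-HPU platform so the generic batch_size * max_seq_len padding function is selected; the numbers are illustrative:

from vllm.core.scheduler import PaddingAwareSchedulingBudget

budget = PaddingAwareSchedulingBudget(
    token_budget=2048,
    max_num_seqs=64,
    max_num_prefill_seqs=4,
)

# Two prefill sequences already admitted; the longest is 512 tokens.
budget.add_prefill_seqs("req-0", num_curr_prefill_seqs=2, max_seq_len=512)

# A third prefill of 900 tokens fits the raw token budget (900 <= 2048),
# but the padded batch would cost 3 * 900 = 2700 tokens, so it is rejected.
ok = budget.can_schedule(num_new_tokens=900,
                         num_new_seqs=1,
                         is_prefill=True,
                         max_seq_len=900)
assert not ok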
19 changes: 18 additions & 1 deletion vllm/engine/arg_utils.py
@@ -113,11 +113,13 @@ class EngineArgs:
enable_prefix_caching: bool = False
disable_sliding_window: bool = False
use_v2_block_manager: bool = True
use_padding_aware_scheduling: bool = False
swap_space: float = 4 # GiB
cpu_offload_gb: float = 0 # GiB
gpu_memory_utilization: float = 0.90
max_num_batched_tokens: Optional[int] = None
max_num_seqs: int = 256
max_num_prefill_seqs: Optional[int] = None
max_logprobs: int = 20 # Default value for OpenAI Chat Completions API
disable_log_stats: bool = False
revision: Optional[str] = None
@@ -391,6 +393,13 @@ def add_cli_args(parser: FlexibleArgumentParser) -> FlexibleArgumentParser:
action='store_true',
help='Use BlockSpaceMangerV2. By default this is set to True. '
'Set to False to use BlockSpaceManagerV1')
parser.add_argument(
'--use-padding-aware-scheduling',
default=EngineArgs.use_padding_aware_scheduling,
action='store_true',
help=('Use padding-aware scheduling. If True, the scheduler '
'will consider padded tokens in prefill. '
'By default this is set to False. '))
parser.add_argument(
'--num-lookahead-slots',
type=int,
@@ -445,6 +454,13 @@ def add_cli_args(parser: FlexibleArgumentParser) -> FlexibleArgumentParser:
type=int,
default=EngineArgs.max_num_seqs,
help='Maximum number of sequences per iteration.')
parser.add_argument(
'--max-num-prefill-seqs',
type=int,
default=EngineArgs.max_num_prefill_seqs,
help=('Maximum number of prefill sequences per '
'iteration. Can be used only with padding-aware '
'scheduling. Must be <= max_num_seqs.'))
parser.add_argument(
'--max-logprobs',
type=int,
@@ -1036,6 +1052,7 @@ def create_engine_config(self) -> EngineConfig:
scheduler_config = SchedulerConfig(
max_num_batched_tokens=self.max_num_batched_tokens,
max_num_seqs=self.max_num_seqs,
max_num_prefill_seqs=self.max_num_prefill_seqs,
max_model_len=model_config.max_model_len,
use_v2_block_manager=self.use_v2_block_manager,
num_lookahead_slots=num_lookahead_slots,
@@ -1049,7 +1066,7 @@ def __init__(self,
send_delta_data=(envs.VLLM_USE_RAY_SPMD_WORKER
and parallel_config.use_ray),
policy=self.scheduling_policy,
)
use_padding_aware_scheduling=self.use_padding_aware_scheduling)
lora_config = LoRAConfig(
max_lora_rank=self.max_lora_rank,
max_loras=self.max_loras,
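The new engine arguments map one-to-one onto the scheduler options (the CLI equivalents are --use-padding-aware-scheduling and --max-num-prefill-seqs). A hedged sketch of setting them programmatically; the model name and numbers are placeholders:

from vllm.engine.arg_utils import EngineArgs

engine_args = EngineArgs(
    model="facebook/opt-125m",        # placeholder model
    max_num_seqs=64,
    max_num_prefill_seqs=8,           # must be <= max_num_seqs
    use_padding_aware_scheduling=True,
)
engine_config = engine_args.create_engine_config()
assert engine_config.scheduler_config.max_num_prefill_seqs == 8
assert engine_config.scheduler_config.use_padding_aware_scheduling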
30 changes: 9 additions & 21 deletions vllm/worker/cache_engine.py
@@ -6,7 +6,7 @@
from vllm.attention import get_attn_backend
from vllm.config import CacheConfig, DeviceConfig, ModelConfig, ParallelConfig
from vllm.logger import init_logger
from vllm.utils import (STR_DTYPE_TO_TORCH_DTYPE, get_dtype_size, is_fake_hpu,
from vllm.utils import (STR_DTYPE_TO_TORCH_DTYPE, get_dtype_size,
is_pin_memory_available)

logger = init_logger(__name__)
@@ -75,26 +75,14 @@ def _allocate_kv_cache(
pin_memory = is_pin_memory_available() if device == "cpu" else False
kv_cache: List[torch.Tensor] = []
for _ in range(self.num_attention_layers):
if device == 'hpu' or is_fake_hpu():
key_cache = torch.zeros(kv_cache_shape,
dtype=self.dtype,
device=device)
value_cache = torch.zeros(kv_cache_shape,
dtype=self.dtype,
device=device)
kv_layer = (key_cache, value_cache)
kv_cache.append(kv_layer)
else:
# null block in CpuGpuBlockAllocator requires at least that
# block to be zeroed-out.
# We zero-out everything for simplicity.
dtype = torch.uint8 if self.dtype == torch.float8_e4m3fn else \
self.dtype
kv_cache.append(
torch.zeros(kv_cache_shape,
dtype=dtype,
pin_memory=pin_memory,
device=device))
# null block in CpuGpuBlockAllocator requires at least that
# block to be zeroed-out.
# We zero-out everything for simplicity.
kv_cache.append(
torch.zeros(kv_cache_shape,
dtype=self.dtype,
pin_memory=pin_memory,
device=device))
return kv_cache

def swap_in(self, src_to_dst: torch.Tensor) -> None:
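With the HPU special case removed, every device takes the same zero-initialized allocation path. A minimal standalone sketch, using a placeholder shape instead of the backend's real KV-cache shape:

import torch

num_attention_layers = 4
kv_cache_shape = (2, 1024, 16, 8, 64)   # placeholder, not the real backend shape
device = "cpu"                          # "cuda" and "hpu" now follow the same path

kv_cache = [
    torch.zeros(kv_cache_shape, dtype=torch.bfloat16, device=device)
    for _ in range(num_attention_layers)
]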