61 changes: 44 additions & 17 deletions src/cloudai/workloads/common/llm_serving.py
@@ -52,6 +52,40 @@ def all_gpu_ids(tdef: LLMServingTestDefinition[LLMServingCmdArgsT], system_gpus_
return list(range(system_gpus_per_node or 1))


def calculate_prefill_gpu_ids(
tdef: LLMServingTestDefinition[LLMServingCmdArgsT],
num_nodes: int,
system_gpus_per_node: int | None,
) -> list[int]:
if not tdef.cmd_args.prefill:
return []
if tdef.cmd_args.prefill.gpu_ids:
return parse_gpu_ids(tdef.cmd_args.prefill.gpu_ids)

gpu_ids = all_gpu_ids(tdef, system_gpus_per_node)
if num_nodes == 2:
return gpu_ids
mid = len(gpu_ids) // 2
return gpu_ids[:mid]


def calculate_decode_gpu_ids(
tdef: LLMServingTestDefinition[LLMServingCmdArgsT],
num_nodes: int,
system_gpus_per_node: int | None,
) -> list[int]:
if tdef.cmd_args.decode.gpu_ids:
return parse_gpu_ids(tdef.cmd_args.decode.gpu_ids)

gpu_ids = all_gpu_ids(tdef, system_gpus_per_node)
if not tdef.cmd_args.prefill:
return gpu_ids
if num_nodes == 2:
return gpu_ids
mid = len(gpu_ids) // 2
return gpu_ids[mid:]
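# Illustrative split (GPU counts assumed, not taken from a specific config):
# with 8 GPUs per node, no explicit gpu_ids, and num_nodes == 1, prefill
# receives [0, 1, 2, 3] and decode receives [4, 5, 6, 7]; with num_nodes == 2,
# each role keeps the full [0..7] range and runs on its own node.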


class LLMServingArgs(CmdArgs):
"""Shared serve-argument serialization for LLM serving workloads."""

@@ -64,15 +98,18 @@ def serve_args_exclude(self) -> set[str]:
"""Fields consumed internally and excluded from generic serve args."""
return {"gpu_ids"}

def serialize_serve_arg(self, key: str, value: Any) -> list[str]:
"""Serialize a single serve argument to CLI tokens."""
opt = f"--{key.replace('_', '-')}"
if value == "":
return [opt]
return [opt, str(value)]

@property
def serve_args(self) -> list[str]:
args: list[str] = []
for key, value in self.model_dump(exclude=self.serve_args_exclude, exclude_none=True).items():
opt = f"--{key.replace('_', '-')}"
if value == "":
args.append(opt)
else:
args.extend([opt, str(value)])
args.extend(self.serialize_serve_arg(key, value))
return args
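# Sketch of the resulting tokens (the trust_remote_code field name is
# hypothetical): dumping {"port": 8000, "trust_remote_code": ""} would yield
# ["--port", "8000", "--trust-remote-code"]; empty-string values emit a bare
# flag, all other values emit the flag followed by str(value).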


@@ -299,21 +336,11 @@ def is_two_node_disaggregated(self) -> bool:

@property
def prefill_gpu_ids(self) -> list[int]:
if self.tdef.cmd_args.prefill and self.tdef.cmd_args.prefill.gpu_ids:
return parse_gpu_ids(self.tdef.cmd_args.prefill.gpu_ids)
if self.is_two_node_disaggregated:
return self.gpu_ids
mid = len(self.gpu_ids) // 2
return self.gpu_ids[:mid]
return calculate_prefill_gpu_ids(self.tdef, self.test_run.nnodes, self.system.gpus_per_node)

@property
def decode_gpu_ids(self) -> list[int]:
if self.tdef.cmd_args.decode.gpu_ids:
return parse_gpu_ids(self.tdef.cmd_args.decode.gpu_ids)
if self.is_two_node_disaggregated:
return self.gpu_ids
mid = len(self.gpu_ids) // 2
return self.gpu_ids[mid:]
return calculate_decode_gpu_ids(self.tdef, self.test_run.nnodes, self.system.gpus_per_node)

def _disagg_srun_prefix(self, relative: int | None = None) -> str:
srun_command_parts = self.gen_srun_prefix(with_num_nodes=(relative is None))
74 changes: 72 additions & 2 deletions src/cloudai/workloads/vllm/vllm.py
@@ -17,12 +17,20 @@
from __future__ import annotations

import logging
from typing import Optional, cast

from pydantic import ConfigDict, Field

from cloudai.core import GitRepo, Installable, JobStatusResult, TestRun
from cloudai.core import GitRepo, Installable, JobStatusResult, System, TestRun
from cloudai.models.workload import CmdArgs
from cloudai.workloads.common.llm_serving import LLMServingArgs, LLMServingCmdArgs, LLMServingTestDefinition
from cloudai.workloads.common.llm_serving import (
LLMServingArgs,
LLMServingCmdArgs,
LLMServingTestDefinition,
all_gpu_ids,
calculate_decode_gpu_ids,
calculate_prefill_gpu_ids,
)

VLLM_SERVE_LOG_FILE = "vllm-serve.log"
VLLM_BENCH_LOG_FILE = "vllm-bench.log"
@@ -41,6 +49,12 @@ class VllmArgs(LLMServingArgs):
def serve_args_exclude(self) -> set[str]:
return super().serve_args_exclude | {"nixl_threads"}

def serialize_serve_arg(self, key: str, value: object) -> list[str]:
opt = f"--{key.replace('_', '-')}"
if isinstance(value, bool):
return [opt] if value else [f"--no-{key.replace('_', '-')}"]
return super().serialize_serve_arg(key, value)
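# Booleans become paired CLI flags, matching the tests in this PR:
# enable_expert_parallel=True  -> ["--enable-expert-parallel"]
# enable_expert_parallel=False -> ["--no-enable-expert-parallel"]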


class VllmCmdArgs(LLMServingCmdArgs[VllmArgs]):
"""vLLM serve command arguments."""
@@ -79,6 +93,62 @@ def extra_installables(self) -> list[Installable]:
installables.append(self.proxy_script_repo)
return installables

@staticmethod
def _validate_vllm_parallelism_constraints(role: str, args: VllmArgs, gpu_count: int) -> bool:
tp = cast(int, getattr(args, "tensor_parallel_size", 1))
pp = cast(int, getattr(args, "pipeline_parallel_size", 1))
dp = cast(int, getattr(args, "data_parallel_size", 1))
ep_enabled = cast(bool, getattr(args, "enable_expert_parallel", False))
all2all_backend = cast(str, getattr(args, "all2all_backend", ""))

constraint1 = (tp * pp * dp) <= gpu_count
if not constraint1:
logging.error(
"vLLM %s constraint failed: (tp * pp * dp) <= num_gpus. tp=%s pp=%s dp=%s num_gpus=%s",
role,
tp,
pp,
dp,
gpu_count,
)
return False

using_flashinfer_all2allv = all2all_backend == "flashinfer_all2allv"
constraint2 = not (using_flashinfer_all2allv and dp > 1 and ep_enabled)
if not constraint2:
logging.error(
"vLLM %s constraint failed: flashinfer_all2allv only works with DP=1, or with DP>1 and expert "
"parallel disabled. all2all_backend=%s dp=%s expert_parallel=%s",
role,
all2all_backend,
dp,
ep_enabled,
)
return False

return True
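# Worked example of constraint1: tp=2, pp=2, dp=1 needs at least 4 GPUs, so a
# 3-GPU allocation (e.g. CUDA_VISIBLE_DEVICES="0,1,2" in the tests below)
# fails the check and the run is rejected.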

def constraint_check(self, tr: TestRun, system: Optional[System]) -> bool:
system_gpus_per_node = getattr(system, "gpus_per_node", None) if system is not None else None
num_nodes = tr.nnodes

if self.cmd_args.prefill is None:
return self._validate_vllm_parallelism_constraints(
role="decode",
args=self.cmd_args.decode,
gpu_count=len(all_gpu_ids(self, system_gpus_per_node)),
)

return self._validate_vllm_parallelism_constraints(
role="prefill",
args=self.cmd_args.prefill,
gpu_count=len(calculate_prefill_gpu_ids(self, num_nodes, system_gpus_per_node)),
) and self._validate_vllm_parallelism_constraints(
role="decode",
args=self.cmd_args.decode,
gpu_count=len(calculate_decode_gpu_ids(self, num_nodes, system_gpus_per_node)),
)
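# Disaggregated example (mirroring the tests in this PR): with
# CUDA_VISIBLE_DEVICES="0,1,2,3" on one node, prefill and decode each get
# 2 GPUs, so prefill tensor_parallel_size=3 is rejected; with num_nodes=2
# and gpus_per_node=4, each role sees all 4 GPUs and tp=4 passes.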

def was_run_successful(self, tr: TestRun) -> JobStatusResult:
log_path = tr.output_path / VLLM_BENCH_LOG_FILE
if not log_path.is_file():
18 changes: 18 additions & 0 deletions tests/workloads/vllm/test_command_gen_strategy_slurm.py
@@ -195,6 +195,24 @@ def test_get_vllm_serve_commands_single_gpu(self, vllm_cmd_gen_strategy: VllmSlu
assert len(commands) == 1
assert commands[0] == ["vllm", "serve", cmd_args.model, "--port", str(cmd_args.port)]

def test_get_vllm_serve_commands_convert_boolean_flags(
self, vllm: VllmTestDefinition, vllm_tr: TestRun, slurm_system: SlurmSystem
) -> None:
vllm.cmd_args.decode = VllmArgs.model_validate({"enable_expert_parallel": True})
vllm_tr.test = vllm
vllm_cmd_gen_strategy = VllmSlurmCommandGenStrategy(slurm_system, vllm_tr)

commands = vllm_cmd_gen_strategy.get_serve_commands()

assert commands[0] == [
"vllm",
"serve",
vllm.cmd_args.model,
"--enable-expert-parallel",
"--port",
str(vllm.cmd_args.port),
]

def test_generate_wait_for_health_function(self, vllm_cmd_gen_strategy: VllmSlurmCommandGenStrategy) -> None:
cmd_args = vllm_cmd_gen_strategy.test_run.test.cmd_args

87 changes: 86 additions & 1 deletion tests/workloads/vllm/test_workload.py
@@ -14,14 +14,31 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from cloudai.core import GitRepo
from cloudai.core import GitRepo, TestRun
from cloudai.systems.slurm import SlurmSystem
from cloudai.workloads.vllm import VllmArgs, VllmCmdArgs, VllmTestDefinition


def test_vllm_serve_args_exclude_internal_fields() -> None:
assert VllmArgs(gpu_ids="0", nixl_threads=1).serve_args == []


def test_vllm_serve_args_convert_boolean_flags() -> None:
assert VllmArgs.model_validate({"enable_expert_parallel": True}).serve_args == ["--enable-expert-parallel"]
assert VllmArgs.model_validate({"enable_expert_parallel": False}).serve_args == ["--no-enable-expert-parallel"]
assert VllmArgs.model_validate({"tokens_only": True}).serve_args == ["--tokens-only"]
assert VllmArgs.model_validate({"tokens_only": False}).serve_args == ["--no-tokens-only"]


def test_vllm_serve_args_convert_standalone_boolean_flags() -> None:
assert VllmArgs.model_validate({"headless": True}).serve_args == ["--headless"]
assert VllmArgs.model_validate({"headless": False}).serve_args == ["--no-headless"]


def test_vllm_serve_args_keep_non_boolean_values() -> None:
assert VllmArgs.model_validate({"tensor_parallel_size": 4}).serve_args == ["--tensor-parallel-size", "4"]


def test_installables_include_proxy_script_repo() -> None:
proxy_script_repo = GitRepo(url="./proxy_script_repo", commit="commit")
tdef = VllmTestDefinition(
@@ -33,3 +50,71 @@ def test_installables_include_proxy_script_repo() -> None:
)

assert tdef.installables == [tdef.docker_image, tdef.hf_model, proxy_script_repo]


def test_constraint_check_rejects_tp_pp_dp_above_available_gpus(tmp_path) -> None:
tdef = VllmTestDefinition(
name="test",
description="test",
test_template_name="vllm",
cmd_args=VllmCmdArgs(
docker_image_url="test_url",
decode=VllmArgs.model_validate({"tensor_parallel_size": 2, "pipeline_parallel_size": 2}),
),
extra_env_vars={"CUDA_VISIBLE_DEVICES": "0,1,2"},
)
tr = TestRun(name="vllm", test=tdef, num_nodes=1, nodes=[], output_path=tmp_path)

assert tdef.constraint_check(tr, None) is False


def test_constraint_check_rejects_flashinfer_with_dp_and_expert_parallel(tmp_path) -> None:
tdef = VllmTestDefinition(
name="test",
description="test",
test_template_name="vllm",
cmd_args=VllmCmdArgs(
docker_image_url="test_url",
decode=VllmArgs.model_validate(
{"data_parallel_size": 2, "all2all_backend": "flashinfer_all2allv", "enable_expert_parallel": True}
),
),
extra_env_vars={"CUDA_VISIBLE_DEVICES": "0,1"},
)
tr = TestRun(name="vllm", test=tdef, num_nodes=1, nodes=[], output_path=tmp_path)

assert tdef.constraint_check(tr, None) is False


def test_constraint_check_validates_disaggregated_roles_against_split_gpus(tmp_path) -> None:
tdef = VllmTestDefinition(
name="test",
description="test",
test_template_name="vllm",
cmd_args=VllmCmdArgs(
docker_image_url="test_url",
prefill=VllmArgs.model_validate({"tensor_parallel_size": 3}),
decode=VllmArgs.model_validate({"tensor_parallel_size": 2}),
),
extra_env_vars={"CUDA_VISIBLE_DEVICES": "0,1,2,3"},
)
tr = TestRun(name="vllm", test=tdef, num_nodes=1, nodes=[], output_path=tmp_path)

assert tdef.constraint_check(tr, None) is False


def test_constraint_check_uses_all_node_gpus_per_role_for_two_node_disagg(tmp_path, slurm_system: SlurmSystem) -> None:
tdef = VllmTestDefinition(
name="test",
description="test",
test_template_name="vllm",
cmd_args=VllmCmdArgs(
docker_image_url="test_url",
prefill=VllmArgs.model_validate({"tensor_parallel_size": 4}),
decode=VllmArgs.model_validate({"tensor_parallel_size": 4}),
),
)
tr = TestRun(name="vllm", test=tdef, num_nodes=2, nodes=[], output_path=tmp_path)
slurm_system.gpus_per_node = 4

assert tdef.constraint_check(tr, slurm_system) is True