Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,5 +73,8 @@ jobs:
- name: Run jobs aggregated sampler test
run: python -m test.test_sampler_jobs_aggregated --file-path data/allusers-gpu-30.log

- name: Run jobs replay modes test
run: python -m test.test_jobs_replay_modes

- name: Run workload generator inspection test
run: python -m test.test_inspect_workloadgen --workload-gen poisson --wg-poisson-lambdas4 200,10,6,24 --wg-max-jobs-hour 1500 --hours 336 --plot --wg-burst-small-prob 0.2 --wg-burst-heavy-prob 0.02
10 changes: 2 additions & 8 deletions src/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ def __init__(self,
evaluation_mode: bool = False,
workload_gen: WorkloadGenerator | None = None,
job_arrival_scale: float = 1.0,
jobs_exact_replay: bool = False,
jobs_exact_replay_aggregate: bool = False) -> None:
jobs_exact_replay: bool = False) -> None:
super().__init__()

self.weights = weights
Expand All @@ -89,7 +88,6 @@ def __init__(self,
self.evaluation_mode = evaluation_mode
self.job_arrival_scale = float(job_arrival_scale)
self.jobs_exact_replay = bool(jobs_exact_replay)
self.jobs_exact_replay_aggregate = bool(jobs_exact_replay_aggregate)

self.next_plot_save = self.steps_per_iteration

Expand Down Expand Up @@ -118,10 +116,7 @@ def __init__(self,
print(f"Parsed aggregated jobs for {len(self.jobs_sampler.aggregated_jobs)} hours")
if self.jobs_exact_replay:
max_raw_jobs = max((len(v) for v in self.jobs_sampler.jobs.values()), default=0)
if self.jobs_exact_replay_aggregate:
print("Jobs replay mode: exact timeline (aggregated per step)")
else:
print("Jobs replay mode: exact timeline (raw jobs per hour)")
print("Jobs replay mode: exact timeline (raw jobs per hour)")
print(f"Max raw jobs per hour: {max_raw_jobs}")
else:
self.jobs_sampler.precalculate_hourly_jobs(CORES_PER_NODE, MAX_NODES_PER_JOB)
Expand Down Expand Up @@ -350,7 +345,6 @@ def step(self, action: np.ndarray) -> tuple[dict[str, np.ndarray], float, bool,
hourly_sampler, durations_sampler, self.np_random,
job_arrival_scale=self.job_arrival_scale,
jobs_exact_replay=self.jobs_exact_replay,
jobs_exact_replay_aggregate=self.jobs_exact_replay_aggregate,
)

# Add new jobs to queue (overflow goes to helper)
Expand Down
32 changes: 8 additions & 24 deletions src/workload_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ def generate_jobs(
np_random: np.random.Generator,
job_arrival_scale: float = 1.0,
jobs_exact_replay: bool = False,
jobs_exact_replay_aggregate: bool = False,
) -> tuple[int, list[int], list[int], list[int]]:
"""
Generate new jobs for the current hour using configured workload source.
Expand All @@ -50,9 +49,6 @@ def generate_jobs(
- >1.0: upsample jobs
- 0.0..1.0: downsample jobs
jobs_exact_replay: If True, replay raw jobs in log order for --jobs mode.
jobs_exact_replay_aggregate: In exact replay mode, aggregate each sampled
raw time-bin into compact hourly-equivalent templates.

Returns:
Tuple of (new_jobs_count, new_jobs_durations, new_jobs_nodes, new_jobs_cores)
"""
Expand All @@ -68,26 +64,14 @@ def generate_jobs(
# Replay jobs exactly as they appear in the parsed timeline (one bin per step).
sampled = jobs_sampler.sample(1, wrap=True)
raw_jobs = next(iter(sampled.values()), [])
if jobs_exact_replay_aggregate and raw_jobs:
aggregated_jobs = jobs_sampler.aggregate_jobs(raw_jobs)
hourly_jobs = jobs_sampler.convert_to_hourly_jobs(
aggregated_jobs, CORES_PER_NODE, MAX_NODES_PER_JOB
)
for job in hourly_jobs:
instances = max(1, int(job.get('instances', 1)))
new_jobs_count += instances
new_jobs_durations.extend([int(job['duration_hours'])] * instances)
new_jobs_nodes.extend([int(job['nnodes'])] * instances)
new_jobs_cores.extend([int(job['cores_per_node'])] * instances)
else:
for job in raw_jobs:
duration_hours = max(1, int(math.ceil(int(job['duration_minutes']) / 60)))
nnodes = min(max(int(job['nnodes']), MIN_NODES_PER_JOB), MAX_NODES_PER_JOB)
cores_per_node = min(max(int(job['cores_per_node']), MIN_CORES_PER_JOB), CORES_PER_NODE)
new_jobs_count += 1
new_jobs_durations.append(duration_hours)
new_jobs_nodes.append(nnodes)
new_jobs_cores.append(cores_per_node)
for job in raw_jobs:
duration_hours = max(1, int(math.ceil(int(job['duration_minutes']) / 60)))
nnodes = min(max(int(job['nnodes']), MIN_NODES_PER_JOB), MAX_NODES_PER_JOB)
cores_per_node = min(max(int(job['cores_per_node']), MIN_CORES_PER_JOB), CORES_PER_NODE)
new_jobs_count += 1
new_jobs_durations.append(duration_hours)
new_jobs_nodes.append(nnodes)
new_jobs_cores.append(cores_per_node)
else:
# Use pre-aggregated hourly templates for pattern-based replay.
sampled = jobs_sampler.sample_one_hourly(wrap=True)
Expand Down
1 change: 1 addition & 0 deletions test/run_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
["python", "-m", "test.test_sampler_hourly_aggregated", "--file-path", "data/allusers-gpu-30.log"],
["python", "-m", "test.test_sampler_jobs", "--file-path", "data/allusers-gpu-30.log"],
["python", "-m", "test.test_sampler_jobs_aggregated", "--file-path", "data/allusers-gpu-30.log"],
["python", "-m", "test.test_jobs_replay_modes"],
["python", "-m", "test.test_inspect_workloadgen", "--workload-gen", "poisson", "--wg-poisson-lambdas4", "200,10,6,24", "--wg-max-jobs-hour", "1500", "--hours", "336", "--plot", "--wg-burst-small-prob", "0.2", "--wg-burst-heavy-prob", "0.02"],
]

Expand Down
219 changes: 219 additions & 0 deletions test/test_jobs_replay_modes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
#!/usr/bin/env python3
"""Tests for --jobs replay modes: raw exact replay vs aggregated template replay.

Synthetic log has two hourly bins with known jobs:
Bin 1 (00:xx):
- 2x (2 nodes, 16 NCPUS=8/node, 30 min) sub-hour, identical → merge in template mode
- 1x (1 node, 8 NCPUS=8/node, 120 min) 2-hour job
Bin 2 (01:xx):
- 1x (4 nodes, 32 NCPUS=8/node, 60 min) 1-hour job
- 1x (1 node, 96 NCPUS=96/node, 30 min) sub-hour job

Raw replay: bin 1 → 3 jobs, bin 2 → 2 jobs; cores preserved as-parsed.
Template replay: bin 1 → 2 job instances (two sub-hour jobs collapsed into one
core-hour-equivalent entry), bin 2 → 2 job instances.
"""

import math
import os
import sys
import tempfile
import textwrap

import numpy as np

from src.config import CORES_PER_NODE, MAX_NODES_PER_JOB
from src.sampler_jobs import DurationSampler
from src.workload_generator import generate_jobs

# Synthetic sacct-style Slurm log with two hourly bins (see module docstring).
# The "\x20" escapes pin the trailing spaces on the header/separator rows so
# editors that strip trailing whitespace cannot corrupt the fixture.
LOG_CONTENT = textwrap.dedent("""\
JobID User Partition Submit Start End ElapsedRaw NCPUS NNodes\x20
------------------------------ --------- ---------- ------------------- ------------------- ------------------- ---------- ---------- --------\x20
1 user01 gpu 2024-01-01T00:01:00 2024-01-01T00:01:00 2024-01-01T00:31:00 1800 16 2
2 user01 gpu 2024-01-01T00:02:00 2024-01-01T00:02:00 2024-01-01T00:32:00 1800 16 2
3 user01 gpu 2024-01-01T00:10:00 2024-01-01T00:10:00 2024-01-01T02:10:00 7200 8 1
4 user01 gpu 2024-01-01T01:05:00 2024-01-01T01:05:00 2024-01-01T02:05:00 3600 32 4
5 user01 gpu 2024-01-01T01:10:00 2024-01-01T01:10:00 2024-01-01T01:40:00 1800 96 1
""")

# Expected raw jobs after clamping/rounding applied by generate_jobs.
# duration = max(1, ceil(elapsed_seconds / 3600))
# cores_per_node = ncpus // nnodes, then clamped to [1, CORES_PER_NODE]
EXPECTED_RAW_BIN1 = [
    {"duration_hours": 1, "nnodes": 2, "cores_per_node": 8},
    {"duration_hours": 1, "nnodes": 2, "cores_per_node": 8},
    {"duration_hours": 2, "nnodes": 1, "cores_per_node": 8},
]
EXPECTED_RAW_BIN2 = [
    {"duration_hours": 1, "nnodes": 4, "cores_per_node": 8},
    {"duration_hours": 1, "nnodes": 1, "cores_per_node": 96},
]


def _make_sampler(log_path: str, template: bool) -> DurationSampler:
    """Parse the synthetic log into a sampler.

    When *template* is True, also precompute the hourly-equivalent job
    templates used by the pattern-based (non-exact-replay) path.
    """
    sampler = DurationSampler()
    sampler.parse_jobs(log_path, 60)
    if not template:
        return sampler
    sampler.precalculate_hourly_jobs(CORES_PER_NODE, MAX_NODES_PER_JOB)
    return sampler


def _call_generate(sampler, hour, exact_replay):
    """Invoke generate_jobs through the --jobs code path with a fixed seed."""
    kwargs = dict(
        current_hour=hour,
        external_jobs="synthetic",  # non-empty triggers --jobs path
        external_hourly_jobs=None,
        external_durations=None,
        workload_gen=None,
        jobs_sampler=sampler,
        hourly_sampler=None,
        durations_sampler=None,
        np_random=np.random.default_rng(0),
        jobs_exact_replay=exact_replay,
    )
    return generate_jobs(**kwargs)


def test_raw_replay_job_counts():
    """Raw exact replay must emit one queue job per raw log job in each bin."""
    with tempfile.NamedTemporaryFile(mode="w", suffix=".log", delete=False) as f:
        f.write(LOG_CONTENT)
        path = f.name

    try:
        sampler = _make_sampler(path, template=False)

        # Bin 1 holds 3 raw jobs, bin 2 holds 2 (see LOG_CONTENT).
        count1, _, _, _ = _call_generate(sampler, hour=1, exact_replay=True)
        assert count1 == 3, f"bin 1 raw: expected 3 jobs, got {count1}"

        count2, _, _, _ = _call_generate(sampler, hour=2, exact_replay=True)
        assert count2 == 2, f"bin 2 raw: expected 2 jobs, got {count2}"
    finally:
        os.unlink(path)  # delete=False requires explicit cleanup

    print(f"PASS: raw replay job counts ({count1}, {count2})")


def test_raw_replay_cores_populated():
    """Every raw-replayed job must carry a positive cores_per_node value."""
    with tempfile.NamedTemporaryFile(mode="w", suffix=".log", delete=False) as f:
        f.write(LOG_CONTENT)
        path = f.name

    try:
        sampler = _make_sampler(path, template=False)

        for hour in (1, 2):
            _, _, _, cores = _call_generate(sampler, hour=hour, exact_replay=True)
            assert all(c > 0 for c in cores), \
                f"bin {hour} raw: cores_per_node contains zeros: {cores}"
    finally:
        os.unlink(path)  # delete=False requires explicit cleanup

    print("PASS: raw replay cores populated")


def test_raw_replay_job_attributes():
    """Raw replay must reproduce durations, nodes, and cores exactly as parsed."""
    with tempfile.NamedTemporaryFile(mode="w", suffix=".log", delete=False) as f:
        f.write(LOG_CONTENT)
        path = f.name

    try:
        sampler = _make_sampler(path, template=False)

        # Compare order-insensitively: sort (duration, nodes, cores) triples.
        _, dur1, nodes1, cores1 = _call_generate(sampler, hour=1, exact_replay=True)
        actual1 = sorted(zip(dur1, nodes1, cores1))
        expected1 = sorted(
            (j["duration_hours"], j["nnodes"], j["cores_per_node"]) for j in EXPECTED_RAW_BIN1
        )
        assert actual1 == expected1, f"bin 1 raw jobs mismatch:\n  got:      {actual1}\n  expected: {expected1}"

        _, dur2, nodes2, cores2 = _call_generate(sampler, hour=2, exact_replay=True)
        actual2 = sorted(zip(dur2, nodes2, cores2))
        expected2 = sorted(
            (j["duration_hours"], j["nnodes"], j["cores_per_node"]) for j in EXPECTED_RAW_BIN2
        )
        assert actual2 == expected2, f"bin 2 raw jobs mismatch:\n  got:      {actual2}\n  expected: {expected2}"
    finally:
        os.unlink(path)  # delete=False requires explicit cleanup

    print("PASS: raw replay job attributes match expected")


def test_template_replay_cores_populated():
    """Template replay must emit jobs with positive cores_per_node for each bin."""
    with tempfile.NamedTemporaryFile(mode="w", suffix=".log", delete=False) as f:
        f.write(LOG_CONTENT)
        path = f.name

    try:
        sampler = _make_sampler(path, template=True)

        for hour in (1, 2):
            count, _, _, cores = _call_generate(sampler, hour=hour, exact_replay=False)
            assert count > 0, f"bin {hour} template: expected jobs, got 0"
            assert all(c > 0 for c in cores), \
                f"bin {hour} template: cores_per_node contains zeros: {cores}"
    finally:
        os.unlink(path)  # delete=False requires explicit cleanup

    print("PASS: template replay cores populated")


def test_template_replay_bin1_collapses_subhour_pair():
    """Two identical sub-hour jobs in bin 1 should collapse to one template entry."""
    with tempfile.NamedTemporaryFile(mode="w", suffix=".log", delete=False) as f:
        f.write(LOG_CONTENT)
        path = f.name

    try:
        sampler = _make_sampler(path, template=True)
        count, durations, nodes, cores = _call_generate(sampler, hour=1, exact_replay=False)
    finally:
        os.unlink(path)  # delete=False requires explicit cleanup

    # Raw has 3 jobs; template collapses the two identical sub-hour jobs into one
    # core-hour-equivalent entry + keeps the 2h job = 2 job instances.
    assert count == 2, (
        f"bin 1 template: expected 2 job instances after sub-hour collapse, got {count}\n"
        f"  durations={durations}, nodes={nodes}, cores={cores}"
    )
    print(f"PASS: template replay bin 1 collapses sub-hour pair → {count} instances")


def test_template_replay_preserves_core_hours_bin1():
    """Template mode should preserve approximate total core-hours from the raw log."""
    with tempfile.NamedTemporaryFile(mode="w", suffix=".log", delete=False) as f:
        f.write(LOG_CONTENT)
        path = f.name

    # Raw core-hours for bin 1:
    #   job1: 2 nodes * 8 cores/node * (30/60)h = 8 core-hours
    #   job2: same = 8 core-hours
    #   job3: 1 node * 8 cores/node * (120/60)h = 16 core-hours
    #   Total = 32 core-hours
    raw_core_hours_bin1 = 8 + 8 + 16

    try:
        sampler = _make_sampler(path, template=True)
        _, durations, nodes, cores = _call_generate(sampler, hour=1, exact_replay=False)
    finally:
        os.unlink(path)  # delete=False requires explicit cleanup

    template_core_hours = sum(
        n * c * d for d, n, c in zip(durations, nodes, cores)
    )
    # Allow small rounding due to ceil operations, but should be within 10%
    assert abs(template_core_hours - raw_core_hours_bin1) / raw_core_hours_bin1 < 0.1, (
        f"bin 1 template core-hours {template_core_hours:.1f} deviates "
        f"more than 10% from raw {raw_core_hours_bin1}"
    )
    print(f"PASS: template replay bin 1 core-hours {template_core_hours:.1f} ≈ raw {raw_core_hours_bin1}")


def main():
    """Run every replay-mode test; print a summary and exit 1 on any failure."""
    tests = (
        test_raw_replay_job_counts,
        test_raw_replay_cores_populated,
        test_raw_replay_job_attributes,
        test_template_replay_cores_populated,
        test_template_replay_bin1_collapses_subhour_pair,
        test_template_replay_preserves_core_hours_bin1,
    )
    failed = []
    for test_fn in tests:
        try:
            test_fn()
        except AssertionError as exc:
            # Assertion failures are expected test outcomes, not crashes.
            print(f"FAIL: {test_fn.__name__}: {exc}")
            failed.append(test_fn.__name__)
        except Exception as exc:
            # Anything else means the test itself broke.
            print(f"ERROR: {test_fn.__name__}: {exc}")
            failed.append(test_fn.__name__)

    print(f"\n{len(tests) - len(failed)}/{len(tests)} passed")
    if failed:
        sys.exit(1)


if __name__ == "__main__":
    main()
16 changes: 6 additions & 10 deletions train.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,12 @@ def main():
parser.add_argument('--hourly-jobs', type=str, nargs='?', const="", default="", help='Path to Slurm log file for hourly statistical sampling (for use with hourly_sampler)')
parser.add_argument('--job-arrival-scale', type=float, default=1.0, help='Scale sampled arrivals per step (1.0 = unchanged).')
parser.add_argument('--jobs-exact-replay', action='store_true', help='For --jobs mode, replay raw jobs in timeline order (no template aggregation).')
parser.add_argument('--jobs-exact-replay-aggregate', action='store_true', help='With --jobs-exact-replay, aggregate each sampled raw time-bin before enqueueing.')
parser.add_argument('--plot-rewards', action='store_true', help='Per step, plot rewards for all possible num_idle_nodes & num_used_nodes (default: False).')
parser.add_argument('--plot-eff-reward', action=argparse.BooleanOptionalAction, default=True, help='Include efficiency reward in the plot (dashed line).')
parser.add_argument('--plot-price-reward', action=argparse.BooleanOptionalAction, default=True, help='Include price reward in the plot (dashed line).')
parser.add_argument('--plot-idle-penalty', action=argparse.BooleanOptionalAction, default=True, help='Include idle penalty in the plot (dashed line).')
parser.add_argument('--plot-job-age-penalty', action=argparse.BooleanOptionalAction, default=True, help='Include job age penalty in the plot (dashed line).')
parser.add_argument('--plot-total-reward', action=argparse.BooleanOptionalAction, default=True, help='Include total reward per step in the dashboard (raw values).')
parser.add_argument('--plot-eff-reward', action=argparse.BooleanOptionalAction, default=False, help='Include efficiency reward in the plot (dashed line).')
parser.add_argument('--plot-price-reward', action=argparse.BooleanOptionalAction, default=False, help='Include price reward in the plot (dashed line).')
parser.add_argument('--plot-idle-penalty', action=argparse.BooleanOptionalAction, default=False, help='Include idle penalty in the plot (dashed line).')
parser.add_argument('--plot-job-age-penalty', action=argparse.BooleanOptionalAction, default=False, help='Include job age penalty in the plot (dashed line).')
parser.add_argument('--plot-total-reward', action=argparse.BooleanOptionalAction, default=False, help='Include total reward per step in the dashboard (raw values).')
parser.add_argument('--plot-price', action=argparse.BooleanOptionalAction, default=True, help='Plot electricity price.')
parser.add_argument('--plot-online-nodes', action=argparse.BooleanOptionalAction, default=True, help='Plot online nodes.')
parser.add_argument('--plot-used-nodes', action=argparse.BooleanOptionalAction, default=True, help='Plot used nodes.')
Expand Down Expand Up @@ -91,8 +90,6 @@ def main():
parser.error(str(exc))
if args.jobs_exact_replay and not norm_path(args.jobs):
parser.error("--jobs-exact-replay requires --jobs")
if args.jobs_exact_replay_aggregate and not args.jobs_exact_replay:
parser.error("--jobs-exact-replay-aggregate requires --jobs-exact-replay")
if args.workload_gen and args.job_arrival_scale != 1.0:
print(
"Warning: --job-arrival-scale is not allowed with --workload-gen; "
Expand Down Expand Up @@ -177,8 +174,7 @@ def main():
evaluation_mode=args.evaluate_savings,
workload_gen=workload_gen,
job_arrival_scale=args.job_arrival_scale,
jobs_exact_replay=args.jobs_exact_replay,
jobs_exact_replay_aggregate=args.jobs_exact_replay_aggregate)
jobs_exact_replay=args.jobs_exact_replay)
env.session_dir = session_root
env.plots_dir = plots_dir
env.reset(seed=args.seed)
Expand Down
Loading
Loading