From 57fb55a005fbac2978d964eb0d967c8310733742 Mon Sep 17 00:00:00 2001 From: Jonathan Metzman Date: Mon, 15 Dec 2025 20:03:16 -0500 Subject: [PATCH 1/8] tmp --- .../fuzzer-common-data-bundles/README.md | 3 - .../_internal/cron/schedule_fuzz.py | 36 +++++++++- .../_internal/datastore/data_types.py | 16 +++++ .../_internal/google_cloud_utils/batch.py | 33 +++++++++- .../handlers/cron/schedule_fuzz_test.py | 65 +++++++++++++++++++ 5 files changed, 146 insertions(+), 7 deletions(-) delete mode 100644 bot/inputs/fuzzer-common-data-bundles/README.md diff --git a/bot/inputs/fuzzer-common-data-bundles/README.md b/bot/inputs/fuzzer-common-data-bundles/README.md deleted file mode 100644 index 2c59a08c0dc..00000000000 --- a/bot/inputs/fuzzer-common-data-bundles/README.md +++ /dev/null @@ -1,3 +0,0 @@ -Placeholder for common corpora that is available to a fuzzer (if a corpus is not provided). - -Examples include web tests. See WEB_TESTS_URL attribute in project.yaml. \ No newline at end of file diff --git a/src/clusterfuzz/_internal/cron/schedule_fuzz.py b/src/clusterfuzz/_internal/cron/schedule_fuzz.py index d18d6edbfa9..2229d62f0fa 100644 --- a/src/clusterfuzz/_internal/cron/schedule_fuzz.py +++ b/src/clusterfuzz/_internal/cron/schedule_fuzz.py @@ -14,6 +14,7 @@ """Cron job to schedule fuzz tasks that run on batch.""" import collections +import datetime import multiprocessing import random import time @@ -377,9 +378,36 @@ def respect_project_max_cpus(num_cpus): return min(max_cpus_per_schedule, num_cpus) +def is_congested() -> bool: + """Returns True if the batch system is congested.""" + one_hour_ago = datetime.datetime.utcnow() - datetime.timedelta(hours=1) + congestion_jobs = list( + data_types.CongestionJob.query( + data_types.CongestionJob.timestamp > one_hour_ago)) + + # TODO(metzman): Don't hardcode this, infer how many should be per hour based + # on how long ago was previous schedule. + if len(congestion_jobs) >= 3: + completed_count = batch.check_congestion_jobs( + [job.job_id for job in congestion_jobs]) + if completed_count < 3: + logs.error( + f'Congestion detected: {completed_count}/{len(congestion_jobs)} ' + 'congestion jobs completed in the last hour. Pausing scheduling.') + return True + return False + + def schedule_fuzz_tasks() -> bool: """Schedules fuzz tasks.""" - multiprocessing.set_start_method('spawn') + try: + multiprocessing.set_start_method('spawn') + except RuntimeError: + pass + + if is_congested(): + return False + batch_config = local_config.BatchConfig() project = batch_config.get('project') regions = get_batch_regions(batch_config) @@ -398,6 +426,12 @@ def schedule_fuzz_tasks() -> bool: tasks.bulk_add_tasks(fuzz_tasks, queue=tasks.PREPROCESS_QUEUE, eta_now=True) logs.info(f'Scheduled {len(fuzz_tasks)} fuzz tasks.') + # Run a hello world task that finishes very quickly. We need job, pick any. 
+ clusterfuzz_job_type = fuzz_tasks[0].job + + batch_job_result = batch.create_congestion_job(clusterfuzz_job_type) + data_types.CongestionJob(job_id=batch_job_result.name).put() + end = time.time() total = end - start logs.info(f'Task scheduling took {total} seconds.') diff --git a/src/clusterfuzz/_internal/datastore/data_types.py b/src/clusterfuzz/_internal/datastore/data_types.py index 4e5af47dc24..8c8c007831a 100644 --- a/src/clusterfuzz/_internal/datastore/data_types.py +++ b/src/clusterfuzz/_internal/datastore/data_types.py @@ -1803,3 +1803,19 @@ class FuzzerTaskEvent(Model): def _pre_put_hook(self): self.ttl_expiry_timestamp = ( datetime.datetime.now() + self.FUZZER_EVENT_TTL) + + +class CongestionJob(Model): + """Congestion job. Used to measure congestion in batch.""" + CONGESTION_JOB_TTL = datetime.timedelta(days=3) + + # The job name (ID) in Batch. + job_id = ndb.StringProperty() + # Time of creation. + timestamp = ndb.DateTimeProperty(auto_now_add=True) + # Expiration time for this entity. + ttl_expiry_timestamp = ndb.DateTimeProperty() + + def _pre_put_hook(self): + self.ttl_expiry_timestamp = ( + datetime.datetime.now() + self.CONGESTION_JOB_TTL) diff --git a/src/clusterfuzz/_internal/google_cloud_utils/batch.py b/src/clusterfuzz/_internal/google_cloud_utils/batch.py index 39124e3b9f3..6d18fb07c04 100644 --- a/src/clusterfuzz/_internal/google_cloud_utils/batch.py +++ b/src/clusterfuzz/_internal/google_cloud_utils/batch.py @@ -122,11 +122,38 @@ def create_uworker_main_batch_jobs(batch_tasks: List[BatchTask]): return jobs -def _get_task_spec(batch_workload_spec): +def create_congestion_job(job_type): + """Creates a congestion job.""" + batch_tasks = [BatchTask('fuzz', job_type, 'CONGESTION')] + specs = _get_specs_from_config(batch_tasks) + spec = specs[('fuzz', job_type)] + return _create_job(spec, ['CONGESTION'], commands=['echo', 'hello']) + + +def check_congestion_jobs(job_ids): + """Checks the status of the congestion jobs.""" + completed_count = 0 + for job_id in job_ids: + try: + job = _batch_client().get_job(name=job_id) + if job.status.state == batch.JobStatus.State.SUCCEEDED: + completed_count += 1 + except Exception: + # If we can't get the job, it might have been deleted or there is an error. + # We don't count it as completed. 
+ logs.warning(f'Failed to get job {job_id}.') + + return completed_count + + +def _get_task_spec(batch_workload_spec, commands=None): """Gets the task spec based on the batch workload spec.""" runnable = batch.Runnable() runnable.container = batch.Runnable.Container() runnable.container.image_uri = batch_workload_spec.docker_image + if commands: + runnable.container.commands = commands + clusterfuzz_release = batch_workload_spec.clusterfuzz_release runnable.container.options = ( '--memory-swappiness=40 --shm-size=1.9g --rm --net=host ' @@ -190,7 +217,7 @@ def _get_allocation_policy(spec): return allocation_policy -def _create_job(spec, input_urls): +def _create_job(spec, input_urls, commands=None): """Creates and starts a batch job from |spec| that executes all tasks.""" task_group = batch.TaskGroup() task_group.task_count = len(input_urls) @@ -200,7 +227,7 @@ def _create_job(spec, input_urls): for input_url in input_urls ] task_group.task_environments = task_environments - task_group.task_spec = _get_task_spec(spec) + task_group.task_spec = _get_task_spec(spec, commands=commands) task_group.task_count_per_node = TASK_COUNT_PER_NODE assert task_group.task_count_per_node == 1, 'This is a security issue' diff --git a/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py b/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py index 8b960d9977e..dd5a47840a4 100644 --- a/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py +++ b/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py @@ -286,3 +286,68 @@ def test_config_limit(self): }] self.assertEqual( schedule_fuzz.get_cpu_usage(self.creds, 'region', 'project'), (2, 0)) + + +@test_utils.with_cloud_emulators('datastore') +class ScheduleFuzzTasksTest(unittest.TestCase): + """Tests for schedule_fuzz_tasks.""" + + def setUp(self): + test_helpers.patch(self, [ + 'clusterfuzz._internal.cron.schedule_fuzz.get_available_cpus', + 'clusterfuzz._internal.cron.schedule_fuzz.get_fuzz_tasks', + 'clusterfuzz._internal.cron.schedule_fuzz.get_batch_regions', + 'clusterfuzz._internal.google_cloud_utils.batch.check_congestion_jobs', + 'clusterfuzz._internal.google_cloud_utils.batch.create_congestion_job', + 'clusterfuzz._internal.base.tasks.bulk_add_tasks', + ]) + self.mock.get_batch_regions.return_value = ['us-central1'] + mock_job = unittest.mock.Mock() + mock_job.name = 'congestion-job' + self.mock.create_congestion_job.return_value = mock_job + + def test_is_congested_true(self): + """Tests that scheduling stops when congested.""" + # Create 3 congestion jobs. + for i in range(3): + data_types.CongestionJob(job_id=f'job-{i}').put() + + # Mock check_congestion_jobs to return 0 completed. + self.mock.check_congestion_jobs.return_value = 0 + + self.assertFalse(schedule_fuzz.schedule_fuzz_tasks()) + self.mock.get_available_cpus.assert_not_called() + + def test_is_congested_false(self): + """Tests that scheduling proceeds when not congested.""" + # Create 3 congestion jobs. + for i in range(3): + data_types.CongestionJob(job_id=f'job-{i}').put() + + # Mock check_congestion_jobs to return 3 completed. 
+    self.mock.check_congestion_jobs.return_value = 3
+    self.mock.get_available_cpus.return_value = 10
+    mock_task = unittest.mock.Mock()
+    mock_task.job = 'job1'
+    self.mock.get_fuzz_tasks.return_value = [mock_task]
+
+    self.assertTrue(schedule_fuzz.schedule_fuzz_tasks())
+    self.mock.get_available_cpus.assert_called()
+
+  def test_no_congestion_job_if_no_tasks(self):
+    """Tests that no congestion job is scheduled if no fuzz tasks."""
+    self.mock.get_available_cpus.return_value = 10
+    self.mock.get_fuzz_tasks.return_value = []
+
+    self.assertFalse(schedule_fuzz.schedule_fuzz_tasks())
+    self.mock.create_congestion_job.assert_not_called()
+
+  def test_congestion_job_scheduled(self):
+    """Tests that a congestion job is scheduled when fuzz tasks are."""
+    self.mock.get_available_cpus.return_value = 10
+    mock_task = unittest.mock.Mock()
+    mock_task.job = 'job1'
+    self.mock.get_fuzz_tasks.return_value = [mock_task]
+
+    self.assertTrue(schedule_fuzz.schedule_fuzz_tasks())
+    self.mock.create_congestion_job.assert_called_with('job1')

From ee5cbc0b6e3c4d960193dae6f30ee7d63963454b Mon Sep 17 00:00:00 2001
From: Jonathan Metzman
Date: Mon, 15 Dec 2025 20:45:07 -0500
Subject: [PATCH 2/8] [batch] Add congestion jobs.

The idea is that if we schedule a bunch of NOOP jobs and don't see many
of them complete in the last hour, things are congested.
This might not react fast enough to congestion, but it can potentially
pause scheduling for long enough to drain the queue.
In the future we should stop hardcoding the threshold and instead infer
how many completions we should see in the last hour.
Another future improvement is testing congestion by subconfig, so we
can tell whether some other resource is saturated.
---
 .../_internal/cron/schedule_fuzz.py           | 87 ++++++++++++-------
 .../_internal/datastore/data_types.py         |  2 +
 .../_internal/google_cloud_utils/batch.py     | 63 +++++++++-----
 .../handlers/cron/schedule_fuzz_test.py       | 16 ++--
 .../core/google_cloud_utils/batch_test.py     |  4 +-
 5 files changed, 115 insertions(+), 57 deletions(-)

diff --git a/src/clusterfuzz/_internal/cron/schedule_fuzz.py b/src/clusterfuzz/_internal/cron/schedule_fuzz.py
index 2229d62f0fa..21fb3462078 100644
--- a/src/clusterfuzz/_internal/cron/schedule_fuzz.py
+++ b/src/clusterfuzz/_internal/cron/schedule_fuzz.py
@@ -378,24 +378,57 @@ def respect_project_max_cpus(num_cpus):
   return min(max_cpus_per_schedule, num_cpus)
 
 
-def is_congested() -> bool:
-  """Returns True if the batch system is congested."""
+def get_congested_regions() -> List[str]:
+  """Returns a list of congested regions."""
   one_hour_ago = datetime.datetime.utcnow() - datetime.timedelta(hours=1)
   congestion_jobs = list(
       data_types.CongestionJob.query(
          data_types.CongestionJob.timestamp > one_hour_ago))
 
-  # TODO(metzman): Don't hardcode this, infer how many should be per hour based
-  # on how long ago was previous schedule.
-  if len(congestion_jobs) >= 3:
+  jobs_by_region = collections.defaultdict(list)
+  for job in congestion_jobs:
+    if job.region:
+      jobs_by_region[job.region].append(job)
+
+  congested_regions = []
+  for region, jobs in jobs_by_region.items():
+    # Sort by timestamp descending.
+    jobs.sort(key=lambda j: j.timestamp, reverse=True)
+    # Check the last 3 jobs.
+    recent_jobs = jobs[:3]
+    if len(recent_jobs) < 3:
+      continue
+
     completed_count = batch.check_congestion_jobs(
-        [job.job_id for job in congestion_jobs])
+        [job.job_id for job in recent_jobs])
     if completed_count < 3:
-      logs.error(
-          f'Congestion detected: {completed_count}/{len(congestion_jobs)} '
-          'congestion jobs completed in the last hour. 
Pausing scheduling.') - return True - return False + logs.error(f'Congestion detected in {region}: {completed_count}/3 ' + 'congestion jobs completed in the last hour.') + congested_regions.append(region) + return congested_regions + + +def schedule_congestion_jobs(fuzz_tasks, all_regions): + """Schedules congestion jobs for all regions.""" + # Run a hello world task that finishes very quickly. We need job, pick any. + clusterfuzz_job_type = None + if fuzz_tasks: + clusterfuzz_job_type = fuzz_tasks[0].job + else: + # If no tasks scheduled, try to get a job type from DB to run congestion job. + job = data_types.Job.query().get() + if job: + clusterfuzz_job_type = job.name + + if clusterfuzz_job_type: + for region in all_regions: + try: + batch_job_result = batch.create_congestion_job( + clusterfuzz_job_type, gce_region=region) + data_types.CongestionJob( + job_id=batch_job_result.name, region=region).put() + except Exception: + logs.error(f'Failed to create congestion job in {region}.') def schedule_fuzz_tasks() -> bool: @@ -405,32 +438,28 @@ def schedule_fuzz_tasks() -> bool: except RuntimeError: pass - if is_congested(): - return False - batch_config = local_config.BatchConfig() project = batch_config.get('project') - regions = get_batch_regions(batch_config) + all_regions = get_batch_regions(batch_config) + congested_regions = get_congested_regions() + regions = [r for r in all_regions if r not in congested_regions] + start = time.time() available_cpus = get_available_cpus(project, regions) logs.info(f'{available_cpus} available CPUs.') - if not available_cpus: - return False - - fuzz_tasks = get_fuzz_tasks(available_cpus) - if not fuzz_tasks: - logs.error('No fuzz tasks found to schedule.') - return False - logs.info(f'Adding {fuzz_tasks} to preprocess queue.') - tasks.bulk_add_tasks(fuzz_tasks, queue=tasks.PREPROCESS_QUEUE, eta_now=True) - logs.info(f'Scheduled {len(fuzz_tasks)} fuzz tasks.') + fuzz_tasks = [] + if available_cpus > 0: + fuzz_tasks = get_fuzz_tasks(available_cpus) - # Run a hello world task that finishes very quickly. We need job, pick any. - clusterfuzz_job_type = fuzz_tasks[0].job + if fuzz_tasks: + logs.info(f'Adding {fuzz_tasks} to preprocess queue.') + tasks.bulk_add_tasks(fuzz_tasks, queue=tasks.PREPROCESS_QUEUE, eta_now=True) # pylint: disable=line-too-long + logs.info(f'Scheduled {len(fuzz_tasks)} fuzz tasks.') + else: + logs.info('No fuzz tasks scheduled.') - batch_job_result = batch.create_congestion_job(clusterfuzz_job_type) - data_types.CongestionJob(job_id=batch_job_result.name).put() + schedule_congestion_jobs(fuzz_tasks, all_regions) end = time.time() total = end - start diff --git a/src/clusterfuzz/_internal/datastore/data_types.py b/src/clusterfuzz/_internal/datastore/data_types.py index 8c8c007831a..441cd0e20e7 100644 --- a/src/clusterfuzz/_internal/datastore/data_types.py +++ b/src/clusterfuzz/_internal/datastore/data_types.py @@ -1811,6 +1811,8 @@ class CongestionJob(Model): # The job name (ID) in Batch. job_id = ndb.StringProperty() + # The region the job is running in. + region = ndb.StringProperty() # Time of creation. timestamp = ndb.DateTimeProperty(auto_now_add=True) # Expiration time for this entity. diff --git a/src/clusterfuzz/_internal/google_cloud_utils/batch.py b/src/clusterfuzz/_internal/google_cloud_utils/batch.py index 6d18fb07c04..1ac014c1378 100644 --- a/src/clusterfuzz/_internal/google_cloud_utils/batch.py +++ b/src/clusterfuzz/_internal/google_cloud_utils/batch.py @@ -13,6 +13,7 @@ # limitations under the License. 
"""Cloud Batch helpers.""" import collections +import dataclasses import threading from typing import Dict from typing import List @@ -44,23 +45,24 @@ # See https://cloud.google.com/batch/quotas#job_limits MAX_CONCURRENT_VMS_PER_JOB = 1000 -BatchWorkloadSpec = collections.namedtuple('BatchWorkloadSpec', [ - 'clusterfuzz_release', - 'disk_size_gb', - 'disk_type', - 'docker_image', - 'user_data', - 'service_account_email', - 'subnetwork', - 'preemptible', - 'project', - 'machine_type', - 'network', - 'gce_region', - 'priority', - 'max_run_duration', - 'retry', -]) + +@dataclasses.dataclass +class BatchWorkloadSpec: + clusterfuzz_release: str + disk_size_gb: int + disk_type: str + docker_image: str + user_data: str + service_account_email: str + subnetwork: str + preemptible: bool + project: str + machine_type: str + network: str + gce_region: str + priority: int + max_run_duration: str + retry: bool def _create_batch_client_new(): @@ -122,13 +124,23 @@ def create_uworker_main_batch_jobs(batch_tasks: List[BatchTask]): return jobs -def create_congestion_job(job_type): +def create_congestion_job(job_type, gce_region=None): """Creates a congestion job.""" batch_tasks = [BatchTask('fuzz', job_type, 'CONGESTION')] specs = _get_specs_from_config(batch_tasks) spec = specs[('fuzz', job_type)] - return _create_job(spec, ['CONGESTION'], commands=['echo', 'hello']) + if gce_region: + batch_config = _get_batch_config() + config_map = _get_config_names(batch_tasks) + config_name, _, _ = config_map[('fuzz', job_type)] + instance_spec = batch_config.get('mapping').get(config_name) + subconfig = _get_subconfig_for_region( + batch_config, instance_spec, gce_region) + spec.gce_region = subconfig['region'] + spec.network = subconfig['network'] + spec.subnetwork = subconfig['subnetwork'] + return _create_job(spec, ['CONGESTION'], commands=['echo', 'hello']) def check_congestion_jobs(job_ids): """Checks the status of the congestion jobs.""" @@ -139,7 +151,8 @@ def check_congestion_jobs(job_ids): if job.status.state == batch.JobStatus.State.SUCCEEDED: completed_count += 1 except Exception: - # If we can't get the job, it might have been deleted or there is an error. + # If we can't get the job, it might have been deleted or there is an + # error. # We don't count it as completed. logs.warning(f'Failed to get job {job_id}.') @@ -335,6 +348,16 @@ def _get_subconfig(batch_config, instance_spec): return all_subconfigs[weighted_subconfig.name] +def _get_subconfig_for_region(batch_config, instance_spec, region): + all_subconfigs = batch_config.get('subconfigs', {}) + instance_subconfigs = instance_spec['subconfigs'] + for subconfig in instance_subconfigs: + full_subconfig = all_subconfigs[subconfig['name']] + if full_subconfig['region'] == region: + return full_subconfig + raise ValueError(f'No subconfig for region {region}') + + def _get_specs_from_config(batch_tasks) -> Dict: """Gets the configured specifications for a batch workload.""" if not batch_tasks: diff --git a/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py b/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py index dd5a47840a4..17828121e2e 100644 --- a/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py +++ b/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py @@ -310,19 +310,22 @@ def test_is_congested_true(self): """Tests that scheduling stops when congested.""" # Create 3 congestion jobs. 
for i in range(3): - data_types.CongestionJob(job_id=f'job-{i}').put() + data_types.CongestionJob(job_id=f'job-{i}', region='us-central1').put() # Mock check_congestion_jobs to return 0 completed. self.mock.check_congestion_jobs.return_value = 0 + self.mock.get_available_cpus.return_value = 0 - self.assertFalse(schedule_fuzz.schedule_fuzz_tasks()) - self.mock.get_available_cpus.assert_not_called() + self.assertTrue(schedule_fuzz.schedule_fuzz_tasks()) + self.mock.get_available_cpus.assert_called() + # Verify called with empty regions list + self.assertEqual(self.mock.get_available_cpus.call_args[0][1], []) def test_is_congested_false(self): """Tests that scheduling proceeds when not congested.""" # Create 3 congestion jobs. for i in range(3): - data_types.CongestionJob(job_id=f'job-{i}').put() + data_types.CongestionJob(job_id=f'job-{i}', region='us-central1').put() # Mock check_congestion_jobs to return 3 completed. self.mock.check_congestion_jobs.return_value = 3 @@ -339,7 +342,7 @@ def test_no_congestion_job_if_no_tasks(self): self.mock.get_available_cpus.return_value = 10 self.mock.get_fuzz_tasks.return_value = [] - self.assertFalse(schedule_fuzz.schedule_fuzz_tasks()) + self.assertTrue(schedule_fuzz.schedule_fuzz_tasks()) self.mock.create_congestion_job.assert_not_called() def test_congestion_job_scheduled(self): @@ -350,4 +353,5 @@ def test_congestion_job_scheduled(self): self.mock.get_fuzz_tasks.return_value = [mock_task] self.assertTrue(schedule_fuzz.schedule_fuzz_tasks()) - self.mock.create_congestion_job.assert_called_with('job1') + self.mock.create_congestion_job.assert_called_with( + 'job1', gce_region='us-central1') diff --git a/src/clusterfuzz/_internal/tests/core/google_cloud_utils/batch_test.py b/src/clusterfuzz/_internal/tests/core/google_cloud_utils/batch_test.py index f86f10ed334..b031465a057 100644 --- a/src/clusterfuzz/_internal/tests/core/google_cloud_utils/batch_test.py +++ b/src/clusterfuzz/_internal/tests/core/google_cloud_utils/batch_test.py @@ -63,7 +63,7 @@ def test_nonpreemptible(self): max_run_duration='21600s', ) - self.assertCountEqual(spec, expected_spec) + self.assertEqual(spec, expected_spec) def test_fuzz_get_specs_from_config(self): """Tests that _get_specs_from_config works for fuzz tasks as expected.""" @@ -89,7 +89,7 @@ def test_fuzz_get_specs_from_config(self): max_run_duration='21600s', ) - self.assertCountEqual(spec, expected_spec) + self.assertEqual(spec, expected_spec) def test_corpus_pruning(self): """Tests that corpus pruning uses a spec of 24 hours and a different one From 884ae7a77f126e146bdfc59417bcab59c092be69 Mon Sep 17 00:00:00 2001 From: Jonathan Metzman Date: Tue, 16 Dec 2025 16:31:05 -0500 Subject: [PATCH 3/8] fmt --- bot/inputs/fuzzer-common-data-bundles/README.md | 3 +++ src/clusterfuzz/_internal/google_cloud_utils/batch.py | 5 +++-- 2 files changed, 6 insertions(+), 2 deletions(-) create mode 100644 bot/inputs/fuzzer-common-data-bundles/README.md diff --git a/bot/inputs/fuzzer-common-data-bundles/README.md b/bot/inputs/fuzzer-common-data-bundles/README.md new file mode 100644 index 00000000000..2c59a08c0dc --- /dev/null +++ b/bot/inputs/fuzzer-common-data-bundles/README.md @@ -0,0 +1,3 @@ +Placeholder for common corpora that is available to a fuzzer (if a corpus is not provided). + +Examples include web tests. See WEB_TESTS_URL attribute in project.yaml. 
\ No newline at end of file diff --git a/src/clusterfuzz/_internal/google_cloud_utils/batch.py b/src/clusterfuzz/_internal/google_cloud_utils/batch.py index 1ac014c1378..ea4f0de32e9 100644 --- a/src/clusterfuzz/_internal/google_cloud_utils/batch.py +++ b/src/clusterfuzz/_internal/google_cloud_utils/batch.py @@ -134,14 +134,15 @@ def create_congestion_job(job_type, gce_region=None): config_map = _get_config_names(batch_tasks) config_name, _, _ = config_map[('fuzz', job_type)] instance_spec = batch_config.get('mapping').get(config_name) - subconfig = _get_subconfig_for_region( - batch_config, instance_spec, gce_region) + subconfig = _get_subconfig_for_region(batch_config, instance_spec, + gce_region) spec.gce_region = subconfig['region'] spec.network = subconfig['network'] spec.subnetwork = subconfig['subnetwork'] return _create_job(spec, ['CONGESTION'], commands=['echo', 'hello']) + def check_congestion_jobs(job_ids): """Checks the status of the congestion jobs.""" completed_count = 0 From 848f748fec67f51f79228bb5989e060e68d9f837 Mon Sep 17 00:00:00 2001 From: Jonathan Metzman Date: Wed, 17 Dec 2025 13:31:16 -0500 Subject: [PATCH 4/8] Address comments --- src/clusterfuzz/_internal/cron/schedule_fuzz.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/clusterfuzz/_internal/cron/schedule_fuzz.py b/src/clusterfuzz/_internal/cron/schedule_fuzz.py index 21fb3462078..6c93f6d360c 100644 --- a/src/clusterfuzz/_internal/cron/schedule_fuzz.py +++ b/src/clusterfuzz/_internal/cron/schedule_fuzz.py @@ -379,7 +379,12 @@ def respect_project_max_cpus(num_cpus): def get_congested_regions() -> List[str]: - """Returns a list of congested regions.""" + """Returns a list of congested regions. The strategy used is as follows: + Run congestion jobs every time this cron is run in each region. + Assuming we run this cron more than 3 times an hour, if there aren't + 3 completed jobs in the last hour, they either failed (unlikely, they are + trivial) or never ran because of congestion. + """ one_hour_ago = datetime.datetime.utcnow() - datetime.timedelta(hours=1) congestion_jobs = list( data_types.CongestionJob.query( @@ -402,6 +407,7 @@ def get_congested_regions() -> List[str]: completed_count = batch.check_congestion_jobs( [job.job_id for job in recent_jobs]) if completed_count < 3: + # TODO(metzman): Add some monitoring here. logs.error(f'Congestion detected in {region}: {completed_count}/3 ' 'congestion jobs completed in the last hour.') congested_regions.append(region) @@ -410,7 +416,8 @@ def get_congested_regions() -> List[str]: def schedule_congestion_jobs(fuzz_tasks, all_regions): """Schedules congestion jobs for all regions.""" - # Run a hello world task that finishes very quickly. We need job, pick any. + # Run a hello world task that finishes very quickly. The job field is ignored, + # but we need one, so pick an arbitrary one. clusterfuzz_job_type = None if fuzz_tasks: clusterfuzz_job_type = fuzz_tasks[0].job @@ -435,7 +442,7 @@ def schedule_fuzz_tasks() -> bool: """Schedules fuzz tasks.""" try: multiprocessing.set_start_method('spawn') - except RuntimeError: + except RuntimeError: # Ignore if this was done previously. 
pass batch_config = local_config.BatchConfig() From 0f88d496642730e0b24e5a3a89c9a3e491733b6b Mon Sep 17 00:00:00 2001 From: Jonathan Metzman Date: Wed, 17 Dec 2025 13:33:02 -0500 Subject: [PATCH 5/8] fix --- src/clusterfuzz/_internal/cron/schedule_fuzz.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/clusterfuzz/_internal/cron/schedule_fuzz.py b/src/clusterfuzz/_internal/cron/schedule_fuzz.py index 6c93f6d360c..4f3f880c40c 100644 --- a/src/clusterfuzz/_internal/cron/schedule_fuzz.py +++ b/src/clusterfuzz/_internal/cron/schedule_fuzz.py @@ -416,13 +416,14 @@ def get_congested_regions() -> List[str]: def schedule_congestion_jobs(fuzz_tasks, all_regions): """Schedules congestion jobs for all regions.""" - # Run a hello world task that finishes very quickly. The job field is ignored, - # but we need one, so pick an arbitrary one. + # Run a hello world task that finishes very quickly. The job field is + # ignored, but we need one, so pick an arbitrary one. clusterfuzz_job_type = None if fuzz_tasks: clusterfuzz_job_type = fuzz_tasks[0].job else: - # If no tasks scheduled, try to get a job type from DB to run congestion job. + # If no tasks scheduled, try to get a job type from DB to run congestion + # job. job = data_types.Job.query().get() if job: clusterfuzz_job_type = job.name @@ -442,7 +443,7 @@ def schedule_fuzz_tasks() -> bool: """Schedules fuzz tasks.""" try: multiprocessing.set_start_method('spawn') - except RuntimeError: # Ignore if this was done previously. + except RuntimeError: # Ignore if this was done previously. pass batch_config = local_config.BatchConfig() From ec162d19d777d3e131f2641a423e65c4e4fcce4b Mon Sep 17 00:00:00 2001 From: Jonathan Metzman Date: Wed, 17 Dec 2025 13:34:09 -0500 Subject: [PATCH 6/8] add docstring --- src/clusterfuzz/_internal/google_cloud_utils/batch.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/clusterfuzz/_internal/google_cloud_utils/batch.py b/src/clusterfuzz/_internal/google_cloud_utils/batch.py index ea4f0de32e9..12136f94f49 100644 --- a/src/clusterfuzz/_internal/google_cloud_utils/batch.py +++ b/src/clusterfuzz/_internal/google_cloud_utils/batch.py @@ -48,6 +48,7 @@ @dataclasses.dataclass class BatchWorkloadSpec: + """Batch specification chosen from the configs and subconfigs.""" clusterfuzz_release: str disk_size_gb: int disk_type: str From 4c00557521d4bb4b7e3384908348de45fa4b3252 Mon Sep 17 00:00:00 2001 From: Jonathan Metzman Date: Wed, 17 Dec 2025 13:56:03 -0500 Subject: [PATCH 7/8] Make requested changes and make congestion measure more robust --- src/clusterfuzz/_internal/cron/schedule_fuzz.py | 15 ++++++++------- .../_internal/google_cloud_utils/batch.py | 8 +++++++- .../handlers/cron/schedule_fuzz_test.py | 17 +++++++++-------- 3 files changed, 24 insertions(+), 16 deletions(-) diff --git a/src/clusterfuzz/_internal/cron/schedule_fuzz.py b/src/clusterfuzz/_internal/cron/schedule_fuzz.py index 4f3f880c40c..bd1d4231efe 100644 --- a/src/clusterfuzz/_internal/cron/schedule_fuzz.py +++ b/src/clusterfuzz/_internal/cron/schedule_fuzz.py @@ -296,7 +296,13 @@ def get_fuzz_tasks(self) -> List[tasks.Task]: return fuzz_tasks -def get_fuzz_tasks(available_cpus: int) -> [tasks.Task]: +def get_fuzz_tasks(project, regions) -> [tasks.Task]: + available_cpus = get_available_cpus(project, regions) + logs.info(f'{available_cpus} available CPUs.') + + if not available_cpus: + return [] + if utils.is_oss_fuzz(): scheduler = OssfuzzFuzzTaskScheduler(available_cpus) else: @@ -453,12 +459,7 @@ def schedule_fuzz_tasks() -> 
bool: regions = [r for r in all_regions if r not in congested_regions] start = time.time() - available_cpus = get_available_cpus(project, regions) - logs.info(f'{available_cpus} available CPUs.') - - fuzz_tasks = [] - if available_cpus > 0: - fuzz_tasks = get_fuzz_tasks(available_cpus) + fuzz_tasks = get_fuzz_tasks(project, regions) if fuzz_tasks: logs.info(f'Adding {fuzz_tasks} to preprocess queue.') diff --git a/src/clusterfuzz/_internal/google_cloud_utils/batch.py b/src/clusterfuzz/_internal/google_cloud_utils/batch.py index 12136f94f49..49aacdd16d7 100644 --- a/src/clusterfuzz/_internal/google_cloud_utils/batch.py +++ b/src/clusterfuzz/_internal/google_cloud_utils/batch.py @@ -150,7 +150,13 @@ def check_congestion_jobs(job_ids): for job_id in job_ids: try: job = _batch_client().get_job(name=job_id) - if job.status.state == batch.JobStatus.State.SUCCEEDED: + # We count SUCCEEDED, RUNNING, and FAILED as completed (i.e. not + # congested). If the job is in any of these states, it means it was + # successfully scheduled and started running. If it is QUEUED, it means + # it is still waiting to be scheduled, which implies congestion. + if job.status.state in (batch.JobStatus.State.SUCCEEDED, + batch.JobStatus.State.RUNNING, + batch.JobStatus.State.FAILED): completed_count += 1 except Exception: # If we can't get the job, it might have been deleted or there is an diff --git a/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py b/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py index 17828121e2e..ea2b095c472 100644 --- a/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py +++ b/src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py @@ -314,12 +314,16 @@ def test_is_congested_true(self): # Mock check_congestion_jobs to return 0 completed. self.mock.check_congestion_jobs.return_value = 0 - self.mock.get_available_cpus.return_value = 0 + # Mock get_fuzz_tasks to return empty list (simulating no CPUs or other issues) + self.mock.get_fuzz_tasks.return_value = [] self.assertTrue(schedule_fuzz.schedule_fuzz_tasks()) - self.mock.get_available_cpus.assert_called() - # Verify called with empty regions list - self.assertEqual(self.mock.get_available_cpus.call_args[0][1], []) + + self.mock.get_fuzz_tasks.assert_called() + # Verify called with empty regions list (2nd argument) + # Args are (project, regions) + call_args = self.mock.get_fuzz_tasks.call_args[0] + self.assertEqual(call_args[1], []) def test_is_congested_false(self): """Tests that scheduling proceeds when not congested.""" @@ -329,17 +333,15 @@ def test_is_congested_false(self): # Mock check_congestion_jobs to return 3 completed. 
self.mock.check_congestion_jobs.return_value = 3 - self.mock.get_available_cpus.return_value = 10 mock_task = unittest.mock.Mock() mock_task.job = 'job1' self.mock.get_fuzz_tasks.return_value = [mock_task] self.assertTrue(schedule_fuzz.schedule_fuzz_tasks()) - self.mock.get_available_cpus.assert_called() + self.mock.get_fuzz_tasks.assert_called() def test_no_congestion_job_if_no_tasks(self): """Tests that no congestion job is scheduled if no fuzz tasks.""" - self.mock.get_available_cpus.return_value = 10 self.mock.get_fuzz_tasks.return_value = [] self.assertTrue(schedule_fuzz.schedule_fuzz_tasks()) @@ -347,7 +349,6 @@ def test_no_congestion_job_if_no_tasks(self): def test_congestion_job_scheduled(self): """Tests that a congestion job is scheduled when fuzz tasks are.""" - self.mock.get_available_cpus.return_value = 10 mock_task = unittest.mock.Mock() mock_task.job = 'job1' self.mock.get_fuzz_tasks.return_value = [mock_task] From 366a3eba04fd940a09acbb36241c958ced6655c6 Mon Sep 17 00:00:00 2001 From: Jonathan Metzman Date: Wed, 17 Dec 2025 14:42:06 -0500 Subject: [PATCH 8/8] Move check_congestion_jobs to gcp.py and create_congestion_job to service.py --- AGENTS.md | 8 +++ src/clusterfuzz/_internal/batch/gcp.py | 23 +++++++ src/clusterfuzz/_internal/batch/service.py | 22 ------ .../_internal/cron/schedule_fuzz.py | 2 +- .../_internal/google_cloud_utils/batch.py | 4 +- .../_internal/tests/core/batch/gcp_test.py | 69 +++++++++++++++++++ .../tests/core/batch/service_test.py | 39 +---------- 7 files changed, 105 insertions(+), 62 deletions(-) create mode 100644 src/clusterfuzz/_internal/tests/core/batch/gcp_test.py diff --git a/AGENTS.md b/AGENTS.md index a53e6d72b82..15460ef8844 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -93,3 +93,11 @@ python butler.py format This will format the changed code in your current branch. It's possible to get into a state where linting and formatting contradict each other. In this case, STOP, the human will fix it. + +## Codebase Notes + +### Batch Logic + +- `src/clusterfuzz/_internal/batch/gcp.py` contains low-level GCP Batch client logic. `check_congestion_jobs` is placed here as it directly queries job status using the client. +- `src/clusterfuzz/_internal/batch/service.py` contains high-level batch service logic, including configuration management. `create_congestion_job` is placed here because it depends on configuration logic (`_get_specs_from_config`, etc.). +- `src/clusterfuzz/_internal/google_cloud_utils/batch.py` acts as a facade/wrapper for backward compatibility or convenience, delegating to `gcp.py` and `service.py`. \ No newline at end of file diff --git a/src/clusterfuzz/_internal/batch/gcp.py b/src/clusterfuzz/_internal/batch/gcp.py index 51f91920347..583653e743f 100644 --- a/src/clusterfuzz/_internal/batch/gcp.py +++ b/src/clusterfuzz/_internal/batch/gcp.py @@ -154,6 +154,29 @@ def count_queued_or_scheduled_tasks(project: str, return (queued, scheduled) +def check_congestion_jobs(job_ids: List[str]) -> int: + """Checks the status of the congestion jobs.""" + completed_count = 0 + for job_id in job_ids: + try: + job = _batch_client().get_job(name=job_id) + # We count SUCCEEDED, RUNNING, and FAILED as completed (i.e. not + # congested). If the job is in any of these states, it means it was + # successfully scheduled and started running. If it is QUEUED, it means + # it is still waiting to be scheduled, which implies congestion. 
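+      # Anything else (e.g. still QUEUED) is deliberately left uncounted,
+      # which biases this check toward reporting congestion.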
+ if job.status.state in (batch.JobStatus.State.SUCCEEDED, + batch.JobStatus.State.RUNNING, + batch.JobStatus.State.FAILED): + completed_count += 1 + except Exception: + # If we can't get the job, it might have been deleted or there is an + # error. + # We don't count it as completed. + logs.warning(f'Failed to get job {job_id}.') + + return completed_count + + class GcpBatchClient(RemoteTaskInterface): """A client for creating and managing jobs on the GCP Batch service. diff --git a/src/clusterfuzz/_internal/batch/service.py b/src/clusterfuzz/_internal/batch/service.py index c8158bd2131..7024b02bd1f 100644 --- a/src/clusterfuzz/_internal/batch/service.py +++ b/src/clusterfuzz/_internal/batch/service.py @@ -257,25 +257,3 @@ def create_congestion_job(self, job_type, gce_region=None): spec.subnetwork = subconfig['subnetwork'] return self._client.create_job(spec, ['CONGESTION'], commands=['echo', 'hello']) - - def check_congestion_jobs(self, job_ids): - """Checks the status of the congestion jobs.""" - completed_count = 0 - for job_id in job_ids: - try: - job = self._client.get_job(name=job_id) - # We count SUCCEEDED, RUNNING, and FAILED as completed (i.e. not - # congested). If the job is in any of these states, it means it was - # successfully scheduled and started running. If it is QUEUED, it means - # it is still waiting to be scheduled, which implies congestion. - if job.status.state in (batch.JobStatus.State.SUCCEEDED, - batch.JobStatus.State.RUNNING, - batch.JobStatus.State.FAILED): - completed_count += 1 - except Exception: - # If we can't get the job, it might have been deleted or there is an - # error. - # We don't count it as completed. - logs.warning(f'Failed to get job {job_id}.') - - return completed_count diff --git a/src/clusterfuzz/_internal/cron/schedule_fuzz.py b/src/clusterfuzz/_internal/cron/schedule_fuzz.py index 6b40c48bd21..bd1d4231efe 100644 --- a/src/clusterfuzz/_internal/cron/schedule_fuzz.py +++ b/src/clusterfuzz/_internal/cron/schedule_fuzz.py @@ -26,10 +26,10 @@ from clusterfuzz._internal.base import tasks from clusterfuzz._internal.base import utils -from clusterfuzz._internal.batch import gcp as batch from clusterfuzz._internal.config import local_config from clusterfuzz._internal.datastore import data_types from clusterfuzz._internal.datastore import ndb_utils +from clusterfuzz._internal.google_cloud_utils import batch from clusterfuzz._internal.google_cloud_utils import credentials from clusterfuzz._internal.metrics import logs diff --git a/src/clusterfuzz/_internal/google_cloud_utils/batch.py b/src/clusterfuzz/_internal/google_cloud_utils/batch.py index 4beba2ed067..85ed2ecf5cc 100644 --- a/src/clusterfuzz/_internal/google_cloud_utils/batch.py +++ b/src/clusterfuzz/_internal/google_cloud_utils/batch.py @@ -39,8 +39,8 @@ def create_congestion_job(job_type, gce_region=None): def check_congestion_jobs(job_ids): """Checks the status of the congestion jobs.""" - service = BatchService() - return service.check_congestion_jobs(job_ids) + from clusterfuzz._internal.batch import gcp + return gcp.check_congestion_jobs(job_ids) def count_queued_or_scheduled_tasks(project: str, region: str): diff --git a/src/clusterfuzz/_internal/tests/core/batch/gcp_test.py b/src/clusterfuzz/_internal/tests/core/batch/gcp_test.py new file mode 100644 index 00000000000..4479a248a92 --- /dev/null +++ b/src/clusterfuzz/_internal/tests/core/batch/gcp_test.py @@ -0,0 +1,69 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not 
use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for the gcp module.""" +import unittest +from unittest import mock + +from google.cloud import batch_v1 as batch + +from clusterfuzz._internal.batch import gcp +from clusterfuzz._internal.tests.test_libs import helpers +from clusterfuzz._internal.tests.test_libs import test_utils + +class GcpTest(unittest.TestCase): + """Tests for gcp module.""" + + def setUp(self): + helpers.patch(self, [ + 'clusterfuzz._internal.batch.gcp._batch_client', + ]) + self.mock_batch_client_instance = mock.Mock() + self.mock._batch_client.return_value = self.mock_batch_client_instance + + def test_check_congestion_jobs(self): + """Tests that check_congestion_jobs counts correctly.""" + # Create mock jobs with different states + job_succeeded = mock.Mock() + job_succeeded.status.state = batch.JobStatus.State.SUCCEEDED + + job_running = mock.Mock() + job_running.status.state = batch.JobStatus.State.RUNNING + + job_failed = mock.Mock() + job_failed.status.state = batch.JobStatus.State.FAILED + + job_queued = mock.Mock() + job_queued.status.state = batch.JobStatus.State.QUEUED + + # Mock get_job to return these based on job name + def get_job_side_effect(name): + if name == 'job-succeeded': + return job_succeeded + if name == 'job-running': + return job_running + if name == 'job-failed': + return job_failed + if name == 'job-queued': + return job_queued + raise Exception("Job not found") + + self.mock_batch_client_instance.get_job.side_effect = get_job_side_effect + + # Check that SUCCEEDED, RUNNING, FAILED are counted (3 total) + # QUEUED is not counted + # Non-existent job is not counted + job_ids = ['job-succeeded', 'job-running', 'job-failed', 'job-queued', 'job-missing'] + count = gcp.check_congestion_jobs(job_ids) + + self.assertEqual(count, 3) diff --git a/src/clusterfuzz/_internal/tests/core/batch/service_test.py b/src/clusterfuzz/_internal/tests/core/batch/service_test.py index 4c9715c07da..6d401f1f971 100644 --- a/src/clusterfuzz/_internal/tests/core/batch/service_test.py +++ b/src/clusterfuzz/_internal/tests/core/batch/service_test.py @@ -105,7 +105,8 @@ def _get_expected_allocation_policy(spec): return allocation_policy -def _get_expected_create_request(job_name_uuid, spec, input_urls, commands=None): +def _get_expected_create_request(job_name_uuid, spec, input_urls, + commands=None): """Constructs and returns a `batch.CreateJobRequest` object. 
This function builds a complete `CreateJobRequest` for the GCP Batch service, @@ -295,42 +296,6 @@ def test_create_congestion_job(self): expected_create_request) self.assertEqual(result, 'job') - def test_check_congestion_jobs(self): - """Tests that check_congestion_jobs counts correctly.""" - # Create mock jobs with different states - job_succeeded = mock.Mock() - job_succeeded.status.state = batch.JobStatus.State.SUCCEEDED - - job_running = mock.Mock() - job_running.status.state = batch.JobStatus.State.RUNNING - - job_failed = mock.Mock() - job_failed.status.state = batch.JobStatus.State.FAILED - - job_queued = mock.Mock() - job_queued.status.state = batch.JobStatus.State.QUEUED - - # Mock get_job to return these based on job name - def get_job_side_effect(name): - if name == 'job-succeeded': - return job_succeeded - if name == 'job-running': - return job_running - if name == 'job-failed': - return job_failed - if name == 'job-queued': - return job_queued - raise Exception("Job not found") - - self.mock_batch_client_instance.get_job.side_effect = get_job_side_effect - - # Check that SUCCEEDED, RUNNING, FAILED are counted (3 total) - # QUEUED is not counted - # Non-existent job is not counted - job_ids = ['job-succeeded', 'job-running', 'job-failed', 'job-queued', 'job-missing'] - count = self.batch_service.check_congestion_jobs(job_ids) - - self.assertEqual(count, 3) @test_utils.with_cloud_emulators('datastore') class IsRemoteTaskTest(unittest.TestCase):
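
In sum, the detection logic the series converges on reduces to the sketch
below. ProbeJob and job_states are hypothetical in-memory stand-ins for the
CongestionJob datastore entities and the Batch client's job lookups; the real
code lives in schedule_fuzz.get_congested_regions and gcp.check_congestion_jobs.

    import collections
    import datetime

    # Hypothetical stand-in for the data_types.CongestionJob entity.
    ProbeJob = collections.namedtuple('ProbeJob',
                                      ['job_id', 'region', 'timestamp'])

    # States in which Batch demonstrably started the probe (mirrors gcp.py).
    STARTED_STATES = ('SUCCEEDED', 'RUNNING', 'FAILED')


    def congested_regions(probe_jobs, job_states, now, threshold=3):
      """Returns the regions whose last `threshold` probes from the past hour
      were not all started by Batch. `job_states` maps job_id to a state."""
      one_hour_ago = now - datetime.timedelta(hours=1)
      by_region = collections.defaultdict(list)
      for probe in probe_jobs:
        if probe.region and probe.timestamp > one_hour_ago:
          by_region[probe.region].append(probe)

      congested = []
      for region, probes in by_region.items():
        probes.sort(key=lambda p: p.timestamp, reverse=True)
        recent = probes[:threshold]
        if len(recent) < threshold:
          continue  # Too few probes to judge this region yet.
        started = sum(1 for probe in recent
                      if job_states.get(probe.job_id) in STARTED_STATES)
        if started < threshold:
          congested.append(region)
      return congested

A scheduler run then drops the returned regions from the list passed to
get_available_cpus, exactly as schedule_fuzz_tasks does with the result of
get_congested_regions().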