Skip to content

Commit db8daf9

Browse files
authored
parallel_testsuite: Switch to pool.imap_unordered. NFC (#25751)
This allows us the print that status line back on the main thread as the results arrive there. It also avoids some of the shared state. I have a followup change that uses just a single line for output line other modern test runners do.
1 parent 19d3d9e commit db8daf9

File tree

1 file changed

+48
-40
lines changed

1 file changed

+48
-40
lines changed

test/parallel_testsuite.py

Lines changed: 48 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ def cap_max_workers_in_pool(max_workers, is_browser):
4444
return max_workers
4545

4646

47-
def run_test(test, allowed_failures_counter, lock, progress_counter, num_tests, buffer):
47+
def run_test(args):
48+
test, allowed_failures_counter, lock, buffer = args
4849
# If we have exceeded the number of allowed failures during the test run,
4950
# abort executing further tests immediately.
5051
if allowed_failures_counter and allowed_failures_counter.value < 0:
@@ -57,38 +58,6 @@ def test_failed():
5758

5859
start_time = time.perf_counter()
5960

60-
def compute_progress():
61-
if not lock:
62-
return ''
63-
with lock:
64-
val = f'[{int(progress_counter.value * 100 / num_tests)}%] '
65-
progress_counter.value += 1
66-
return with_color(CYAN, val)
67-
68-
def printResult(res):
69-
elapsed = time.perf_counter() - start_time
70-
progress = compute_progress()
71-
if res.test_result == 'success':
72-
msg = f'ok ({elapsed:.2f}s)'
73-
errlog(f'{progress}{res.test} ... {with_color(GREEN, msg)}')
74-
elif res.test_result == 'errored':
75-
msg = f'{res.test} ... ERROR'
76-
errlog(f'{progress}{with_color(RED, msg)}')
77-
elif res.test_result == 'failed':
78-
msg = f'{res.test} ... FAIL'
79-
errlog(f'{progress}{with_color(RED, msg)}')
80-
elif res.test_result == 'skipped':
81-
msg = f"skipped '{res.buffered_result.reason}'"
82-
errlog(f"{progress}{res.test} ... {with_color(CYAN, msg)}")
83-
elif res.test_result == 'unexpected success':
84-
msg = f'unexpected success ({elapsed:.2f}s)'
85-
errlog(f'{progress}{res.test} ... {with_color(RED, msg)}')
86-
elif res.test_result == 'expected failure':
87-
msg = f'expected failure ({elapsed:.2f}s)'
88-
errlog(f'{progress}{res.test} ... {with_color(RED, msg)}')
89-
else:
90-
assert False
91-
9261
olddir = os.getcwd()
9362
result = BufferedParallelTestResult()
9463
result.start_time = start_time
@@ -110,7 +79,7 @@ def printResult(res):
11079
result.addError(test, e)
11180
test_failed()
11281
finally:
113-
printResult(result)
82+
result.elapsed = time.perf_counter() - start_time
11483

11584
# Before attempting to delete the tmp dir make sure the current
11685
# working directory is not within it.
@@ -149,11 +118,44 @@ def __init__(self, max_cores, options):
149118
self.max_cores = max_cores
150119
self.max_failures = options.max_failures
151120
self.failing_and_slow_first = options.failing_and_slow_first
121+
self.progress_counter = 0
152122

153123
def addTest(self, test):
154124
super().addTest(test)
155125
test.is_parallel = True
156126

127+
def printOneResult(self, res):
128+
percent = int(self.progress_counter * 100 / self.num_tests)
129+
progress = f'[{percent:2d}%] '
130+
self.progress_counter += 1
131+
132+
if res.test_result == 'success':
133+
msg = 'ok'
134+
color = GREEN
135+
elif res.test_result == 'errored':
136+
msg = 'ERROR'
137+
color = RED
138+
elif res.test_result == 'failed':
139+
msg = 'FAIL'
140+
color = RED
141+
elif res.test_result == 'skipped':
142+
reason = res.skipped[0][1]
143+
msg = f"skipped '{reason}'"
144+
color = CYAN
145+
elif res.test_result == 'unexpected success':
146+
msg = 'unexpected success'
147+
color = RED
148+
elif res.test_result == 'expected failure':
149+
color = RED
150+
msg = 'expected failure'
151+
else:
152+
assert False, f'unhandled test result {res.test_result}'
153+
154+
if res.test_result != 'skipped':
155+
msg += f' ({res.elapsed:.2f}s)'
156+
157+
errlog(f'{with_color(CYAN, progress)}{res.test} ... {with_color(color, msg)}')
158+
157159
def run(self, result):
158160
# The 'spawn' method is used on windows and it can be useful to set this on
159161
# all platforms when debugging multiprocessing issues. Without this we
@@ -163,6 +165,7 @@ def run(self, result):
163165
# multiprocessing.set_start_method('spawn')
164166

165167
tests = self.get_sorted_tests()
168+
self.num_tests = len(tests)
166169
contains_browser_test = any(test.is_browser_test() for test in tests)
167170
use_cores = cap_max_workers_in_pool(min(self.max_cores, len(tests), num_cores()), contains_browser_test)
168171
errlog(f'Using {use_cores} parallel test processes')
@@ -178,15 +181,23 @@ def run(self, result):
178181
if python_multiprocessing_structures_are_buggy():
179182
# When multiprocessing shared structures are buggy we don't support failfast
180183
# or the progress bar.
181-
allowed_failures_counter = progress_counter = lock = None
184+
allowed_failures_counter = lock = None
182185
if self.max_failures < 2**31 - 1:
183186
errlog('The version of python being used is not compatible with --failfast and --max-failures options. See https://github.com/python/cpython/issues/71936')
184187
sys.exit(1)
185188
else:
186189
allowed_failures_counter = manager.Value('i', self.max_failures)
187-
progress_counter = manager.Value('i', 0)
188190
lock = manager.Lock()
189-
results = pool.starmap(run_test, ((t, allowed_failures_counter, lock, progress_counter, len(tests), result.buffer) for t in tests), chunksize=1)
191+
192+
results = []
193+
args = ((t, allowed_failures_counter, lock, result.buffer) for t in tests)
194+
for res in pool.imap_unordered(run_test, args, chunksize=1):
195+
# results may be be None if # of allowed errors was exceeded
196+
# and the harness aborted.
197+
if res:
198+
self.printOneResult(res)
199+
results.append(res)
200+
190201
# Send a task to each worker to tear down the browser and server. This
191202
# relies on the implementation detail in the worker pool that all workers
192203
# are cycled through once.
@@ -195,9 +206,6 @@ def run(self, result):
195206
if num_tear_downs != use_cores:
196207
errlog(f'Expected {use_cores} teardowns, got {num_tear_downs}')
197208

198-
# Filter out the None results which can occur if # of allowed errors was exceeded and the harness aborted.
199-
results = [r for r in results if r is not None]
200-
201209
if self.failing_and_slow_first:
202210
previous_test_run_results = common.load_previous_test_run_results()
203211
for r in results:

0 commit comments

Comments
 (0)