Skip to content

Commit 466e646

Browse files
committed
sigterm handling for bg hooks
1 parent b65de79 commit 466e646

File tree

1 file changed

+154
-34
lines changed

1 file changed

+154
-34
lines changed

abx_dl/executor.py

Lines changed: 154 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,11 @@
33
"""
44

55
import json
6+
import os
7+
import signal
68
import subprocess
79
import sys
10+
import time
811
from pathlib import Path
912
from typing import Any, Generator
1013

@@ -19,15 +22,20 @@ def get_interpreter(language: str) -> list[str]:
1922
return {'py': [sys.executable], 'js': ['node'], 'sh': ['bash']}.get(language, [])
2023

2124

22-
def run_hook(hook: Hook, url: str, snapshot_id: str, output_dir: Path, env: dict[str, str], timeout: int = 60) -> tuple[Process, ArchiveResult]:
23-
"""Run a single hook and return Process and ArchiveResult."""
25+
def run_hook(hook: Hook, url: str, snapshot_id: str, output_dir: Path, env: dict[str, str], timeout: int = 60) -> tuple[Process, ArchiveResult, subprocess.Popen | None]:
26+
"""
27+
Run a single hook and return Process, ArchiveResult, and optionally Popen handle.
28+
29+
For background hooks, returns the Popen object so caller can manage cleanup.
30+
For foreground hooks, returns None for the Popen.
31+
"""
2432
files_before = set(output_dir.rglob('*'))
2533

2634
interpreter = get_interpreter(hook.language)
2735
if not interpreter:
2836
proc = Process(cmd=[], exit_code=1, stderr=f'Unknown language: {hook.language}')
2937
result = ArchiveResult(snapshot_id=snapshot_id, plugin=hook.plugin_name, hook_name=hook.name, status='failed', error=proc.stderr)
30-
return proc, result
38+
return proc, result, None
3139

3240
# Set lib paths
3341
env.update({
@@ -41,6 +49,34 @@ def run_hook(hook: Hook, url: str, snapshot_id: str, output_dir: Path, env: dict
4149
proc = Process(cmd=cmd, pwd=str(output_dir), timeout=timeout, started_at=now_iso())
4250

4351
try:
52+
if hook.is_background:
53+
# Background hook - start and don't wait, return Popen handle
54+
popen = subprocess.Popen(cmd, cwd=str(output_dir), env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
55+
# Give it a moment to start and potentially fail fast
56+
time.sleep(0.3)
57+
if popen.poll() is not None:
58+
# Process already exited (fast failure)
59+
stdout, stderr = popen.communicate()
60+
proc.exit_code = popen.returncode
61+
proc.stdout = stdout.decode('utf-8', errors='replace')
62+
proc.stderr = stderr.decode('utf-8', errors='replace')
63+
proc.ended_at = now_iso()
64+
status = 'failed' if popen.returncode != 0 else 'succeeded'
65+
ar = ArchiveResult(
66+
snapshot_id=snapshot_id, plugin=hook.plugin_name, hook_name=hook.name,
67+
status=status, process_id=proc.id, start_ts=proc.started_at, end_ts=proc.ended_at,
68+
error=proc.stderr[:500] if proc.exit_code != 0 else None,
69+
)
70+
return proc, ar, None # No popen to track since it already exited
71+
else:
72+
# Still running - return handle for later cleanup
73+
ar = ArchiveResult(
74+
snapshot_id=snapshot_id, plugin=hook.plugin_name, hook_name=hook.name,
75+
status='started', process_id=proc.id, start_ts=proc.started_at,
76+
)
77+
return proc, ar, popen
78+
79+
# Foreground hook - wait for completion
4480
result = subprocess.run(cmd, cwd=str(output_dir), env=env, capture_output=True, timeout=timeout)
4581
proc.exit_code = result.returncode
4682
proc.stdout = result.stdout.decode('utf-8', errors='replace')
@@ -79,20 +115,86 @@ def run_hook(hook: Hook, url: str, snapshot_id: str, output_dir: Path, env: dict
79115
end_ts=proc.ended_at,
80116
error=proc.stderr[:500] if result.returncode != 0 else None,
81117
)
82-
return proc, ar
118+
return proc, ar, None
83119

84120
except subprocess.TimeoutExpired:
85121
proc.exit_code = -1
86122
proc.stderr = f'Timed out after {timeout}s'
87123
proc.ended_at = now_iso()
88124
ar = ArchiveResult(snapshot_id=snapshot_id, plugin=hook.plugin_name, hook_name=hook.name, status='failed', process_id=proc.id, error=proc.stderr)
89-
return proc, ar
125+
return proc, ar, None
90126
except Exception as e:
91127
proc.exit_code = -1
92128
proc.stderr = f'{type(e).__name__}: {e}'
93129
proc.ended_at = now_iso()
94130
ar = ArchiveResult(snapshot_id=snapshot_id, plugin=hook.plugin_name, hook_name=hook.name, status='failed', process_id=proc.id, error=proc.stderr)
95-
return proc, ar
131+
return proc, ar, None
132+
133+
134+
def cleanup_background_hooks(bg_hooks: list[tuple[subprocess.Popen, Process, ArchiveResult, Path]], index_path: Path, is_tty: bool):
135+
"""
136+
Send SIGTERM to all background hooks, wait for them to finish, and collect output.
137+
"""
138+
for popen, proc, ar, output_dir in bg_hooks:
139+
if popen.poll() is None:
140+
# Still running - send SIGTERM
141+
try:
142+
popen.send_signal(signal.SIGTERM)
143+
except (ProcessLookupError, OSError):
144+
pass
145+
146+
# Wait for all to finish (with timeout)
147+
for popen, proc, ar, output_dir in bg_hooks:
148+
try:
149+
stdout, stderr = popen.communicate(timeout=10)
150+
proc.exit_code = popen.returncode
151+
proc.stdout = stdout.decode('utf-8', errors='replace')
152+
proc.stderr = stderr.decode('utf-8', errors='replace')
153+
proc.ended_at = now_iso()
154+
155+
# Detect new files
156+
files_after = set(output_dir.rglob('*'))
157+
new_files = [str(f.relative_to(output_dir)) for f in files_after if f.is_file()]
158+
159+
# Update ArchiveResult
160+
ar.end_ts = proc.ended_at
161+
ar.output_files = new_files
162+
163+
# Parse JSONL output for final status
164+
status = 'succeeded' if popen.returncode == 0 else 'failed'
165+
for line in proc.stdout.strip().split('\n'):
166+
if line.strip():
167+
try:
168+
record = json.loads(line)
169+
if record.get('type') == 'ArchiveResult':
170+
status = record.get('status', status)
171+
ar.output_str = record.get('output_str', '')
172+
except json.JSONDecodeError:
173+
pass
174+
ar.status = status
175+
if popen.returncode != 0:
176+
ar.error = proc.stderr[:500]
177+
178+
except subprocess.TimeoutExpired:
179+
popen.kill()
180+
popen.wait()
181+
proc.exit_code = -1
182+
proc.stderr = 'Background hook did not exit after SIGTERM'
183+
proc.ended_at = now_iso()
184+
ar.status = 'failed'
185+
ar.error = proc.stderr
186+
ar.end_ts = proc.ended_at
187+
except Exception as e:
188+
proc.exit_code = -1
189+
proc.stderr = f'{type(e).__name__}: {e}'
190+
proc.ended_at = now_iso()
191+
ar.status = 'failed'
192+
ar.error = proc.stderr
193+
ar.end_ts = proc.ended_at
194+
195+
# Write final results
196+
write_jsonl(index_path, proc, also_print=not is_tty)
197+
write_jsonl(index_path, ar, also_print=not is_tty)
96198

97199

98200
def check_plugin_dependencies(plugin: Plugin, auto_install: bool = True) -> tuple[bool, list[str]]:
@@ -167,35 +269,53 @@ def download(url: str, plugins: dict[str, Plugin], output_dir: Path, selected_pl
167269
snapshot_hooks.sort(key=lambda x: x[1].sort_key)
168270
all_hooks = crawl_hooks + snapshot_hooks
169271

170-
# Run hooks
272+
# Track background hooks for cleanup
273+
background_hooks: list[tuple[subprocess.Popen, Process, ArchiveResult, Path]] = []
171274
shared_config = dict(config_overrides) if config_overrides else {}
172275

173-
for plugin, hook in all_hooks:
174-
env = build_env_for_plugin(plugin.name, plugin.config_schema, shared_config)
175-
timeout = int(env.get(f"{plugin.name.upper()}_TIMEOUT", env.get('TIMEOUT', '60')))
176-
177-
# Executor creates plugin subdir, hooks write to cwd directly
178-
plugin_output_dir = output_dir / plugin.name
179-
plugin_output_dir.mkdir(parents=True, exist_ok=True)
180-
proc, ar = run_hook(hook, url, snapshot.id, plugin_output_dir, env, timeout)
181-
182-
# Write to index.jsonl
183-
write_jsonl(index_path, proc, also_print=not is_tty)
184-
write_jsonl(index_path, ar, also_print=not is_tty)
185-
186-
# Extract config updates from stdout
187-
for line in proc.stdout.split('\n'):
188-
if line.strip():
189-
try:
190-
record = json.loads(line)
191-
if record.get('type') == 'Binary':
192-
name = record.get('name', '')
193-
abspath = record.get('abspath', '')
194-
if name and abspath:
195-
shared_config[f'{name.upper()}_BINARY'] = abspath
196-
except json.JSONDecodeError:
197-
pass
198-
199-
yield ar
276+
try:
277+
for plugin, hook in all_hooks:
278+
env = build_env_for_plugin(plugin.name, plugin.config_schema, shared_config)
279+
timeout = int(env.get(f"{plugin.name.upper()}_TIMEOUT", env.get('TIMEOUT', '60')))
280+
281+
# Executor creates plugin subdir, hooks write to cwd directly
282+
plugin_output_dir = output_dir / plugin.name
283+
plugin_output_dir.mkdir(parents=True, exist_ok=True)
284+
proc, ar, popen = run_hook(hook, url, snapshot.id, plugin_output_dir, env, timeout)
285+
286+
if popen:
287+
# Background hook - track for later cleanup
288+
background_hooks.append((popen, proc, ar, plugin_output_dir))
289+
# Yield initial "started" result
290+
yield ar
291+
else:
292+
# Foreground hook - write results immediately
293+
write_jsonl(index_path, proc, also_print=not is_tty)
294+
write_jsonl(index_path, ar, also_print=not is_tty)
295+
296+
# Extract config updates from stdout
297+
for line in proc.stdout.split('\n'):
298+
if line.strip():
299+
try:
300+
record = json.loads(line)
301+
if record.get('type') == 'Binary':
302+
name = record.get('name', '')
303+
abspath = record.get('abspath', '')
304+
if name and abspath:
305+
shared_config[f'{name.upper()}_BINARY'] = abspath
306+
elif record.get('type') == 'Machine' and record.get('_method') == 'update':
307+
key = record.get('key', '').replace('config/', '')
308+
value = record.get('value', '')
309+
if key and value:
310+
shared_config[key] = value
311+
except json.JSONDecodeError:
312+
pass
313+
314+
yield ar
315+
316+
finally:
317+
# Cleanup background hooks - send SIGTERM, collect output
318+
if background_hooks:
319+
cleanup_background_hooks(background_hooks, index_path, is_tty)
200320

201321
return snapshot

0 commit comments

Comments
 (0)