diff --git a/agents/s08_background_tasks.py b/agents/s08_background_tasks.py index 390a77780..f3ce5597e 100644 --- a/agents/s08_background_tasks.py +++ b/agents/s08_background_tasks.py @@ -52,6 +52,7 @@ def __init__(self): self.tasks = {} # task_id -> {status, result, command} self._notification_queue = [] # completed task results self._lock = threading.Lock() + self._condition = threading.Condition(self._lock) def run(self, command: str) -> str: """Start a background thread, return task_id immediately.""" @@ -67,8 +68,12 @@ def _execute(self, task_id: str, command: str): """Thread target: run subprocess, capture output, push to queue.""" try: r = subprocess.run( - command, shell=True, cwd=WORKDIR, - capture_output=True, text=True, timeout=300 + command, + shell=True, + cwd=WORKDIR, + capture_output=True, + text=True, + timeout=300, ) output = (r.stdout + r.stderr).strip()[:50000] status = "completed" @@ -80,13 +85,16 @@ def _execute(self, task_id: str, command: str): status = "error" self.tasks[task_id]["status"] = status self.tasks[task_id]["result"] = output or "(no output)" - with self._lock: - self._notification_queue.append({ - "task_id": task_id, - "status": status, - "command": command[:80], - "result": (output or "(no output)")[:500], - }) + with self._condition: + self._notification_queue.append( + { + "task_id": task_id, + "status": status, + "command": command[:80], + "result": (output or "(no output)")[:500], + } + ) + self._condition.notify_all() def check(self, task_id: str = None) -> str: """Check status of one task or list all.""" @@ -94,7 +102,9 @@ def check(self, task_id: str = None) -> str: t = self.tasks.get(task_id) if not t: return f"Error: Unknown task {task_id}" - return f"[{t['status']}] {t['command'][:60]}\n{t.get('result') or '(running)'}" + return ( + f"[{t['status']}] {t['command'][:60]}\n{t.get('result') or '(running)'}" + ) lines = [] for tid, t in self.tasks.items(): lines.append(f"{tid}: [{t['status']}] {t['command'][:60]}") @@ -102,7 +112,22 @@ def check(self, task_id: str = None) -> str: def drain_notifications(self) -> list: """Return and clear all pending completion notifications.""" - with self._lock: + with self._condition: + notifs = list(self._notification_queue) + self._notification_queue.clear() + return notifs + + def _has_running_tasks_locked(self) -> bool: + return any(task["status"] == "running" for task in self.tasks.values()) + + def has_running_tasks(self) -> bool: + with self._condition: + return self._has_running_tasks_locked() + + def wait_for_notifications(self) -> list: + with self._condition: + while not self._notification_queue and self._has_running_tasks_locked(): + self._condition.wait() notifs = list(self._notification_queue) self._notification_queue.clear() return notifs @@ -111,6 +136,21 @@ def drain_notifications(self) -> list: BG = BackgroundManager() +def inject_background_results(messages: list, notifs: list) -> bool: + if notifs and messages: + notif_text = "\n".join( + f"[bg:{n['task_id']}] {n['status']}: {n['result']}" for n in notifs + ) + messages.append( + { + "role": "user", + "content": f"\n{notif_text}\n", + } + ) + return True + return False + + # -- Tool implementations -- def safe_path(p: str) -> Path: path = (WORKDIR / p).resolve() @@ -118,18 +158,26 @@ def safe_path(p: str) -> Path: raise ValueError(f"Path escapes workspace: {p}") return path + def run_bash(command: str) -> str: dangerous = ["rm -rf /", "sudo", "shutdown", "reboot", "> /dev/"] if any(d in command for d in dangerous): return "Error: Dangerous command blocked" try: - r = subprocess.run(command, shell=True, cwd=WORKDIR, - capture_output=True, text=True, timeout=120) + r = subprocess.run( + command, + shell=True, + cwd=WORKDIR, + capture_output=True, + text=True, + timeout=120, + ) out = (r.stdout + r.stderr).strip() return out[:50000] if out else "(no output)" except subprocess.TimeoutExpired: return "Error: Timeout (120s)" + def run_read(path: str, limit: int = None) -> str: try: lines = safe_path(path).read_text().splitlines() @@ -139,6 +187,7 @@ def run_read(path: str, limit: int = None) -> str: except Exception as e: return f"Error: {e}" + def run_write(path: str, content: str) -> str: try: fp = safe_path(path) @@ -148,6 +197,7 @@ def run_write(path: str, content: str) -> str: except Exception as e: return f"Error: {e}" + def run_edit(path: str, old_text: str, new_text: str) -> str: try: fp = safe_path(path) @@ -161,57 +211,114 @@ def run_edit(path: str, old_text: str, new_text: str) -> str: TOOL_HANDLERS = { - "bash": lambda **kw: run_bash(kw["command"]), - "read_file": lambda **kw: run_read(kw["path"], kw.get("limit")), - "write_file": lambda **kw: run_write(kw["path"], kw["content"]), - "edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]), - "background_run": lambda **kw: BG.run(kw["command"]), + "bash": lambda **kw: run_bash(kw["command"]), + "read_file": lambda **kw: run_read(kw["path"], kw.get("limit")), + "write_file": lambda **kw: run_write(kw["path"], kw["content"]), + "edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]), + "background_run": lambda **kw: BG.run(kw["command"]), "check_background": lambda **kw: BG.check(kw.get("task_id")), } TOOLS = [ - {"name": "bash", "description": "Run a shell command (blocking).", - "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}}, - {"name": "read_file", "description": "Read file contents.", - "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "limit": {"type": "integer"}}, "required": ["path"]}}, - {"name": "write_file", "description": "Write content to file.", - "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}}, - {"name": "edit_file", "description": "Replace exact text in file.", - "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}}, - {"name": "background_run", "description": "Run command in background thread. Returns task_id immediately.", - "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}}, - {"name": "check_background", "description": "Check background task status. Omit task_id to list all.", - "input_schema": {"type": "object", "properties": {"task_id": {"type": "string"}}}}, + { + "name": "bash", + "description": "Run a shell command (blocking).", + "input_schema": { + "type": "object", + "properties": {"command": {"type": "string"}}, + "required": ["command"], + }, + }, + { + "name": "read_file", + "description": "Read file contents.", + "input_schema": { + "type": "object", + "properties": {"path": {"type": "string"}, "limit": {"type": "integer"}}, + "required": ["path"], + }, + }, + { + "name": "write_file", + "description": "Write content to file.", + "input_schema": { + "type": "object", + "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, + "required": ["path", "content"], + }, + }, + { + "name": "edit_file", + "description": "Replace exact text in file.", + "input_schema": { + "type": "object", + "properties": { + "path": {"type": "string"}, + "old_text": {"type": "string"}, + "new_text": {"type": "string"}, + }, + "required": ["path", "old_text", "new_text"], + }, + }, + { + "name": "background_run", + "description": "Run command in background thread. Returns task_id immediately.", + "input_schema": { + "type": "object", + "properties": {"command": {"type": "string"}}, + "required": ["command"], + }, + }, + { + "name": "check_background", + "description": "Check background task status. Omit task_id to list all.", + "input_schema": { + "type": "object", + "properties": {"task_id": {"type": "string"}}, + }, + }, ] def agent_loop(messages: list): while True: # Drain background notifications and inject as system message before LLM call - notifs = BG.drain_notifications() - if notifs and messages: - notif_text = "\n".join( - f"[bg:{n['task_id']}] {n['status']}: {n['result']}" for n in notifs - ) - messages.append({"role": "user", "content": f"\n{notif_text}\n"}) + inject_background_results(messages, BG.drain_notifications()) response = client.messages.create( - model=MODEL, system=SYSTEM, messages=messages, - tools=TOOLS, max_tokens=8000, + model=MODEL, + system=SYSTEM, + messages=messages, + tools=TOOLS, + max_tokens=8000, ) messages.append({"role": "assistant", "content": response.content}) if response.stop_reason != "tool_use": + if BG.has_running_tasks() and inject_background_results( + messages, BG.wait_for_notifications() + ): + continue return results = [] for block in response.content: if block.type == "tool_use": handler = TOOL_HANDLERS.get(block.name) try: - output = handler(**block.input) if handler else f"Unknown tool: {block.name}" + output = ( + handler(**block.input) + if handler + else f"Unknown tool: {block.name}" + ) except Exception as e: output = f"Error: {e}" print(f"> {block.name}:") print(str(output)[:200]) - results.append({"type": "tool_result", "tool_use_id": block.id, "content": str(output)}) + results.append( + { + "type": "tool_result", + "tool_use_id": block.id, + "content": str(output), + } + ) messages.append({"role": "user", "content": results}) diff --git a/agents/s_full.py b/agents/s_full.py index e2f887b5c..518694bbc 100644 --- a/agents/s_full.py +++ b/agents/s_full.py @@ -66,8 +66,13 @@ POLL_INTERVAL = 5 IDLE_TIMEOUT = 60 -VALID_MSG_TYPES = {"message", "broadcast", "shutdown_request", - "shutdown_response", "plan_approval_response"} +VALID_MSG_TYPES = { + "message", + "broadcast", + "shutdown_request", + "shutdown_response", + "plan_approval_response", +} # === SECTION: base_tools === @@ -77,18 +82,26 @@ def safe_path(p: str) -> Path: raise ValueError(f"Path escapes workspace: {p}") return path + def run_bash(command: str) -> str: dangerous = ["rm -rf /", "sudo", "shutdown", "reboot", "> /dev/"] if any(d in command for d in dangerous): return "Error: Dangerous command blocked" try: - r = subprocess.run(command, shell=True, cwd=WORKDIR, - capture_output=True, text=True, timeout=120) + r = subprocess.run( + command, + shell=True, + cwd=WORKDIR, + capture_output=True, + text=True, + timeout=120, + ) out = (r.stdout + r.stderr).strip() return out[:50000] if out else "(no output)" except subprocess.TimeoutExpired: return "Error: Timeout (120s)" + def run_read(path: str, limit: int = None) -> str: try: lines = safe_path(path).read_text().splitlines() @@ -98,6 +111,7 @@ def run_read(path: str, limit: int = None) -> str: except Exception as e: return f"Error: {e}" + def run_write(path: str, content: str) -> str: try: fp = safe_path(path) @@ -107,6 +121,7 @@ def run_write(path: str, content: str) -> str: except Exception as e: return f"Error: {e}" + def run_edit(path: str, old_text: str, new_text: str) -> str: try: fp = safe_path(path) @@ -130,23 +145,33 @@ def update(self, items: list) -> str: content = str(item.get("content", "")).strip() status = str(item.get("status", "pending")).lower() af = str(item.get("activeForm", "")).strip() - if not content: raise ValueError(f"Item {i}: content required") + if not content: + raise ValueError(f"Item {i}: content required") if status not in ("pending", "in_progress", "completed"): raise ValueError(f"Item {i}: invalid status '{status}'") - if not af: raise ValueError(f"Item {i}: activeForm required") - if status == "in_progress": ip += 1 + if not af: + raise ValueError(f"Item {i}: activeForm required") + if status == "in_progress": + ip += 1 validated.append({"content": content, "status": status, "activeForm": af}) - if len(validated) > 20: raise ValueError("Max 20 todos") - if ip > 1: raise ValueError("Only one in_progress allowed") + if len(validated) > 20: + raise ValueError("Max 20 todos") + if ip > 1: + raise ValueError("Only one in_progress allowed") self.items = validated return self.render() def render(self) -> str: - if not self.items: return "No todos." + if not self.items: + return "No todos." lines = [] for item in self.items: - m = {"completed": "[x]", "in_progress": "[>]", "pending": "[ ]"}.get(item["status"], "[?]") - suffix = f" <- {item['activeForm']}" if item["status"] == "in_progress" else "" + m = {"completed": "[x]", "in_progress": "[>]", "pending": "[ ]"}.get( + item["status"], "[?]" + ) + suffix = ( + f" <- {item['activeForm']}" if item["status"] == "in_progress" else "" + ) lines.append(f"{m} {item['content']}{suffix}") done = sum(1 for t in self.items if t["status"] == "completed") lines.append(f"\n({done}/{len(self.items)} completed)") @@ -159,17 +184,52 @@ def has_open_items(self) -> bool: # === SECTION: subagent (s04) === def run_subagent(prompt: str, agent_type: str = "Explore") -> str: sub_tools = [ - {"name": "bash", "description": "Run command.", - "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}}, - {"name": "read_file", "description": "Read file.", - "input_schema": {"type": "object", "properties": {"path": {"type": "string"}}, "required": ["path"]}}, + { + "name": "bash", + "description": "Run command.", + "input_schema": { + "type": "object", + "properties": {"command": {"type": "string"}}, + "required": ["command"], + }, + }, + { + "name": "read_file", + "description": "Read file.", + "input_schema": { + "type": "object", + "properties": {"path": {"type": "string"}}, + "required": ["path"], + }, + }, ] if agent_type != "Explore": sub_tools += [ - {"name": "write_file", "description": "Write file.", - "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}}, - {"name": "edit_file", "description": "Edit file.", - "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}}, + { + "name": "write_file", + "description": "Write file.", + "input_schema": { + "type": "object", + "properties": { + "path": {"type": "string"}, + "content": {"type": "string"}, + }, + "required": ["path", "content"], + }, + }, + { + "name": "edit_file", + "description": "Edit file.", + "input_schema": { + "type": "object", + "properties": { + "path": {"type": "string"}, + "old_text": {"type": "string"}, + "new_text": {"type": "string"}, + }, + "required": ["path", "old_text", "new_text"], + }, + }, ] sub_handlers = { "bash": lambda **kw: run_bash(kw["command"]), @@ -180,7 +240,9 @@ def run_subagent(prompt: str, agent_type: str = "Explore") -> str: sub_msgs = [{"role": "user", "content": prompt}] resp = None for _ in range(30): - resp = client.messages.create(model=MODEL, messages=sub_msgs, tools=sub_tools, max_tokens=8000) + resp = client.messages.create( + model=MODEL, messages=sub_msgs, tools=sub_tools, max_tokens=8000 + ) sub_msgs.append({"role": "assistant", "content": resp.content}) if resp.stop_reason != "tool_use": break @@ -188,10 +250,19 @@ def run_subagent(prompt: str, agent_type: str = "Explore") -> str: for b in resp.content: if b.type == "tool_use": h = sub_handlers.get(b.name, lambda **kw: "Unknown tool") - results.append({"type": "tool_result", "tool_use_id": b.id, "content": str(h(**b.input))[:50000]}) + results.append( + { + "type": "tool_result", + "tool_use_id": b.id, + "content": str(h(**b.input))[:50000], + } + ) sub_msgs.append({"role": "user", "content": results}) if resp: - return "".join(b.text for b in resp.content if hasattr(b, "text")) or "(no summary)" + return ( + "".join(b.text for b in resp.content if hasattr(b, "text")) + or "(no summary)" + ) return "(subagent failed)" @@ -214,19 +285,25 @@ def __init__(self, skills_dir: Path): self.skills[name] = {"meta": meta, "body": body} def descriptions(self) -> str: - if not self.skills: return "(no skills)" - return "\n".join(f" - {n}: {s['meta'].get('description', '-')}" for n, s in self.skills.items()) + if not self.skills: + return "(no skills)" + return "\n".join( + f" - {n}: {s['meta'].get('description', '-')}" + for n, s in self.skills.items() + ) def load(self, name: str) -> str: s = self.skills.get(name) - if not s: return f"Error: Unknown skill '{name}'. Available: {', '.join(self.skills.keys())}" - return f"\n{s['body']}\n" + if not s: + return f"Error: Unknown skill '{name}'. Available: {', '.join(self.skills.keys())}" + return f'\n{s["body"]}\n' # === SECTION: compression (s06) === def estimate_tokens(messages: list) -> int: return len(json.dumps(messages, default=str)) // 4 + def microcompact(messages: list): indices = [] for i, msg in enumerate(messages): @@ -240,6 +317,7 @@ def microcompact(messages: list): if isinstance(part.get("content"), str) and len(part["content"]) > 100: part["content"] = "[cleared]" + def auto_compact(messages: list) -> list: TRANSCRIPT_DIR.mkdir(exist_ok=True) path = TRANSCRIPT_DIR / f"transcript_{int(time.time())}.jsonl" @@ -249,7 +327,9 @@ def auto_compact(messages: list) -> list: conv_text = json.dumps(messages, default=str)[-80000:] resp = client.messages.create( model=MODEL, - messages=[{"role": "user", "content": f"Summarize for continuity:\n{conv_text}"}], + messages=[ + {"role": "user", "content": f"Summarize for continuity:\n{conv_text}"} + ], max_tokens=2000, ) summary = resp.content[0].text @@ -269,23 +349,35 @@ def _next_id(self) -> int: def _load(self, tid: int) -> dict: p = TASKS_DIR / f"task_{tid}.json" - if not p.exists(): raise ValueError(f"Task {tid} not found") + if not p.exists(): + raise ValueError(f"Task {tid} not found") return json.loads(p.read_text()) def _save(self, task: dict): (TASKS_DIR / f"task_{task['id']}.json").write_text(json.dumps(task, indent=2)) def create(self, subject: str, description: str = "") -> str: - task = {"id": self._next_id(), "subject": subject, "description": description, - "status": "pending", "owner": None, "blockedBy": []} + task = { + "id": self._next_id(), + "subject": subject, + "description": description, + "status": "pending", + "owner": None, + "blockedBy": [], + } self._save(task) return json.dumps(task, indent=2) def get(self, tid: int) -> str: return json.dumps(self._load(tid), indent=2) - def update(self, tid: int, status: str = None, - add_blocked_by: list = None, remove_blocked_by: list = None) -> str: + def update( + self, + tid: int, + status: str = None, + add_blocked_by: list = None, + remove_blocked_by: list = None, + ) -> str: task = self._load(tid) if status: task["status"] = status @@ -301,16 +393,23 @@ def update(self, tid: int, status: str = None, if add_blocked_by: task["blockedBy"] = list(set(task["blockedBy"] + add_blocked_by)) if remove_blocked_by: - task["blockedBy"] = [x for x in task["blockedBy"] if x not in remove_blocked_by] + task["blockedBy"] = [ + x for x in task["blockedBy"] if x not in remove_blocked_by + ] self._save(task) return json.dumps(task, indent=2) def list_all(self) -> str: - tasks = [json.loads(f.read_text()) for f in sorted(TASKS_DIR.glob("task_*.json"))] - if not tasks: return "No tasks." + tasks = [ + json.loads(f.read_text()) for f in sorted(TASKS_DIR.glob("task_*.json")) + ] + if not tasks: + return "No tasks." lines = [] for t in tasks: - m = {"pending": "[ ]", "in_progress": "[>]", "completed": "[x]"}.get(t["status"], "[?]") + m = {"pending": "[ ]", "in_progress": "[>]", "completed": "[x]"}.get( + t["status"], "[?]" + ) owner = f" @{t['owner']}" if t.get("owner") else "" blocked = f" (blocked by: {t['blockedBy']})" if t.get("blockedBy") else "" lines.append(f"{m} #{t['id']}: {t['subject']}{owner}{blocked}") @@ -333,25 +432,50 @@ def __init__(self): def run(self, command: str, timeout: int = 120) -> str: tid = str(uuid.uuid4())[:8] self.tasks[tid] = {"status": "running", "command": command, "result": None} - threading.Thread(target=self._exec, args=(tid, command, timeout), daemon=True).start() + threading.Thread( + target=self._exec, args=(tid, command, timeout), daemon=True + ).start() return f"Background task {tid} started: {command[:80]}" def _exec(self, tid: str, command: str, timeout: int): try: - r = subprocess.run(command, shell=True, cwd=WORKDIR, - capture_output=True, text=True, timeout=timeout) + r = subprocess.run( + command, + shell=True, + cwd=WORKDIR, + capture_output=True, + text=True, + timeout=timeout, + ) output = (r.stdout + r.stderr).strip()[:50000] - self.tasks[tid].update({"status": "completed", "result": output or "(no output)"}) + self.tasks[tid].update( + {"status": "completed", "result": output or "(no output)"} + ) except Exception as e: self.tasks[tid].update({"status": "error", "result": str(e)}) - self.notifications.put({"task_id": tid, "status": self.tasks[tid]["status"], - "result": self.tasks[tid]["result"][:500]}) + self.notifications.put( + { + "task_id": tid, + "status": self.tasks[tid]["status"], + "result": self.tasks[tid]["result"][:500], + } + ) def check(self, tid: str = None) -> str: if tid: t = self.tasks.get(tid) - return f"[{t['status']}] {t.get('result') or '(running)'}" if t else f"Unknown: {tid}" - return "\n".join(f"{k}: [{v['status']}] {v['command'][:60]}" for k, v in self.tasks.items()) or "No bg tasks." + return ( + f"[{t['status']}] {t.get('result') or '(running)'}" + if t + else f"Unknown: {tid}" + ) + return ( + "\n".join( + f"{k}: [{v['status']}] {v['command'][:60]}" + for k, v in self.tasks.items() + ) + or "No bg tasks." + ) def drain(self) -> list: notifs = [] @@ -359,24 +483,46 @@ def drain(self) -> list: notifs.append(self.notifications.get_nowait()) return notifs + def has_running_tasks(self) -> bool: + return any(task["status"] == "running" for task in self.tasks.values()) + + def wait_for_notifications(self) -> list: + first = self.notifications.get() + notifs = [first] + while not self.notifications.empty(): + notifs.append(self.notifications.get_nowait()) + return notifs + # === SECTION: messaging (s09) === class MessageBus: def __init__(self): INBOX_DIR.mkdir(parents=True, exist_ok=True) - def send(self, sender: str, to: str, content: str, - msg_type: str = "message", extra: dict = None) -> str: - msg = {"type": msg_type, "from": sender, "content": content, - "timestamp": time.time()} - if extra: msg.update(extra) + def send( + self, + sender: str, + to: str, + content: str, + msg_type: str = "message", + extra: dict = None, + ) -> str: + msg = { + "type": msg_type, + "from": sender, + "content": content, + "timestamp": time.time(), + } + if extra: + msg.update(extra) with open(INBOX_DIR / f"{to}.jsonl", "a") as f: f.write(json.dumps(msg) + "\n") return f"Sent {msg_type} to {to}" def read_inbox(self, name: str) -> list: path = INBOX_DIR / f"{name}.jsonl" - if not path.exists(): return [] + if not path.exists(): + return [] msgs = [json.loads(l) for l in path.read_text().strip().splitlines() if l] path.write_text("") return msgs @@ -415,7 +561,8 @@ def _save(self): def _find(self, name: str) -> dict: for m in self.config["members"]: - if m["name"] == name: return m + if m["name"] == name: + return m return None def spawn(self, name: str, role: str, prompt: str) -> str: @@ -429,7 +576,9 @@ def spawn(self, name: str, role: str, prompt: str) -> str: member = {"name": name, "role": role, "status": "working"} self.config["members"].append(member) self._save() - threading.Thread(target=self._loop, args=(name, role, prompt), daemon=True).start() + threading.Thread( + target=self._loop, args=(name, role, prompt), daemon=True + ).start() return f"Spawned '{name}' (role: {role})" def _set_status(self, name: str, status: str): @@ -440,17 +589,81 @@ def _set_status(self, name: str, status: str): def _loop(self, name: str, role: str, prompt: str): team_name = self.config["team_name"] - sys_prompt = (f"You are '{name}', role: {role}, team: {team_name}, at {WORKDIR}. " - f"Use idle when done with current work. You may auto-claim tasks.") + sys_prompt = ( + f"You are '{name}', role: {role}, team: {team_name}, at {WORKDIR}. " + f"Use idle when done with current work. You may auto-claim tasks." + ) messages = [{"role": "user", "content": prompt}] tools = [ - {"name": "bash", "description": "Run command.", "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}}, - {"name": "read_file", "description": "Read file.", "input_schema": {"type": "object", "properties": {"path": {"type": "string"}}, "required": ["path"]}}, - {"name": "write_file", "description": "Write file.", "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}}, - {"name": "edit_file", "description": "Edit file.", "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}}, - {"name": "send_message", "description": "Send message.", "input_schema": {"type": "object", "properties": {"to": {"type": "string"}, "content": {"type": "string"}}, "required": ["to", "content"]}}, - {"name": "idle", "description": "Signal no more work.", "input_schema": {"type": "object", "properties": {}}}, - {"name": "claim_task", "description": "Claim task by ID.", "input_schema": {"type": "object", "properties": {"task_id": {"type": "integer"}}, "required": ["task_id"]}}, + { + "name": "bash", + "description": "Run command.", + "input_schema": { + "type": "object", + "properties": {"command": {"type": "string"}}, + "required": ["command"], + }, + }, + { + "name": "read_file", + "description": "Read file.", + "input_schema": { + "type": "object", + "properties": {"path": {"type": "string"}}, + "required": ["path"], + }, + }, + { + "name": "write_file", + "description": "Write file.", + "input_schema": { + "type": "object", + "properties": { + "path": {"type": "string"}, + "content": {"type": "string"}, + }, + "required": ["path", "content"], + }, + }, + { + "name": "edit_file", + "description": "Edit file.", + "input_schema": { + "type": "object", + "properties": { + "path": {"type": "string"}, + "old_text": {"type": "string"}, + "new_text": {"type": "string"}, + }, + "required": ["path", "old_text", "new_text"], + }, + }, + { + "name": "send_message", + "description": "Send message.", + "input_schema": { + "type": "object", + "properties": { + "to": {"type": "string"}, + "content": {"type": "string"}, + }, + "required": ["to", "content"], + }, + }, + { + "name": "idle", + "description": "Signal no more work.", + "input_schema": {"type": "object", "properties": {}}, + }, + { + "name": "claim_task", + "description": "Claim task by ID.", + "input_schema": { + "type": "object", + "properties": {"task_id": {"type": "integer"}}, + "required": ["task_id"], + }, + }, ] while True: # -- WORK PHASE -- @@ -463,8 +676,12 @@ def _loop(self, name: str, role: str, prompt: str): messages.append({"role": "user", "content": json.dumps(msg)}) try: response = client.messages.create( - model=MODEL, system=sys_prompt, messages=messages, - tools=tools, max_tokens=8000) + model=MODEL, + system=sys_prompt, + messages=messages, + tools=tools, + max_tokens=8000, + ) except Exception: self._set_status(name, "shutdown") return @@ -481,15 +698,31 @@ def _loop(self, name: str, role: str, prompt: str): elif block.name == "claim_task": output = self.task_mgr.claim(block.input["task_id"], name) elif block.name == "send_message": - output = self.bus.send(name, block.input["to"], block.input["content"]) + output = self.bus.send( + name, block.input["to"], block.input["content"] + ) else: - dispatch = {"bash": lambda **kw: run_bash(kw["command"]), - "read_file": lambda **kw: run_read(kw["path"]), - "write_file": lambda **kw: run_write(kw["path"], kw["content"]), - "edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"])} - output = dispatch.get(block.name, lambda **kw: "Unknown")(**block.input) + dispatch = { + "bash": lambda **kw: run_bash(kw["command"]), + "read_file": lambda **kw: run_read(kw["path"]), + "write_file": lambda **kw: run_write( + kw["path"], kw["content"] + ), + "edit_file": lambda **kw: run_edit( + kw["path"], kw["old_text"], kw["new_text"] + ), + } + output = dispatch.get(block.name, lambda **kw: "Unknown")( + **block.input + ) print(f" [{name}] {block.name}: {str(output)[:120]}") - results.append({"type": "tool_result", "tool_use_id": block.id, "content": str(output)}) + results.append( + { + "type": "tool_result", + "tool_use_id": block.id, + "content": str(output), + } + ) messages.append({"role": "user", "content": results}) if idle_requested: break @@ -510,19 +743,43 @@ def _loop(self, name: str, role: str, prompt: str): unclaimed = [] for f in sorted(TASKS_DIR.glob("task_*.json")): t = json.loads(f.read_text()) - if t.get("status") == "pending" and not t.get("owner") and not t.get("blockedBy"): + if ( + t.get("status") == "pending" + and not t.get("owner") + and not t.get("blockedBy") + ): unclaimed.append(t) if unclaimed: task = unclaimed[0] self.task_mgr.claim(task["id"], name) # Identity re-injection for compressed contexts if len(messages) <= 3: - messages.insert(0, {"role": "user", "content": - f"You are '{name}', role: {role}, team: {team_name}."}) - messages.insert(1, {"role": "assistant", "content": f"I am {name}. Continuing."}) - messages.append({"role": "user", "content": - f"Task #{task['id']}: {task['subject']}\n{task.get('description', '')}"}) - messages.append({"role": "assistant", "content": f"Claimed task #{task['id']}. Working on it."}) + messages.insert( + 0, + { + "role": "user", + "content": f"You are '{name}', role: {role}, team: {team_name}.", + }, + ) + messages.insert( + 1, + { + "role": "assistant", + "content": f"I am {name}. Continuing.", + }, + ) + messages.append( + { + "role": "user", + "content": f"Task #{task['id']}: {task['subject']}\n{task.get('description', '')}", + } + ) + messages.append( + { + "role": "assistant", + "content": f"Claimed task #{task['id']}. Working on it.", + } + ) resume = True break if not resume: @@ -531,7 +788,8 @@ def _loop(self, name: str, role: str, prompt: str): self._set_status(name, "working") def list_all(self) -> str: - if not self.config["members"]: return "No teammates." + if not self.config["members"]: + return "No teammates." lines = [f"Team: {self.config['team_name']}"] for m in self.config["members"]: lines.append(f" {m['name']} ({m['role']}): {m['status']}") @@ -560,97 +818,328 @@ def member_names(self) -> list: def handle_shutdown_request(teammate: str) -> str: req_id = str(uuid.uuid4())[:8] shutdown_requests[req_id] = {"target": teammate, "status": "pending"} - BUS.send("lead", teammate, "Please shut down.", "shutdown_request", {"request_id": req_id}) + BUS.send( + "lead", + teammate, + "Please shut down.", + "shutdown_request", + {"request_id": req_id}, + ) return f"Shutdown request {req_id} sent to '{teammate}'" + # === SECTION: plan_approval (s10) === def handle_plan_review(request_id: str, approve: bool, feedback: str = "") -> str: req = plan_requests.get(request_id) - if not req: return f"Error: Unknown plan request_id '{request_id}'" + if not req: + return f"Error: Unknown plan request_id '{request_id}'" req["status"] = "approved" if approve else "rejected" - BUS.send("lead", req["from"], feedback, "plan_approval_response", - {"request_id": request_id, "approve": approve, "feedback": feedback}) + BUS.send( + "lead", + req["from"], + feedback, + "plan_approval_response", + {"request_id": request_id, "approve": approve, "feedback": feedback}, + ) return f"Plan {req['status']} for '{req['from']}'" # === SECTION: tool_dispatch (s02) === TOOL_HANDLERS = { - "bash": lambda **kw: run_bash(kw["command"]), - "read_file": lambda **kw: run_read(kw["path"], kw.get("limit")), - "write_file": lambda **kw: run_write(kw["path"], kw["content"]), - "edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]), - "TodoWrite": lambda **kw: TODO.update(kw["items"]), - "task": lambda **kw: run_subagent(kw["prompt"], kw.get("agent_type", "Explore")), - "load_skill": lambda **kw: SKILLS.load(kw["name"]), - "compress": lambda **kw: "Compressing...", - "background_run": lambda **kw: BG.run(kw["command"], kw.get("timeout", 120)), + "bash": lambda **kw: run_bash(kw["command"]), + "read_file": lambda **kw: run_read(kw["path"], kw.get("limit")), + "write_file": lambda **kw: run_write(kw["path"], kw["content"]), + "edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]), + "TodoWrite": lambda **kw: TODO.update(kw["items"]), + "task": lambda **kw: run_subagent(kw["prompt"], kw.get("agent_type", "Explore")), + "load_skill": lambda **kw: SKILLS.load(kw["name"]), + "compress": lambda **kw: "Compressing...", + "background_run": lambda **kw: BG.run(kw["command"], kw.get("timeout", 120)), "check_background": lambda **kw: BG.check(kw.get("task_id")), - "task_create": lambda **kw: TASK_MGR.create(kw["subject"], kw.get("description", "")), - "task_get": lambda **kw: TASK_MGR.get(kw["task_id"]), - "task_update": lambda **kw: TASK_MGR.update(kw["task_id"], kw.get("status"), kw.get("add_blocked_by"), kw.get("remove_blocked_by")), - "task_list": lambda **kw: TASK_MGR.list_all(), - "spawn_teammate": lambda **kw: TEAM.spawn(kw["name"], kw["role"], kw["prompt"]), - "list_teammates": lambda **kw: TEAM.list_all(), - "send_message": lambda **kw: BUS.send("lead", kw["to"], kw["content"], kw.get("msg_type", "message")), - "read_inbox": lambda **kw: json.dumps(BUS.read_inbox("lead"), indent=2), - "broadcast": lambda **kw: BUS.broadcast("lead", kw["content"], TEAM.member_names()), + "task_create": lambda **kw: TASK_MGR.create( + kw["subject"], kw.get("description", "") + ), + "task_get": lambda **kw: TASK_MGR.get(kw["task_id"]), + "task_update": lambda **kw: TASK_MGR.update( + kw["task_id"], + kw.get("status"), + kw.get("add_blocked_by"), + kw.get("remove_blocked_by"), + ), + "task_list": lambda **kw: TASK_MGR.list_all(), + "spawn_teammate": lambda **kw: TEAM.spawn(kw["name"], kw["role"], kw["prompt"]), + "list_teammates": lambda **kw: TEAM.list_all(), + "send_message": lambda **kw: BUS.send( + "lead", kw["to"], kw["content"], kw.get("msg_type", "message") + ), + "read_inbox": lambda **kw: json.dumps(BUS.read_inbox("lead"), indent=2), + "broadcast": lambda **kw: BUS.broadcast("lead", kw["content"], TEAM.member_names()), "shutdown_request": lambda **kw: handle_shutdown_request(kw["teammate"]), - "plan_approval": lambda **kw: handle_plan_review(kw["request_id"], kw["approve"], kw.get("feedback", "")), - "idle": lambda **kw: "Lead does not idle.", - "claim_task": lambda **kw: TASK_MGR.claim(kw["task_id"], "lead"), + "plan_approval": lambda **kw: handle_plan_review( + kw["request_id"], kw["approve"], kw.get("feedback", "") + ), + "idle": lambda **kw: "Lead does not idle.", + "claim_task": lambda **kw: TASK_MGR.claim(kw["task_id"], "lead"), } TOOLS = [ - {"name": "bash", "description": "Run a shell command.", - "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}}, - {"name": "read_file", "description": "Read file contents.", - "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "limit": {"type": "integer"}}, "required": ["path"]}}, - {"name": "write_file", "description": "Write content to file.", - "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}}, - {"name": "edit_file", "description": "Replace exact text in file.", - "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}}, - {"name": "TodoWrite", "description": "Update task tracking list.", - "input_schema": {"type": "object", "properties": {"items": {"type": "array", "items": {"type": "object", "properties": {"content": {"type": "string"}, "status": {"type": "string", "enum": ["pending", "in_progress", "completed"]}, "activeForm": {"type": "string"}}, "required": ["content", "status", "activeForm"]}}}, "required": ["items"]}}, - {"name": "task", "description": "Spawn a subagent for isolated exploration or work.", - "input_schema": {"type": "object", "properties": {"prompt": {"type": "string"}, "agent_type": {"type": "string", "enum": ["Explore", "general-purpose"]}}, "required": ["prompt"]}}, - {"name": "load_skill", "description": "Load specialized knowledge by name.", - "input_schema": {"type": "object", "properties": {"name": {"type": "string"}}, "required": ["name"]}}, - {"name": "compress", "description": "Manually compress conversation context.", - "input_schema": {"type": "object", "properties": {}}}, - {"name": "background_run", "description": "Run command in background thread.", - "input_schema": {"type": "object", "properties": {"command": {"type": "string"}, "timeout": {"type": "integer"}}, "required": ["command"]}}, - {"name": "check_background", "description": "Check background task status.", - "input_schema": {"type": "object", "properties": {"task_id": {"type": "string"}}}}, - {"name": "task_create", "description": "Create a persistent file task.", - "input_schema": {"type": "object", "properties": {"subject": {"type": "string"}, "description": {"type": "string"}}, "required": ["subject"]}}, - {"name": "task_get", "description": "Get task details by ID.", - "input_schema": {"type": "object", "properties": {"task_id": {"type": "integer"}}, "required": ["task_id"]}}, - {"name": "task_update", "description": "Update task status or dependencies.", - "input_schema": {"type": "object", "properties": {"task_id": {"type": "integer"}, "status": {"type": "string", "enum": ["pending", "in_progress", "completed", "deleted"]}, "add_blocked_by": {"type": "array", "items": {"type": "integer"}}, "remove_blocked_by": {"type": "array", "items": {"type": "integer"}}}, "required": ["task_id"]}}, - {"name": "task_list", "description": "List all tasks.", - "input_schema": {"type": "object", "properties": {}}}, - {"name": "spawn_teammate", "description": "Spawn a persistent autonomous teammate.", - "input_schema": {"type": "object", "properties": {"name": {"type": "string"}, "role": {"type": "string"}, "prompt": {"type": "string"}}, "required": ["name", "role", "prompt"]}}, - {"name": "list_teammates", "description": "List all teammates.", - "input_schema": {"type": "object", "properties": {}}}, - {"name": "send_message", "description": "Send a message to a teammate.", - "input_schema": {"type": "object", "properties": {"to": {"type": "string"}, "content": {"type": "string"}, "msg_type": {"type": "string", "enum": list(VALID_MSG_TYPES)}}, "required": ["to", "content"]}}, - {"name": "read_inbox", "description": "Read and drain the lead's inbox.", - "input_schema": {"type": "object", "properties": {}}}, - {"name": "broadcast", "description": "Send message to all teammates.", - "input_schema": {"type": "object", "properties": {"content": {"type": "string"}}, "required": ["content"]}}, - {"name": "shutdown_request", "description": "Request a teammate to shut down.", - "input_schema": {"type": "object", "properties": {"teammate": {"type": "string"}}, "required": ["teammate"]}}, - {"name": "plan_approval", "description": "Approve or reject a teammate's plan.", - "input_schema": {"type": "object", "properties": {"request_id": {"type": "string"}, "approve": {"type": "boolean"}, "feedback": {"type": "string"}}, "required": ["request_id", "approve"]}}, - {"name": "idle", "description": "Enter idle state.", - "input_schema": {"type": "object", "properties": {}}}, - {"name": "claim_task", "description": "Claim a task from the board.", - "input_schema": {"type": "object", "properties": {"task_id": {"type": "integer"}}, "required": ["task_id"]}}, + { + "name": "bash", + "description": "Run a shell command.", + "input_schema": { + "type": "object", + "properties": {"command": {"type": "string"}}, + "required": ["command"], + }, + }, + { + "name": "read_file", + "description": "Read file contents.", + "input_schema": { + "type": "object", + "properties": {"path": {"type": "string"}, "limit": {"type": "integer"}}, + "required": ["path"], + }, + }, + { + "name": "write_file", + "description": "Write content to file.", + "input_schema": { + "type": "object", + "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, + "required": ["path", "content"], + }, + }, + { + "name": "edit_file", + "description": "Replace exact text in file.", + "input_schema": { + "type": "object", + "properties": { + "path": {"type": "string"}, + "old_text": {"type": "string"}, + "new_text": {"type": "string"}, + }, + "required": ["path", "old_text", "new_text"], + }, + }, + { + "name": "TodoWrite", + "description": "Update task tracking list.", + "input_schema": { + "type": "object", + "properties": { + "items": { + "type": "array", + "items": { + "type": "object", + "properties": { + "content": {"type": "string"}, + "status": { + "type": "string", + "enum": ["pending", "in_progress", "completed"], + }, + "activeForm": {"type": "string"}, + }, + "required": ["content", "status", "activeForm"], + }, + } + }, + "required": ["items"], + }, + }, + { + "name": "task", + "description": "Spawn a subagent for isolated exploration or work.", + "input_schema": { + "type": "object", + "properties": { + "prompt": {"type": "string"}, + "agent_type": { + "type": "string", + "enum": ["Explore", "general-purpose"], + }, + }, + "required": ["prompt"], + }, + }, + { + "name": "load_skill", + "description": "Load specialized knowledge by name.", + "input_schema": { + "type": "object", + "properties": {"name": {"type": "string"}}, + "required": ["name"], + }, + }, + { + "name": "compress", + "description": "Manually compress conversation context.", + "input_schema": {"type": "object", "properties": {}}, + }, + { + "name": "background_run", + "description": "Run command in background thread.", + "input_schema": { + "type": "object", + "properties": { + "command": {"type": "string"}, + "timeout": {"type": "integer"}, + }, + "required": ["command"], + }, + }, + { + "name": "check_background", + "description": "Check background task status.", + "input_schema": { + "type": "object", + "properties": {"task_id": {"type": "string"}}, + }, + }, + { + "name": "task_create", + "description": "Create a persistent file task.", + "input_schema": { + "type": "object", + "properties": { + "subject": {"type": "string"}, + "description": {"type": "string"}, + }, + "required": ["subject"], + }, + }, + { + "name": "task_get", + "description": "Get task details by ID.", + "input_schema": { + "type": "object", + "properties": {"task_id": {"type": "integer"}}, + "required": ["task_id"], + }, + }, + { + "name": "task_update", + "description": "Update task status or dependencies.", + "input_schema": { + "type": "object", + "properties": { + "task_id": {"type": "integer"}, + "status": { + "type": "string", + "enum": ["pending", "in_progress", "completed", "deleted"], + }, + "add_blocked_by": {"type": "array", "items": {"type": "integer"}}, + "remove_blocked_by": {"type": "array", "items": {"type": "integer"}}, + }, + "required": ["task_id"], + }, + }, + { + "name": "task_list", + "description": "List all tasks.", + "input_schema": {"type": "object", "properties": {}}, + }, + { + "name": "spawn_teammate", + "description": "Spawn a persistent autonomous teammate.", + "input_schema": { + "type": "object", + "properties": { + "name": {"type": "string"}, + "role": {"type": "string"}, + "prompt": {"type": "string"}, + }, + "required": ["name", "role", "prompt"], + }, + }, + { + "name": "list_teammates", + "description": "List all teammates.", + "input_schema": {"type": "object", "properties": {}}, + }, + { + "name": "send_message", + "description": "Send a message to a teammate.", + "input_schema": { + "type": "object", + "properties": { + "to": {"type": "string"}, + "content": {"type": "string"}, + "msg_type": {"type": "string", "enum": list(VALID_MSG_TYPES)}, + }, + "required": ["to", "content"], + }, + }, + { + "name": "read_inbox", + "description": "Read and drain the lead's inbox.", + "input_schema": {"type": "object", "properties": {}}, + }, + { + "name": "broadcast", + "description": "Send message to all teammates.", + "input_schema": { + "type": "object", + "properties": {"content": {"type": "string"}}, + "required": ["content"], + }, + }, + { + "name": "shutdown_request", + "description": "Request a teammate to shut down.", + "input_schema": { + "type": "object", + "properties": {"teammate": {"type": "string"}}, + "required": ["teammate"], + }, + }, + { + "name": "plan_approval", + "description": "Approve or reject a teammate's plan.", + "input_schema": { + "type": "object", + "properties": { + "request_id": {"type": "string"}, + "approve": {"type": "boolean"}, + "feedback": {"type": "string"}, + }, + "required": ["request_id", "approve"], + }, + }, + { + "name": "idle", + "description": "Enter idle state.", + "input_schema": {"type": "object", "properties": {}}, + }, + { + "name": "claim_task", + "description": "Claim a task from the board.", + "input_schema": { + "type": "object", + "properties": {"task_id": {"type": "integer"}}, + "required": ["task_id"], + }, + }, ] # === SECTION: agent_loop === +def inject_background_results(messages: list, notifs: list) -> bool: + if notifs: + txt = "\n".join( + f"[bg:{n['task_id']}] {n['status']}: {n['result']}" for n in notifs + ) + messages.append( + { + "role": "user", + "content": f"\n{txt}\n", + } + ) + return True + return False + + def agent_loop(messages: list): rounds_without_todo = 0 while True: @@ -660,21 +1149,30 @@ def agent_loop(messages: list): print("[auto-compact triggered]") messages[:] = auto_compact(messages) # s08: drain background notifications - notifs = BG.drain() - if notifs: - txt = "\n".join(f"[bg:{n['task_id']}] {n['status']}: {n['result']}" for n in notifs) - messages.append({"role": "user", "content": f"\n{txt}\n"}) + inject_background_results(messages, BG.drain()) # s10: check lead inbox inbox = BUS.read_inbox("lead") if inbox: - messages.append({"role": "user", "content": f"{json.dumps(inbox, indent=2)}"}) + messages.append( + { + "role": "user", + "content": f"{json.dumps(inbox, indent=2)}", + } + ) # LLM call response = client.messages.create( - model=MODEL, system=SYSTEM, messages=messages, - tools=TOOLS, max_tokens=8000, + model=MODEL, + system=SYSTEM, + messages=messages, + tools=TOOLS, + max_tokens=8000, ) messages.append({"role": "assistant", "content": response.content}) if response.stop_reason != "tool_use": + if BG.has_running_tasks() and inject_background_results( + messages, BG.wait_for_notifications() + ): + continue return # Tool execution results = [] @@ -686,18 +1184,30 @@ def agent_loop(messages: list): manual_compress = True handler = TOOL_HANDLERS.get(block.name) try: - output = handler(**block.input) if handler else f"Unknown tool: {block.name}" + output = ( + handler(**block.input) + if handler + else f"Unknown tool: {block.name}" + ) except Exception as e: output = f"Error: {e}" print(f"> {block.name}:") print(str(output)[:200]) - results.append({"type": "tool_result", "tool_use_id": block.id, "content": str(output)}) + results.append( + { + "type": "tool_result", + "tool_use_id": block.id, + "content": str(output), + } + ) if block.name == "TodoWrite": used_todo = True # s03: nag reminder (only when todo workflow is active) rounds_without_todo = 0 if used_todo else rounds_without_todo + 1 if TODO.has_open_items() and rounds_without_todo >= 3: - results.append({"type": "text", "text": "Update your todos."}) + results.append( + {"type": "text", "text": "Update your todos."} + ) messages.append({"role": "user", "content": results}) # s06: manual compress if manual_compress: diff --git a/tests/test_background_notifications.py b/tests/test_background_notifications.py new file mode 100644 index 000000000..1a763f908 --- /dev/null +++ b/tests/test_background_notifications.py @@ -0,0 +1,135 @@ +import os +import sys +import types +import unittest +from pathlib import Path +from types import SimpleNamespace + + +REPO_ROOT = Path(__file__).resolve().parents[1] +if str(REPO_ROOT) not in sys.path: + sys.path.insert(0, str(REPO_ROOT)) + +os.environ.setdefault("MODEL_ID", "test-model") + +fake_anthropic = types.ModuleType("anthropic") + + +class FakeAnthropic: + def __init__(self, *args, **kwargs): + self.messages = SimpleNamespace(create=None) + + +setattr(fake_anthropic, "Anthropic", FakeAnthropic) +sys.modules.setdefault("anthropic", fake_anthropic) + +fake_dotenv = types.ModuleType("dotenv") +setattr(fake_dotenv, "load_dotenv", lambda *args, **kwargs: None) +sys.modules.setdefault("dotenv", fake_dotenv) + +import agents.s08_background_tasks as s08_background_tasks +import agents.s_full as s_full + + +class FakeMessagesAPI: + def __init__(self, responses): + self._responses = iter(responses) + self.call_count = 0 + + def create(self, **kwargs): + self.call_count += 1 + return next(self._responses) + + +class FakeBackgroundManager: + def __init__(self): + self._running = True + self.wait_called = False + + def drain_notifications(self): + return [] + + def drain(self): + return [] + + def has_running_tasks(self): + return self._running + + def wait_for_notifications(self): + self.wait_called = True + self._running = False + return [{"task_id": "bg-1", "status": "completed", "result": "done"}] + + +class BackgroundNotificationTests(unittest.TestCase): + def test_s08_agent_loop_waits_for_background_results_after_end_turn(self): + messages = [{"role": "user", "content": "Run tests in the background"}] + fake_bg = FakeBackgroundManager() + fake_api = FakeMessagesAPI( + [ + SimpleNamespace( + stop_reason="end_turn", content="Started background work." + ), + SimpleNamespace( + stop_reason="end_turn", content="Background work completed." + ), + ] + ) + original_bg = s08_background_tasks.BG + original_client = s08_background_tasks.client + try: + s08_background_tasks.BG = fake_bg + s08_background_tasks.client = SimpleNamespace(messages=fake_api) + s08_background_tasks.agent_loop(messages) + finally: + s08_background_tasks.BG = original_bg + s08_background_tasks.client = original_client + + self.assertTrue(fake_bg.wait_called) + self.assertEqual(fake_api.call_count, 2) + self.assertTrue( + any( + message["role"] == "user" + and isinstance(message["content"], str) + and "" in message["content"] + for message in messages + ) + ) + + def test_s_full_agent_loop_waits_for_background_results_after_end_turn(self): + messages = [{"role": "user", "content": "Run tests in the background"}] + fake_bg = FakeBackgroundManager() + fake_api = FakeMessagesAPI( + [ + SimpleNamespace( + stop_reason="end_turn", content="Started background work." + ), + SimpleNamespace( + stop_reason="end_turn", content="Background work completed." + ), + ] + ) + original_bg = s_full.BG + original_client = s_full.client + try: + s_full.BG = fake_bg + s_full.client = SimpleNamespace(messages=fake_api) + s_full.agent_loop(messages) + finally: + s_full.BG = original_bg + s_full.client = original_client + + self.assertTrue(fake_bg.wait_called) + self.assertEqual(fake_api.call_count, 2) + self.assertTrue( + any( + message["role"] == "user" + and isinstance(message["content"], str) + and "" in message["content"] + for message in messages + ) + ) + + +if __name__ == "__main__": + unittest.main()