diff --git a/agents/s08_background_tasks.py b/agents/s08_background_tasks.py
index 390a77780..f3ce5597e 100644
--- a/agents/s08_background_tasks.py
+++ b/agents/s08_background_tasks.py
@@ -52,6 +52,7 @@ def __init__(self):
self.tasks = {} # task_id -> {status, result, command}
self._notification_queue = [] # completed task results
self._lock = threading.Lock()
+ self._condition = threading.Condition(self._lock)
def run(self, command: str) -> str:
"""Start a background thread, return task_id immediately."""
@@ -67,8 +68,12 @@ def _execute(self, task_id: str, command: str):
"""Thread target: run subprocess, capture output, push to queue."""
try:
r = subprocess.run(
- command, shell=True, cwd=WORKDIR,
- capture_output=True, text=True, timeout=300
+ command,
+ shell=True,
+ cwd=WORKDIR,
+ capture_output=True,
+ text=True,
+ timeout=300,
)
output = (r.stdout + r.stderr).strip()[:50000]
status = "completed"
@@ -80,13 +85,16 @@ def _execute(self, task_id: str, command: str):
status = "error"
self.tasks[task_id]["status"] = status
self.tasks[task_id]["result"] = output or "(no output)"
- with self._lock:
- self._notification_queue.append({
- "task_id": task_id,
- "status": status,
- "command": command[:80],
- "result": (output or "(no output)")[:500],
- })
+ with self._condition:
+ self._notification_queue.append(
+ {
+ "task_id": task_id,
+ "status": status,
+ "command": command[:80],
+ "result": (output or "(no output)")[:500],
+ }
+ )
+ self._condition.notify_all()
def check(self, task_id: str = None) -> str:
"""Check status of one task or list all."""
@@ -94,7 +102,9 @@ def check(self, task_id: str = None) -> str:
t = self.tasks.get(task_id)
if not t:
return f"Error: Unknown task {task_id}"
- return f"[{t['status']}] {t['command'][:60]}\n{t.get('result') or '(running)'}"
+ return (
+ f"[{t['status']}] {t['command'][:60]}\n{t.get('result') or '(running)'}"
+ )
lines = []
for tid, t in self.tasks.items():
lines.append(f"{tid}: [{t['status']}] {t['command'][:60]}")
@@ -102,7 +112,22 @@ def check(self, task_id: str = None) -> str:
def drain_notifications(self) -> list:
"""Return and clear all pending completion notifications."""
- with self._lock:
+ with self._condition:
+ notifs = list(self._notification_queue)
+ self._notification_queue.clear()
+ return notifs
+
+ def _has_running_tasks_locked(self) -> bool:
+ return any(task["status"] == "running" for task in self.tasks.values())
+
+ def has_running_tasks(self) -> bool:
+ with self._condition:
+ return self._has_running_tasks_locked()
+
+ def wait_for_notifications(self) -> list:
+ with self._condition:
+ while not self._notification_queue and self._has_running_tasks_locked():
+ self._condition.wait()
notifs = list(self._notification_queue)
self._notification_queue.clear()
return notifs
@@ -111,6 +136,21 @@ def drain_notifications(self) -> list:
BG = BackgroundManager()
+def inject_background_results(messages: list, notifs: list) -> bool:
+ if notifs and messages:
+ notif_text = "\n".join(
+ f"[bg:{n['task_id']}] {n['status']}: {n['result']}" for n in notifs
+ )
+ messages.append(
+ {
+ "role": "user",
+ "content": f"\n{notif_text}\n",
+ }
+ )
+ return True
+ return False
+
+
# -- Tool implementations --
def safe_path(p: str) -> Path:
path = (WORKDIR / p).resolve()
@@ -118,18 +158,26 @@ def safe_path(p: str) -> Path:
raise ValueError(f"Path escapes workspace: {p}")
return path
+
def run_bash(command: str) -> str:
dangerous = ["rm -rf /", "sudo", "shutdown", "reboot", "> /dev/"]
if any(d in command for d in dangerous):
return "Error: Dangerous command blocked"
try:
- r = subprocess.run(command, shell=True, cwd=WORKDIR,
- capture_output=True, text=True, timeout=120)
+ r = subprocess.run(
+ command,
+ shell=True,
+ cwd=WORKDIR,
+ capture_output=True,
+ text=True,
+ timeout=120,
+ )
out = (r.stdout + r.stderr).strip()
return out[:50000] if out else "(no output)"
except subprocess.TimeoutExpired:
return "Error: Timeout (120s)"
+
def run_read(path: str, limit: int = None) -> str:
try:
lines = safe_path(path).read_text().splitlines()
@@ -139,6 +187,7 @@ def run_read(path: str, limit: int = None) -> str:
except Exception as e:
return f"Error: {e}"
+
def run_write(path: str, content: str) -> str:
try:
fp = safe_path(path)
@@ -148,6 +197,7 @@ def run_write(path: str, content: str) -> str:
except Exception as e:
return f"Error: {e}"
+
def run_edit(path: str, old_text: str, new_text: str) -> str:
try:
fp = safe_path(path)
@@ -161,57 +211,114 @@ def run_edit(path: str, old_text: str, new_text: str) -> str:
TOOL_HANDLERS = {
- "bash": lambda **kw: run_bash(kw["command"]),
- "read_file": lambda **kw: run_read(kw["path"], kw.get("limit")),
- "write_file": lambda **kw: run_write(kw["path"], kw["content"]),
- "edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]),
- "background_run": lambda **kw: BG.run(kw["command"]),
+ "bash": lambda **kw: run_bash(kw["command"]),
+ "read_file": lambda **kw: run_read(kw["path"], kw.get("limit")),
+ "write_file": lambda **kw: run_write(kw["path"], kw["content"]),
+ "edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]),
+ "background_run": lambda **kw: BG.run(kw["command"]),
"check_background": lambda **kw: BG.check(kw.get("task_id")),
}
TOOLS = [
- {"name": "bash", "description": "Run a shell command (blocking).",
- "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
- {"name": "read_file", "description": "Read file contents.",
- "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "limit": {"type": "integer"}}, "required": ["path"]}},
- {"name": "write_file", "description": "Write content to file.",
- "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}},
- {"name": "edit_file", "description": "Replace exact text in file.",
- "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}},
- {"name": "background_run", "description": "Run command in background thread. Returns task_id immediately.",
- "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
- {"name": "check_background", "description": "Check background task status. Omit task_id to list all.",
- "input_schema": {"type": "object", "properties": {"task_id": {"type": "string"}}}},
+ {
+ "name": "bash",
+ "description": "Run a shell command (blocking).",
+ "input_schema": {
+ "type": "object",
+ "properties": {"command": {"type": "string"}},
+ "required": ["command"],
+ },
+ },
+ {
+ "name": "read_file",
+ "description": "Read file contents.",
+ "input_schema": {
+ "type": "object",
+ "properties": {"path": {"type": "string"}, "limit": {"type": "integer"}},
+ "required": ["path"],
+ },
+ },
+ {
+ "name": "write_file",
+ "description": "Write content to file.",
+ "input_schema": {
+ "type": "object",
+ "properties": {"path": {"type": "string"}, "content": {"type": "string"}},
+ "required": ["path", "content"],
+ },
+ },
+ {
+ "name": "edit_file",
+ "description": "Replace exact text in file.",
+ "input_schema": {
+ "type": "object",
+ "properties": {
+ "path": {"type": "string"},
+ "old_text": {"type": "string"},
+ "new_text": {"type": "string"},
+ },
+ "required": ["path", "old_text", "new_text"],
+ },
+ },
+ {
+ "name": "background_run",
+ "description": "Run command in background thread. Returns task_id immediately.",
+ "input_schema": {
+ "type": "object",
+ "properties": {"command": {"type": "string"}},
+ "required": ["command"],
+ },
+ },
+ {
+ "name": "check_background",
+ "description": "Check background task status. Omit task_id to list all.",
+ "input_schema": {
+ "type": "object",
+ "properties": {"task_id": {"type": "string"}},
+ },
+ },
]
def agent_loop(messages: list):
while True:
# Drain background notifications and inject as system message before LLM call
- notifs = BG.drain_notifications()
- if notifs and messages:
- notif_text = "\n".join(
- f"[bg:{n['task_id']}] {n['status']}: {n['result']}" for n in notifs
- )
- messages.append({"role": "user", "content": f"\n{notif_text}\n"})
+ inject_background_results(messages, BG.drain_notifications())
response = client.messages.create(
- model=MODEL, system=SYSTEM, messages=messages,
- tools=TOOLS, max_tokens=8000,
+ model=MODEL,
+ system=SYSTEM,
+ messages=messages,
+ tools=TOOLS,
+ max_tokens=8000,
)
messages.append({"role": "assistant", "content": response.content})
if response.stop_reason != "tool_use":
+ if BG.has_running_tasks() and inject_background_results(
+ messages, BG.wait_for_notifications()
+ ):
+ continue
return
results = []
for block in response.content:
if block.type == "tool_use":
handler = TOOL_HANDLERS.get(block.name)
try:
- output = handler(**block.input) if handler else f"Unknown tool: {block.name}"
+ output = (
+ handler(**block.input)
+ if handler
+ else f"Unknown tool: {block.name}"
+ )
except Exception as e:
output = f"Error: {e}"
print(f"> {block.name}:")
print(str(output)[:200])
- results.append({"type": "tool_result", "tool_use_id": block.id, "content": str(output)})
+ results.append(
+ {
+ "type": "tool_result",
+ "tool_use_id": block.id,
+ "content": str(output),
+ }
+ )
messages.append({"role": "user", "content": results})
diff --git a/agents/s_full.py b/agents/s_full.py
index e2f887b5c..518694bbc 100644
--- a/agents/s_full.py
+++ b/agents/s_full.py
@@ -66,8 +66,13 @@
POLL_INTERVAL = 5
IDLE_TIMEOUT = 60
-VALID_MSG_TYPES = {"message", "broadcast", "shutdown_request",
- "shutdown_response", "plan_approval_response"}
+VALID_MSG_TYPES = {
+ "message",
+ "broadcast",
+ "shutdown_request",
+ "shutdown_response",
+ "plan_approval_response",
+}
# === SECTION: base_tools ===
@@ -77,18 +82,26 @@ def safe_path(p: str) -> Path:
raise ValueError(f"Path escapes workspace: {p}")
return path
+
def run_bash(command: str) -> str:
dangerous = ["rm -rf /", "sudo", "shutdown", "reboot", "> /dev/"]
if any(d in command for d in dangerous):
return "Error: Dangerous command blocked"
try:
- r = subprocess.run(command, shell=True, cwd=WORKDIR,
- capture_output=True, text=True, timeout=120)
+ r = subprocess.run(
+ command,
+ shell=True,
+ cwd=WORKDIR,
+ capture_output=True,
+ text=True,
+ timeout=120,
+ )
out = (r.stdout + r.stderr).strip()
return out[:50000] if out else "(no output)"
except subprocess.TimeoutExpired:
return "Error: Timeout (120s)"
+
def run_read(path: str, limit: int = None) -> str:
try:
lines = safe_path(path).read_text().splitlines()
@@ -98,6 +111,7 @@ def run_read(path: str, limit: int = None) -> str:
except Exception as e:
return f"Error: {e}"
+
def run_write(path: str, content: str) -> str:
try:
fp = safe_path(path)
@@ -107,6 +121,7 @@ def run_write(path: str, content: str) -> str:
except Exception as e:
return f"Error: {e}"
+
def run_edit(path: str, old_text: str, new_text: str) -> str:
try:
fp = safe_path(path)
@@ -130,23 +145,33 @@ def update(self, items: list) -> str:
content = str(item.get("content", "")).strip()
status = str(item.get("status", "pending")).lower()
af = str(item.get("activeForm", "")).strip()
- if not content: raise ValueError(f"Item {i}: content required")
+ if not content:
+ raise ValueError(f"Item {i}: content required")
if status not in ("pending", "in_progress", "completed"):
raise ValueError(f"Item {i}: invalid status '{status}'")
- if not af: raise ValueError(f"Item {i}: activeForm required")
- if status == "in_progress": ip += 1
+ if not af:
+ raise ValueError(f"Item {i}: activeForm required")
+ if status == "in_progress":
+ ip += 1
validated.append({"content": content, "status": status, "activeForm": af})
- if len(validated) > 20: raise ValueError("Max 20 todos")
- if ip > 1: raise ValueError("Only one in_progress allowed")
+ if len(validated) > 20:
+ raise ValueError("Max 20 todos")
+ if ip > 1:
+ raise ValueError("Only one in_progress allowed")
self.items = validated
return self.render()
def render(self) -> str:
- if not self.items: return "No todos."
+ if not self.items:
+ return "No todos."
lines = []
for item in self.items:
- m = {"completed": "[x]", "in_progress": "[>]", "pending": "[ ]"}.get(item["status"], "[?]")
- suffix = f" <- {item['activeForm']}" if item["status"] == "in_progress" else ""
+ m = {"completed": "[x]", "in_progress": "[>]", "pending": "[ ]"}.get(
+ item["status"], "[?]"
+ )
+ suffix = (
+ f" <- {item['activeForm']}" if item["status"] == "in_progress" else ""
+ )
lines.append(f"{m} {item['content']}{suffix}")
done = sum(1 for t in self.items if t["status"] == "completed")
lines.append(f"\n({done}/{len(self.items)} completed)")
@@ -159,17 +184,52 @@ def has_open_items(self) -> bool:
# === SECTION: subagent (s04) ===
def run_subagent(prompt: str, agent_type: str = "Explore") -> str:
sub_tools = [
- {"name": "bash", "description": "Run command.",
- "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
- {"name": "read_file", "description": "Read file.",
- "input_schema": {"type": "object", "properties": {"path": {"type": "string"}}, "required": ["path"]}},
+ {
+ "name": "bash",
+ "description": "Run command.",
+ "input_schema": {
+ "type": "object",
+ "properties": {"command": {"type": "string"}},
+ "required": ["command"],
+ },
+ },
+ {
+ "name": "read_file",
+ "description": "Read file.",
+ "input_schema": {
+ "type": "object",
+ "properties": {"path": {"type": "string"}},
+ "required": ["path"],
+ },
+ },
]
if agent_type != "Explore":
sub_tools += [
- {"name": "write_file", "description": "Write file.",
- "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}},
- {"name": "edit_file", "description": "Edit file.",
- "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}},
+ {
+ "name": "write_file",
+ "description": "Write file.",
+ "input_schema": {
+ "type": "object",
+ "properties": {
+ "path": {"type": "string"},
+ "content": {"type": "string"},
+ },
+ "required": ["path", "content"],
+ },
+ },
+ {
+ "name": "edit_file",
+ "description": "Edit file.",
+ "input_schema": {
+ "type": "object",
+ "properties": {
+ "path": {"type": "string"},
+ "old_text": {"type": "string"},
+ "new_text": {"type": "string"},
+ },
+ "required": ["path", "old_text", "new_text"],
+ },
+ },
]
sub_handlers = {
"bash": lambda **kw: run_bash(kw["command"]),
@@ -180,7 +240,9 @@ def run_subagent(prompt: str, agent_type: str = "Explore") -> str:
sub_msgs = [{"role": "user", "content": prompt}]
resp = None
for _ in range(30):
- resp = client.messages.create(model=MODEL, messages=sub_msgs, tools=sub_tools, max_tokens=8000)
+ resp = client.messages.create(
+ model=MODEL, messages=sub_msgs, tools=sub_tools, max_tokens=8000
+ )
sub_msgs.append({"role": "assistant", "content": resp.content})
if resp.stop_reason != "tool_use":
break
@@ -188,10 +250,19 @@ def run_subagent(prompt: str, agent_type: str = "Explore") -> str:
for b in resp.content:
if b.type == "tool_use":
h = sub_handlers.get(b.name, lambda **kw: "Unknown tool")
- results.append({"type": "tool_result", "tool_use_id": b.id, "content": str(h(**b.input))[:50000]})
+ results.append(
+ {
+ "type": "tool_result",
+ "tool_use_id": b.id,
+ "content": str(h(**b.input))[:50000],
+ }
+ )
sub_msgs.append({"role": "user", "content": results})
if resp:
- return "".join(b.text for b in resp.content if hasattr(b, "text")) or "(no summary)"
+ return (
+ "".join(b.text for b in resp.content if hasattr(b, "text"))
+ or "(no summary)"
+ )
return "(subagent failed)"
@@ -214,19 +285,25 @@ def __init__(self, skills_dir: Path):
self.skills[name] = {"meta": meta, "body": body}
def descriptions(self) -> str:
- if not self.skills: return "(no skills)"
- return "\n".join(f" - {n}: {s['meta'].get('description', '-')}" for n, s in self.skills.items())
+ if not self.skills:
+ return "(no skills)"
+ return "\n".join(
+ f" - {n}: {s['meta'].get('description', '-')}"
+ for n, s in self.skills.items()
+ )
def load(self, name: str) -> str:
s = self.skills.get(name)
- if not s: return f"Error: Unknown skill '{name}'. Available: {', '.join(self.skills.keys())}"
- return f"\n{s['body']}\n"
+ if not s:
+ return f"Error: Unknown skill '{name}'. Available: {', '.join(self.skills.keys())}"
+ return f'\n{s["body"]}\n'
# === SECTION: compression (s06) ===
def estimate_tokens(messages: list) -> int:
return len(json.dumps(messages, default=str)) // 4
+
def microcompact(messages: list):
indices = []
for i, msg in enumerate(messages):
@@ -240,6 +317,7 @@ def microcompact(messages: list):
if isinstance(part.get("content"), str) and len(part["content"]) > 100:
part["content"] = "[cleared]"
+
def auto_compact(messages: list) -> list:
TRANSCRIPT_DIR.mkdir(exist_ok=True)
path = TRANSCRIPT_DIR / f"transcript_{int(time.time())}.jsonl"
@@ -249,7 +327,9 @@ def auto_compact(messages: list) -> list:
conv_text = json.dumps(messages, default=str)[-80000:]
resp = client.messages.create(
model=MODEL,
- messages=[{"role": "user", "content": f"Summarize for continuity:\n{conv_text}"}],
+ messages=[
+ {"role": "user", "content": f"Summarize for continuity:\n{conv_text}"}
+ ],
max_tokens=2000,
)
summary = resp.content[0].text
@@ -269,23 +349,35 @@ def _next_id(self) -> int:
def _load(self, tid: int) -> dict:
p = TASKS_DIR / f"task_{tid}.json"
- if not p.exists(): raise ValueError(f"Task {tid} not found")
+ if not p.exists():
+ raise ValueError(f"Task {tid} not found")
return json.loads(p.read_text())
def _save(self, task: dict):
(TASKS_DIR / f"task_{task['id']}.json").write_text(json.dumps(task, indent=2))
def create(self, subject: str, description: str = "") -> str:
- task = {"id": self._next_id(), "subject": subject, "description": description,
- "status": "pending", "owner": None, "blockedBy": []}
+ task = {
+ "id": self._next_id(),
+ "subject": subject,
+ "description": description,
+ "status": "pending",
+ "owner": None,
+ "blockedBy": [],
+ }
self._save(task)
return json.dumps(task, indent=2)
def get(self, tid: int) -> str:
return json.dumps(self._load(tid), indent=2)
- def update(self, tid: int, status: str = None,
- add_blocked_by: list = None, remove_blocked_by: list = None) -> str:
+ def update(
+ self,
+ tid: int,
+ status: str = None,
+ add_blocked_by: list = None,
+ remove_blocked_by: list = None,
+ ) -> str:
task = self._load(tid)
if status:
task["status"] = status
@@ -301,16 +393,23 @@ def update(self, tid: int, status: str = None,
if add_blocked_by:
task["blockedBy"] = list(set(task["blockedBy"] + add_blocked_by))
if remove_blocked_by:
- task["blockedBy"] = [x for x in task["blockedBy"] if x not in remove_blocked_by]
+ task["blockedBy"] = [
+ x for x in task["blockedBy"] if x not in remove_blocked_by
+ ]
self._save(task)
return json.dumps(task, indent=2)
def list_all(self) -> str:
- tasks = [json.loads(f.read_text()) for f in sorted(TASKS_DIR.glob("task_*.json"))]
- if not tasks: return "No tasks."
+ tasks = [
+ json.loads(f.read_text()) for f in sorted(TASKS_DIR.glob("task_*.json"))
+ ]
+ if not tasks:
+ return "No tasks."
lines = []
for t in tasks:
- m = {"pending": "[ ]", "in_progress": "[>]", "completed": "[x]"}.get(t["status"], "[?]")
+ m = {"pending": "[ ]", "in_progress": "[>]", "completed": "[x]"}.get(
+ t["status"], "[?]"
+ )
owner = f" @{t['owner']}" if t.get("owner") else ""
blocked = f" (blocked by: {t['blockedBy']})" if t.get("blockedBy") else ""
lines.append(f"{m} #{t['id']}: {t['subject']}{owner}{blocked}")
@@ -333,25 +432,50 @@ def __init__(self):
def run(self, command: str, timeout: int = 120) -> str:
tid = str(uuid.uuid4())[:8]
self.tasks[tid] = {"status": "running", "command": command, "result": None}
- threading.Thread(target=self._exec, args=(tid, command, timeout), daemon=True).start()
+ threading.Thread(
+ target=self._exec, args=(tid, command, timeout), daemon=True
+ ).start()
return f"Background task {tid} started: {command[:80]}"
def _exec(self, tid: str, command: str, timeout: int):
try:
- r = subprocess.run(command, shell=True, cwd=WORKDIR,
- capture_output=True, text=True, timeout=timeout)
+ r = subprocess.run(
+ command,
+ shell=True,
+ cwd=WORKDIR,
+ capture_output=True,
+ text=True,
+ timeout=timeout,
+ )
output = (r.stdout + r.stderr).strip()[:50000]
- self.tasks[tid].update({"status": "completed", "result": output or "(no output)"})
+ self.tasks[tid].update(
+ {"status": "completed", "result": output or "(no output)"}
+ )
except Exception as e:
self.tasks[tid].update({"status": "error", "result": str(e)})
- self.notifications.put({"task_id": tid, "status": self.tasks[tid]["status"],
- "result": self.tasks[tid]["result"][:500]})
+ self.notifications.put(
+ {
+ "task_id": tid,
+ "status": self.tasks[tid]["status"],
+ "result": self.tasks[tid]["result"][:500],
+ }
+ )
def check(self, tid: str = None) -> str:
if tid:
t = self.tasks.get(tid)
- return f"[{t['status']}] {t.get('result') or '(running)'}" if t else f"Unknown: {tid}"
- return "\n".join(f"{k}: [{v['status']}] {v['command'][:60]}" for k, v in self.tasks.items()) or "No bg tasks."
+ return (
+ f"[{t['status']}] {t.get('result') or '(running)'}"
+ if t
+ else f"Unknown: {tid}"
+ )
+ return (
+ "\n".join(
+ f"{k}: [{v['status']}] {v['command'][:60]}"
+ for k, v in self.tasks.items()
+ )
+ or "No bg tasks."
+ )
def drain(self) -> list:
notifs = []
@@ -359,24 +483,46 @@ def drain(self) -> list:
notifs.append(self.notifications.get_nowait())
return notifs
+ def has_running_tasks(self) -> bool:
+ return any(task["status"] == "running" for task in self.tasks.values())
+
+ def wait_for_notifications(self) -> list:
+ first = self.notifications.get()
+ notifs = [first]
+ while not self.notifications.empty():
+ notifs.append(self.notifications.get_nowait())
+ return notifs
+
# === SECTION: messaging (s09) ===
class MessageBus:
def __init__(self):
INBOX_DIR.mkdir(parents=True, exist_ok=True)
- def send(self, sender: str, to: str, content: str,
- msg_type: str = "message", extra: dict = None) -> str:
- msg = {"type": msg_type, "from": sender, "content": content,
- "timestamp": time.time()}
- if extra: msg.update(extra)
+ def send(
+ self,
+ sender: str,
+ to: str,
+ content: str,
+ msg_type: str = "message",
+ extra: dict = None,
+ ) -> str:
+ msg = {
+ "type": msg_type,
+ "from": sender,
+ "content": content,
+ "timestamp": time.time(),
+ }
+ if extra:
+ msg.update(extra)
with open(INBOX_DIR / f"{to}.jsonl", "a") as f:
f.write(json.dumps(msg) + "\n")
return f"Sent {msg_type} to {to}"
def read_inbox(self, name: str) -> list:
path = INBOX_DIR / f"{name}.jsonl"
- if not path.exists(): return []
+ if not path.exists():
+ return []
msgs = [json.loads(l) for l in path.read_text().strip().splitlines() if l]
path.write_text("")
return msgs
@@ -415,7 +561,8 @@ def _save(self):
def _find(self, name: str) -> dict:
for m in self.config["members"]:
- if m["name"] == name: return m
+ if m["name"] == name:
+ return m
return None
def spawn(self, name: str, role: str, prompt: str) -> str:
@@ -429,7 +576,9 @@ def spawn(self, name: str, role: str, prompt: str) -> str:
member = {"name": name, "role": role, "status": "working"}
self.config["members"].append(member)
self._save()
- threading.Thread(target=self._loop, args=(name, role, prompt), daemon=True).start()
+ threading.Thread(
+ target=self._loop, args=(name, role, prompt), daemon=True
+ ).start()
return f"Spawned '{name}' (role: {role})"
def _set_status(self, name: str, status: str):
@@ -440,17 +589,81 @@ def _set_status(self, name: str, status: str):
def _loop(self, name: str, role: str, prompt: str):
team_name = self.config["team_name"]
- sys_prompt = (f"You are '{name}', role: {role}, team: {team_name}, at {WORKDIR}. "
- f"Use idle when done with current work. You may auto-claim tasks.")
+ sys_prompt = (
+ f"You are '{name}', role: {role}, team: {team_name}, at {WORKDIR}. "
+ f"Use idle when done with current work. You may auto-claim tasks."
+ )
messages = [{"role": "user", "content": prompt}]
tools = [
- {"name": "bash", "description": "Run command.", "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
- {"name": "read_file", "description": "Read file.", "input_schema": {"type": "object", "properties": {"path": {"type": "string"}}, "required": ["path"]}},
- {"name": "write_file", "description": "Write file.", "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}},
- {"name": "edit_file", "description": "Edit file.", "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}},
- {"name": "send_message", "description": "Send message.", "input_schema": {"type": "object", "properties": {"to": {"type": "string"}, "content": {"type": "string"}}, "required": ["to", "content"]}},
- {"name": "idle", "description": "Signal no more work.", "input_schema": {"type": "object", "properties": {}}},
- {"name": "claim_task", "description": "Claim task by ID.", "input_schema": {"type": "object", "properties": {"task_id": {"type": "integer"}}, "required": ["task_id"]}},
+ {
+ "name": "bash",
+ "description": "Run command.",
+ "input_schema": {
+ "type": "object",
+ "properties": {"command": {"type": "string"}},
+ "required": ["command"],
+ },
+ },
+ {
+ "name": "read_file",
+ "description": "Read file.",
+ "input_schema": {
+ "type": "object",
+ "properties": {"path": {"type": "string"}},
+ "required": ["path"],
+ },
+ },
+ {
+ "name": "write_file",
+ "description": "Write file.",
+ "input_schema": {
+ "type": "object",
+ "properties": {
+ "path": {"type": "string"},
+ "content": {"type": "string"},
+ },
+ "required": ["path", "content"],
+ },
+ },
+ {
+ "name": "edit_file",
+ "description": "Edit file.",
+ "input_schema": {
+ "type": "object",
+ "properties": {
+ "path": {"type": "string"},
+ "old_text": {"type": "string"},
+ "new_text": {"type": "string"},
+ },
+ "required": ["path", "old_text", "new_text"],
+ },
+ },
+ {
+ "name": "send_message",
+ "description": "Send message.",
+ "input_schema": {
+ "type": "object",
+ "properties": {
+ "to": {"type": "string"},
+ "content": {"type": "string"},
+ },
+ "required": ["to", "content"],
+ },
+ },
+ {
+ "name": "idle",
+ "description": "Signal no more work.",
+ "input_schema": {"type": "object", "properties": {}},
+ },
+ {
+ "name": "claim_task",
+ "description": "Claim task by ID.",
+ "input_schema": {
+ "type": "object",
+ "properties": {"task_id": {"type": "integer"}},
+ "required": ["task_id"],
+ },
+ },
]
while True:
# -- WORK PHASE --
@@ -463,8 +676,12 @@ def _loop(self, name: str, role: str, prompt: str):
messages.append({"role": "user", "content": json.dumps(msg)})
try:
response = client.messages.create(
- model=MODEL, system=sys_prompt, messages=messages,
- tools=tools, max_tokens=8000)
+ model=MODEL,
+ system=sys_prompt,
+ messages=messages,
+ tools=tools,
+ max_tokens=8000,
+ )
except Exception:
self._set_status(name, "shutdown")
return
@@ -481,15 +698,31 @@ def _loop(self, name: str, role: str, prompt: str):
elif block.name == "claim_task":
output = self.task_mgr.claim(block.input["task_id"], name)
elif block.name == "send_message":
- output = self.bus.send(name, block.input["to"], block.input["content"])
+ output = self.bus.send(
+ name, block.input["to"], block.input["content"]
+ )
else:
- dispatch = {"bash": lambda **kw: run_bash(kw["command"]),
- "read_file": lambda **kw: run_read(kw["path"]),
- "write_file": lambda **kw: run_write(kw["path"], kw["content"]),
- "edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"])}
- output = dispatch.get(block.name, lambda **kw: "Unknown")(**block.input)
+ dispatch = {
+ "bash": lambda **kw: run_bash(kw["command"]),
+ "read_file": lambda **kw: run_read(kw["path"]),
+ "write_file": lambda **kw: run_write(
+ kw["path"], kw["content"]
+ ),
+ "edit_file": lambda **kw: run_edit(
+ kw["path"], kw["old_text"], kw["new_text"]
+ ),
+ }
+ output = dispatch.get(block.name, lambda **kw: "Unknown")(
+ **block.input
+ )
print(f" [{name}] {block.name}: {str(output)[:120]}")
- results.append({"type": "tool_result", "tool_use_id": block.id, "content": str(output)})
+ results.append(
+ {
+ "type": "tool_result",
+ "tool_use_id": block.id,
+ "content": str(output),
+ }
+ )
messages.append({"role": "user", "content": results})
if idle_requested:
break
@@ -510,19 +743,43 @@ def _loop(self, name: str, role: str, prompt: str):
unclaimed = []
for f in sorted(TASKS_DIR.glob("task_*.json")):
t = json.loads(f.read_text())
- if t.get("status") == "pending" and not t.get("owner") and not t.get("blockedBy"):
+ if (
+ t.get("status") == "pending"
+ and not t.get("owner")
+ and not t.get("blockedBy")
+ ):
unclaimed.append(t)
if unclaimed:
task = unclaimed[0]
self.task_mgr.claim(task["id"], name)
# Identity re-injection for compressed contexts
if len(messages) <= 3:
- messages.insert(0, {"role": "user", "content":
- f"You are '{name}', role: {role}, team: {team_name}."})
- messages.insert(1, {"role": "assistant", "content": f"I am {name}. Continuing."})
- messages.append({"role": "user", "content":
- f"Task #{task['id']}: {task['subject']}\n{task.get('description', '')}"})
- messages.append({"role": "assistant", "content": f"Claimed task #{task['id']}. Working on it."})
+ messages.insert(
+ 0,
+ {
+ "role": "user",
+ "content": f"You are '{name}', role: {role}, team: {team_name}.",
+ },
+ )
+ messages.insert(
+ 1,
+ {
+ "role": "assistant",
+ "content": f"I am {name}. Continuing.",
+ },
+ )
+ messages.append(
+ {
+ "role": "user",
+ "content": f"Task #{task['id']}: {task['subject']}\n{task.get('description', '')}",
+ }
+ )
+ messages.append(
+ {
+ "role": "assistant",
+ "content": f"Claimed task #{task['id']}. Working on it.",
+ }
+ )
resume = True
break
if not resume:
@@ -531,7 +788,8 @@ def _loop(self, name: str, role: str, prompt: str):
self._set_status(name, "working")
def list_all(self) -> str:
- if not self.config["members"]: return "No teammates."
+ if not self.config["members"]:
+ return "No teammates."
lines = [f"Team: {self.config['team_name']}"]
for m in self.config["members"]:
lines.append(f" {m['name']} ({m['role']}): {m['status']}")
@@ -560,97 +818,328 @@ def member_names(self) -> list:
def handle_shutdown_request(teammate: str) -> str:
req_id = str(uuid.uuid4())[:8]
shutdown_requests[req_id] = {"target": teammate, "status": "pending"}
- BUS.send("lead", teammate, "Please shut down.", "shutdown_request", {"request_id": req_id})
+ BUS.send(
+ "lead",
+ teammate,
+ "Please shut down.",
+ "shutdown_request",
+ {"request_id": req_id},
+ )
return f"Shutdown request {req_id} sent to '{teammate}'"
+
# === SECTION: plan_approval (s10) ===
def handle_plan_review(request_id: str, approve: bool, feedback: str = "") -> str:
req = plan_requests.get(request_id)
- if not req: return f"Error: Unknown plan request_id '{request_id}'"
+ if not req:
+ return f"Error: Unknown plan request_id '{request_id}'"
req["status"] = "approved" if approve else "rejected"
- BUS.send("lead", req["from"], feedback, "plan_approval_response",
- {"request_id": request_id, "approve": approve, "feedback": feedback})
+ BUS.send(
+ "lead",
+ req["from"],
+ feedback,
+ "plan_approval_response",
+ {"request_id": request_id, "approve": approve, "feedback": feedback},
+ )
return f"Plan {req['status']} for '{req['from']}'"
# === SECTION: tool_dispatch (s02) ===
TOOL_HANDLERS = {
- "bash": lambda **kw: run_bash(kw["command"]),
- "read_file": lambda **kw: run_read(kw["path"], kw.get("limit")),
- "write_file": lambda **kw: run_write(kw["path"], kw["content"]),
- "edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]),
- "TodoWrite": lambda **kw: TODO.update(kw["items"]),
- "task": lambda **kw: run_subagent(kw["prompt"], kw.get("agent_type", "Explore")),
- "load_skill": lambda **kw: SKILLS.load(kw["name"]),
- "compress": lambda **kw: "Compressing...",
- "background_run": lambda **kw: BG.run(kw["command"], kw.get("timeout", 120)),
+ "bash": lambda **kw: run_bash(kw["command"]),
+ "read_file": lambda **kw: run_read(kw["path"], kw.get("limit")),
+ "write_file": lambda **kw: run_write(kw["path"], kw["content"]),
+ "edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]),
+ "TodoWrite": lambda **kw: TODO.update(kw["items"]),
+ "task": lambda **kw: run_subagent(kw["prompt"], kw.get("agent_type", "Explore")),
+ "load_skill": lambda **kw: SKILLS.load(kw["name"]),
+ "compress": lambda **kw: "Compressing...",
+ "background_run": lambda **kw: BG.run(kw["command"], kw.get("timeout", 120)),
"check_background": lambda **kw: BG.check(kw.get("task_id")),
- "task_create": lambda **kw: TASK_MGR.create(kw["subject"], kw.get("description", "")),
- "task_get": lambda **kw: TASK_MGR.get(kw["task_id"]),
- "task_update": lambda **kw: TASK_MGR.update(kw["task_id"], kw.get("status"), kw.get("add_blocked_by"), kw.get("remove_blocked_by")),
- "task_list": lambda **kw: TASK_MGR.list_all(),
- "spawn_teammate": lambda **kw: TEAM.spawn(kw["name"], kw["role"], kw["prompt"]),
- "list_teammates": lambda **kw: TEAM.list_all(),
- "send_message": lambda **kw: BUS.send("lead", kw["to"], kw["content"], kw.get("msg_type", "message")),
- "read_inbox": lambda **kw: json.dumps(BUS.read_inbox("lead"), indent=2),
- "broadcast": lambda **kw: BUS.broadcast("lead", kw["content"], TEAM.member_names()),
+ "task_create": lambda **kw: TASK_MGR.create(
+ kw["subject"], kw.get("description", "")
+ ),
+ "task_get": lambda **kw: TASK_MGR.get(kw["task_id"]),
+ "task_update": lambda **kw: TASK_MGR.update(
+ kw["task_id"],
+ kw.get("status"),
+ kw.get("add_blocked_by"),
+ kw.get("remove_blocked_by"),
+ ),
+ "task_list": lambda **kw: TASK_MGR.list_all(),
+ "spawn_teammate": lambda **kw: TEAM.spawn(kw["name"], kw["role"], kw["prompt"]),
+ "list_teammates": lambda **kw: TEAM.list_all(),
+ "send_message": lambda **kw: BUS.send(
+ "lead", kw["to"], kw["content"], kw.get("msg_type", "message")
+ ),
+ "read_inbox": lambda **kw: json.dumps(BUS.read_inbox("lead"), indent=2),
+ "broadcast": lambda **kw: BUS.broadcast("lead", kw["content"], TEAM.member_names()),
"shutdown_request": lambda **kw: handle_shutdown_request(kw["teammate"]),
- "plan_approval": lambda **kw: handle_plan_review(kw["request_id"], kw["approve"], kw.get("feedback", "")),
- "idle": lambda **kw: "Lead does not idle.",
- "claim_task": lambda **kw: TASK_MGR.claim(kw["task_id"], "lead"),
+ "plan_approval": lambda **kw: handle_plan_review(
+ kw["request_id"], kw["approve"], kw.get("feedback", "")
+ ),
+ "idle": lambda **kw: "Lead does not idle.",
+ "claim_task": lambda **kw: TASK_MGR.claim(kw["task_id"], "lead"),
}
TOOLS = [
- {"name": "bash", "description": "Run a shell command.",
- "input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
- {"name": "read_file", "description": "Read file contents.",
- "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "limit": {"type": "integer"}}, "required": ["path"]}},
- {"name": "write_file", "description": "Write content to file.",
- "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}},
- {"name": "edit_file", "description": "Replace exact text in file.",
- "input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}},
- {"name": "TodoWrite", "description": "Update task tracking list.",
- "input_schema": {"type": "object", "properties": {"items": {"type": "array", "items": {"type": "object", "properties": {"content": {"type": "string"}, "status": {"type": "string", "enum": ["pending", "in_progress", "completed"]}, "activeForm": {"type": "string"}}, "required": ["content", "status", "activeForm"]}}}, "required": ["items"]}},
- {"name": "task", "description": "Spawn a subagent for isolated exploration or work.",
- "input_schema": {"type": "object", "properties": {"prompt": {"type": "string"}, "agent_type": {"type": "string", "enum": ["Explore", "general-purpose"]}}, "required": ["prompt"]}},
- {"name": "load_skill", "description": "Load specialized knowledge by name.",
- "input_schema": {"type": "object", "properties": {"name": {"type": "string"}}, "required": ["name"]}},
- {"name": "compress", "description": "Manually compress conversation context.",
- "input_schema": {"type": "object", "properties": {}}},
- {"name": "background_run", "description": "Run command in background thread.",
- "input_schema": {"type": "object", "properties": {"command": {"type": "string"}, "timeout": {"type": "integer"}}, "required": ["command"]}},
- {"name": "check_background", "description": "Check background task status.",
- "input_schema": {"type": "object", "properties": {"task_id": {"type": "string"}}}},
- {"name": "task_create", "description": "Create a persistent file task.",
- "input_schema": {"type": "object", "properties": {"subject": {"type": "string"}, "description": {"type": "string"}}, "required": ["subject"]}},
- {"name": "task_get", "description": "Get task details by ID.",
- "input_schema": {"type": "object", "properties": {"task_id": {"type": "integer"}}, "required": ["task_id"]}},
- {"name": "task_update", "description": "Update task status or dependencies.",
- "input_schema": {"type": "object", "properties": {"task_id": {"type": "integer"}, "status": {"type": "string", "enum": ["pending", "in_progress", "completed", "deleted"]}, "add_blocked_by": {"type": "array", "items": {"type": "integer"}}, "remove_blocked_by": {"type": "array", "items": {"type": "integer"}}}, "required": ["task_id"]}},
- {"name": "task_list", "description": "List all tasks.",
- "input_schema": {"type": "object", "properties": {}}},
- {"name": "spawn_teammate", "description": "Spawn a persistent autonomous teammate.",
- "input_schema": {"type": "object", "properties": {"name": {"type": "string"}, "role": {"type": "string"}, "prompt": {"type": "string"}}, "required": ["name", "role", "prompt"]}},
- {"name": "list_teammates", "description": "List all teammates.",
- "input_schema": {"type": "object", "properties": {}}},
- {"name": "send_message", "description": "Send a message to a teammate.",
- "input_schema": {"type": "object", "properties": {"to": {"type": "string"}, "content": {"type": "string"}, "msg_type": {"type": "string", "enum": list(VALID_MSG_TYPES)}}, "required": ["to", "content"]}},
- {"name": "read_inbox", "description": "Read and drain the lead's inbox.",
- "input_schema": {"type": "object", "properties": {}}},
- {"name": "broadcast", "description": "Send message to all teammates.",
- "input_schema": {"type": "object", "properties": {"content": {"type": "string"}}, "required": ["content"]}},
- {"name": "shutdown_request", "description": "Request a teammate to shut down.",
- "input_schema": {"type": "object", "properties": {"teammate": {"type": "string"}}, "required": ["teammate"]}},
- {"name": "plan_approval", "description": "Approve or reject a teammate's plan.",
- "input_schema": {"type": "object", "properties": {"request_id": {"type": "string"}, "approve": {"type": "boolean"}, "feedback": {"type": "string"}}, "required": ["request_id", "approve"]}},
- {"name": "idle", "description": "Enter idle state.",
- "input_schema": {"type": "object", "properties": {}}},
- {"name": "claim_task", "description": "Claim a task from the board.",
- "input_schema": {"type": "object", "properties": {"task_id": {"type": "integer"}}, "required": ["task_id"]}},
+ {
+ "name": "bash",
+ "description": "Run a shell command.",
+ "input_schema": {
+ "type": "object",
+ "properties": {"command": {"type": "string"}},
+ "required": ["command"],
+ },
+ },
+ {
+ "name": "read_file",
+ "description": "Read file contents.",
+ "input_schema": {
+ "type": "object",
+ "properties": {"path": {"type": "string"}, "limit": {"type": "integer"}},
+ "required": ["path"],
+ },
+ },
+ {
+ "name": "write_file",
+ "description": "Write content to file.",
+ "input_schema": {
+ "type": "object",
+ "properties": {"path": {"type": "string"}, "content": {"type": "string"}},
+ "required": ["path", "content"],
+ },
+ },
+ {
+ "name": "edit_file",
+ "description": "Replace exact text in file.",
+ "input_schema": {
+ "type": "object",
+ "properties": {
+ "path": {"type": "string"},
+ "old_text": {"type": "string"},
+ "new_text": {"type": "string"},
+ },
+ "required": ["path", "old_text", "new_text"],
+ },
+ },
+ {
+ "name": "TodoWrite",
+ "description": "Update task tracking list.",
+ "input_schema": {
+ "type": "object",
+ "properties": {
+ "items": {
+ "type": "array",
+ "items": {
+ "type": "object",
+ "properties": {
+ "content": {"type": "string"},
+ "status": {
+ "type": "string",
+ "enum": ["pending", "in_progress", "completed"],
+ },
+ "activeForm": {"type": "string"},
+ },
+ "required": ["content", "status", "activeForm"],
+ },
+ }
+ },
+ "required": ["items"],
+ },
+ },
+ {
+ "name": "task",
+ "description": "Spawn a subagent for isolated exploration or work.",
+ "input_schema": {
+ "type": "object",
+ "properties": {
+ "prompt": {"type": "string"},
+ "agent_type": {
+ "type": "string",
+ "enum": ["Explore", "general-purpose"],
+ },
+ },
+ "required": ["prompt"],
+ },
+ },
+ {
+ "name": "load_skill",
+ "description": "Load specialized knowledge by name.",
+ "input_schema": {
+ "type": "object",
+ "properties": {"name": {"type": "string"}},
+ "required": ["name"],
+ },
+ },
+ {
+ "name": "compress",
+ "description": "Manually compress conversation context.",
+ "input_schema": {"type": "object", "properties": {}},
+ },
+ {
+ "name": "background_run",
+ "description": "Run command in background thread.",
+ "input_schema": {
+ "type": "object",
+ "properties": {
+ "command": {"type": "string"},
+ "timeout": {"type": "integer"},
+ },
+ "required": ["command"],
+ },
+ },
+ {
+ "name": "check_background",
+ "description": "Check background task status.",
+ "input_schema": {
+ "type": "object",
+ "properties": {"task_id": {"type": "string"}},
+ },
+ },
+ {
+ "name": "task_create",
+ "description": "Create a persistent file task.",
+ "input_schema": {
+ "type": "object",
+ "properties": {
+ "subject": {"type": "string"},
+ "description": {"type": "string"},
+ },
+ "required": ["subject"],
+ },
+ },
+ {
+ "name": "task_get",
+ "description": "Get task details by ID.",
+ "input_schema": {
+ "type": "object",
+ "properties": {"task_id": {"type": "integer"}},
+ "required": ["task_id"],
+ },
+ },
+ {
+ "name": "task_update",
+ "description": "Update task status or dependencies.",
+ "input_schema": {
+ "type": "object",
+ "properties": {
+ "task_id": {"type": "integer"},
+ "status": {
+ "type": "string",
+ "enum": ["pending", "in_progress", "completed", "deleted"],
+ },
+ "add_blocked_by": {"type": "array", "items": {"type": "integer"}},
+ "remove_blocked_by": {"type": "array", "items": {"type": "integer"}},
+ },
+ "required": ["task_id"],
+ },
+ },
+ {
+ "name": "task_list",
+ "description": "List all tasks.",
+ "input_schema": {"type": "object", "properties": {}},
+ },
+ {
+ "name": "spawn_teammate",
+ "description": "Spawn a persistent autonomous teammate.",
+ "input_schema": {
+ "type": "object",
+ "properties": {
+ "name": {"type": "string"},
+ "role": {"type": "string"},
+ "prompt": {"type": "string"},
+ },
+ "required": ["name", "role", "prompt"],
+ },
+ },
+ {
+ "name": "list_teammates",
+ "description": "List all teammates.",
+ "input_schema": {"type": "object", "properties": {}},
+ },
+ {
+ "name": "send_message",
+ "description": "Send a message to a teammate.",
+ "input_schema": {
+ "type": "object",
+ "properties": {
+ "to": {"type": "string"},
+ "content": {"type": "string"},
+ "msg_type": {"type": "string", "enum": list(VALID_MSG_TYPES)},
+ },
+ "required": ["to", "content"],
+ },
+ },
+ {
+ "name": "read_inbox",
+ "description": "Read and drain the lead's inbox.",
+ "input_schema": {"type": "object", "properties": {}},
+ },
+ {
+ "name": "broadcast",
+ "description": "Send message to all teammates.",
+ "input_schema": {
+ "type": "object",
+ "properties": {"content": {"type": "string"}},
+ "required": ["content"],
+ },
+ },
+ {
+ "name": "shutdown_request",
+ "description": "Request a teammate to shut down.",
+ "input_schema": {
+ "type": "object",
+ "properties": {"teammate": {"type": "string"}},
+ "required": ["teammate"],
+ },
+ },
+ {
+ "name": "plan_approval",
+ "description": "Approve or reject a teammate's plan.",
+ "input_schema": {
+ "type": "object",
+ "properties": {
+ "request_id": {"type": "string"},
+ "approve": {"type": "boolean"},
+ "feedback": {"type": "string"},
+ },
+ "required": ["request_id", "approve"],
+ },
+ },
+ {
+ "name": "idle",
+ "description": "Enter idle state.",
+ "input_schema": {"type": "object", "properties": {}},
+ },
+ {
+ "name": "claim_task",
+ "description": "Claim a task from the board.",
+ "input_schema": {
+ "type": "object",
+ "properties": {"task_id": {"type": "integer"}},
+ "required": ["task_id"],
+ },
+ },
]
# === SECTION: agent_loop ===
+def inject_background_results(messages: list, notifs: list) -> bool:
+ if notifs:
+ txt = "\n".join(
+ f"[bg:{n['task_id']}] {n['status']}: {n['result']}" for n in notifs
+ )
+ messages.append(
+ {
+ "role": "user",
+ "content": f"\n{txt}\n",
+ }
+ )
+ return True
+ return False
+
+
def agent_loop(messages: list):
rounds_without_todo = 0
while True:
@@ -660,21 +1149,30 @@ def agent_loop(messages: list):
print("[auto-compact triggered]")
messages[:] = auto_compact(messages)
# s08: drain background notifications
- notifs = BG.drain()
- if notifs:
- txt = "\n".join(f"[bg:{n['task_id']}] {n['status']}: {n['result']}" for n in notifs)
- messages.append({"role": "user", "content": f"\n{txt}\n"})
+ inject_background_results(messages, BG.drain())
# s10: check lead inbox
inbox = BUS.read_inbox("lead")
if inbox:
- messages.append({"role": "user", "content": f"{json.dumps(inbox, indent=2)}"})
+ messages.append(
+ {
+ "role": "user",
+ "content": f"{json.dumps(inbox, indent=2)}",
+ }
+ )
# LLM call
response = client.messages.create(
- model=MODEL, system=SYSTEM, messages=messages,
- tools=TOOLS, max_tokens=8000,
+ model=MODEL,
+ system=SYSTEM,
+ messages=messages,
+ tools=TOOLS,
+ max_tokens=8000,
)
messages.append({"role": "assistant", "content": response.content})
if response.stop_reason != "tool_use":
+ if BG.has_running_tasks() and inject_background_results(
+ messages, BG.wait_for_notifications()
+ ):
+ continue
return
# Tool execution
results = []
@@ -686,18 +1184,30 @@ def agent_loop(messages: list):
manual_compress = True
handler = TOOL_HANDLERS.get(block.name)
try:
- output = handler(**block.input) if handler else f"Unknown tool: {block.name}"
+ output = (
+ handler(**block.input)
+ if handler
+ else f"Unknown tool: {block.name}"
+ )
except Exception as e:
output = f"Error: {e}"
print(f"> {block.name}:")
print(str(output)[:200])
- results.append({"type": "tool_result", "tool_use_id": block.id, "content": str(output)})
+ results.append(
+ {
+ "type": "tool_result",
+ "tool_use_id": block.id,
+ "content": str(output),
+ }
+ )
if block.name == "TodoWrite":
used_todo = True
# s03: nag reminder (only when todo workflow is active)
rounds_without_todo = 0 if used_todo else rounds_without_todo + 1
if TODO.has_open_items() and rounds_without_todo >= 3:
- results.append({"type": "text", "text": "Update your todos."})
+ results.append(
+ {"type": "text", "text": "Update your todos."}
+ )
messages.append({"role": "user", "content": results})
# s06: manual compress
if manual_compress:
diff --git a/tests/test_background_notifications.py b/tests/test_background_notifications.py
new file mode 100644
index 000000000..1a763f908
--- /dev/null
+++ b/tests/test_background_notifications.py
@@ -0,0 +1,135 @@
+import os
+import sys
+import types
+import unittest
+from pathlib import Path
+from types import SimpleNamespace
+
+
+REPO_ROOT = Path(__file__).resolve().parents[1]
+if str(REPO_ROOT) not in sys.path:
+ sys.path.insert(0, str(REPO_ROOT))
+
+os.environ.setdefault("MODEL_ID", "test-model")
+
+fake_anthropic = types.ModuleType("anthropic")
+
+
+class FakeAnthropic:
+ def __init__(self, *args, **kwargs):
+ self.messages = SimpleNamespace(create=None)
+
+
+setattr(fake_anthropic, "Anthropic", FakeAnthropic)
+sys.modules.setdefault("anthropic", fake_anthropic)
+
+fake_dotenv = types.ModuleType("dotenv")
+setattr(fake_dotenv, "load_dotenv", lambda *args, **kwargs: None)
+sys.modules.setdefault("dotenv", fake_dotenv)
+
+import agents.s08_background_tasks as s08_background_tasks
+import agents.s_full as s_full
+
+
+class FakeMessagesAPI:
+ def __init__(self, responses):
+ self._responses = iter(responses)
+ self.call_count = 0
+
+ def create(self, **kwargs):
+ self.call_count += 1
+ return next(self._responses)
+
+
+class FakeBackgroundManager:
+ def __init__(self):
+ self._running = True
+ self.wait_called = False
+
+ def drain_notifications(self):
+ return []
+
+ def drain(self):
+ return []
+
+ def has_running_tasks(self):
+ return self._running
+
+ def wait_for_notifications(self):
+ self.wait_called = True
+ self._running = False
+ return [{"task_id": "bg-1", "status": "completed", "result": "done"}]
+
+
+class BackgroundNotificationTests(unittest.TestCase):
+ def test_s08_agent_loop_waits_for_background_results_after_end_turn(self):
+ messages = [{"role": "user", "content": "Run tests in the background"}]
+ fake_bg = FakeBackgroundManager()
+ fake_api = FakeMessagesAPI(
+ [
+ SimpleNamespace(
+ stop_reason="end_turn", content="Started background work."
+ ),
+ SimpleNamespace(
+ stop_reason="end_turn", content="Background work completed."
+ ),
+ ]
+ )
+ original_bg = s08_background_tasks.BG
+ original_client = s08_background_tasks.client
+ try:
+ s08_background_tasks.BG = fake_bg
+ s08_background_tasks.client = SimpleNamespace(messages=fake_api)
+ s08_background_tasks.agent_loop(messages)
+ finally:
+ s08_background_tasks.BG = original_bg
+ s08_background_tasks.client = original_client
+
+ self.assertTrue(fake_bg.wait_called)
+ self.assertEqual(fake_api.call_count, 2)
+ self.assertTrue(
+ any(
+ message["role"] == "user"
+ and isinstance(message["content"], str)
+ and "" in message["content"]
+ for message in messages
+ )
+ )
+
+ def test_s_full_agent_loop_waits_for_background_results_after_end_turn(self):
+ messages = [{"role": "user", "content": "Run tests in the background"}]
+ fake_bg = FakeBackgroundManager()
+ fake_api = FakeMessagesAPI(
+ [
+ SimpleNamespace(
+ stop_reason="end_turn", content="Started background work."
+ ),
+ SimpleNamespace(
+ stop_reason="end_turn", content="Background work completed."
+ ),
+ ]
+ )
+ original_bg = s_full.BG
+ original_client = s_full.client
+ try:
+ s_full.BG = fake_bg
+ s_full.client = SimpleNamespace(messages=fake_api)
+ s_full.agent_loop(messages)
+ finally:
+ s_full.BG = original_bg
+ s_full.client = original_client
+
+ self.assertTrue(fake_bg.wait_called)
+ self.assertEqual(fake_api.call_count, 2)
+ self.assertTrue(
+ any(
+ message["role"] == "user"
+ and isinstance(message["content"], str)
+ and "" in message["content"]
+ for message in messages
+ )
+ )
+
+
+if __name__ == "__main__":
+ unittest.main()