Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 147 additions & 40 deletions agents/s08_background_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def __init__(self):
self.tasks = {} # task_id -> {status, result, command}
self._notification_queue = [] # completed task results
self._lock = threading.Lock()
self._condition = threading.Condition(self._lock)

def run(self, command: str) -> str:
"""Start a background thread, return task_id immediately."""
Expand All @@ -67,8 +68,12 @@ def _execute(self, task_id: str, command: str):
"""Thread target: run subprocess, capture output, push to queue."""
try:
r = subprocess.run(
command, shell=True, cwd=WORKDIR,
capture_output=True, text=True, timeout=300
command,
shell=True,
cwd=WORKDIR,
capture_output=True,
text=True,
timeout=300,
)
output = (r.stdout + r.stderr).strip()[:50000]
status = "completed"
Expand All @@ -80,29 +85,49 @@ def _execute(self, task_id: str, command: str):
status = "error"
self.tasks[task_id]["status"] = status
self.tasks[task_id]["result"] = output or "(no output)"
with self._lock:
self._notification_queue.append({
"task_id": task_id,
"status": status,
"command": command[:80],
"result": (output or "(no output)")[:500],
})
with self._condition:
self._notification_queue.append(
{
"task_id": task_id,
"status": status,
"command": command[:80],
"result": (output or "(no output)")[:500],
}
)
self._condition.notify_all()

def check(self, task_id: str = None) -> str:
"""Check status of one task or list all."""
if task_id:
t = self.tasks.get(task_id)
if not t:
return f"Error: Unknown task {task_id}"
return f"[{t['status']}] {t['command'][:60]}\n{t.get('result') or '(running)'}"
return (
f"[{t['status']}] {t['command'][:60]}\n{t.get('result') or '(running)'}"
)
lines = []
for tid, t in self.tasks.items():
lines.append(f"{tid}: [{t['status']}] {t['command'][:60]}")
return "\n".join(lines) if lines else "No background tasks."

def drain_notifications(self) -> list:
"""Return and clear all pending completion notifications."""
with self._lock:
with self._condition:
notifs = list(self._notification_queue)
self._notification_queue.clear()
return notifs

def _has_running_tasks_locked(self) -> bool:
return any(task["status"] == "running" for task in self.tasks.values())

def has_running_tasks(self) -> bool:
with self._condition:
return self._has_running_tasks_locked()

def wait_for_notifications(self) -> list:
with self._condition:
while not self._notification_queue and self._has_running_tasks_locked():
self._condition.wait()
notifs = list(self._notification_queue)
self._notification_queue.clear()
return notifs
Expand All @@ -111,25 +136,48 @@ def drain_notifications(self) -> list:
BG = BackgroundManager()


def inject_background_results(messages: list, notifs: list) -> bool:
if notifs and messages:
notif_text = "\n".join(
f"[bg:{n['task_id']}] {n['status']}: {n['result']}" for n in notifs
)
messages.append(
{
"role": "user",
"content": f"<background-results>\n{notif_text}\n</background-results>",
}
)
return True
return False


# -- Tool implementations --
def safe_path(p: str) -> Path:
path = (WORKDIR / p).resolve()
if not path.is_relative_to(WORKDIR):
raise ValueError(f"Path escapes workspace: {p}")
return path


def run_bash(command: str) -> str:
dangerous = ["rm -rf /", "sudo", "shutdown", "reboot", "> /dev/"]
if any(d in command for d in dangerous):
return "Error: Dangerous command blocked"
try:
r = subprocess.run(command, shell=True, cwd=WORKDIR,
capture_output=True, text=True, timeout=120)
r = subprocess.run(
command,
shell=True,
cwd=WORKDIR,
capture_output=True,
text=True,
timeout=120,
)
out = (r.stdout + r.stderr).strip()
return out[:50000] if out else "(no output)"
except subprocess.TimeoutExpired:
return "Error: Timeout (120s)"


def run_read(path: str, limit: int = None) -> str:
try:
lines = safe_path(path).read_text().splitlines()
Expand All @@ -139,6 +187,7 @@ def run_read(path: str, limit: int = None) -> str:
except Exception as e:
return f"Error: {e}"


def run_write(path: str, content: str) -> str:
try:
fp = safe_path(path)
Expand All @@ -148,6 +197,7 @@ def run_write(path: str, content: str) -> str:
except Exception as e:
return f"Error: {e}"


def run_edit(path: str, old_text: str, new_text: str) -> str:
try:
fp = safe_path(path)
Expand All @@ -161,57 +211,114 @@ def run_edit(path: str, old_text: str, new_text: str) -> str:


TOOL_HANDLERS = {
"bash": lambda **kw: run_bash(kw["command"]),
"read_file": lambda **kw: run_read(kw["path"], kw.get("limit")),
"write_file": lambda **kw: run_write(kw["path"], kw["content"]),
"edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]),
"background_run": lambda **kw: BG.run(kw["command"]),
"bash": lambda **kw: run_bash(kw["command"]),
"read_file": lambda **kw: run_read(kw["path"], kw.get("limit")),
"write_file": lambda **kw: run_write(kw["path"], kw["content"]),
"edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]),
"background_run": lambda **kw: BG.run(kw["command"]),
"check_background": lambda **kw: BG.check(kw.get("task_id")),
}

TOOLS = [
{"name": "bash", "description": "Run a shell command (blocking).",
"input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
{"name": "read_file", "description": "Read file contents.",
"input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "limit": {"type": "integer"}}, "required": ["path"]}},
{"name": "write_file", "description": "Write content to file.",
"input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "content": {"type": "string"}}, "required": ["path", "content"]}},
{"name": "edit_file", "description": "Replace exact text in file.",
"input_schema": {"type": "object", "properties": {"path": {"type": "string"}, "old_text": {"type": "string"}, "new_text": {"type": "string"}}, "required": ["path", "old_text", "new_text"]}},
{"name": "background_run", "description": "Run command in background thread. Returns task_id immediately.",
"input_schema": {"type": "object", "properties": {"command": {"type": "string"}}, "required": ["command"]}},
{"name": "check_background", "description": "Check background task status. Omit task_id to list all.",
"input_schema": {"type": "object", "properties": {"task_id": {"type": "string"}}}},
{
"name": "bash",
"description": "Run a shell command (blocking).",
"input_schema": {
"type": "object",
"properties": {"command": {"type": "string"}},
"required": ["command"],
},
},
{
"name": "read_file",
"description": "Read file contents.",
"input_schema": {
"type": "object",
"properties": {"path": {"type": "string"}, "limit": {"type": "integer"}},
"required": ["path"],
},
},
{
"name": "write_file",
"description": "Write content to file.",
"input_schema": {
"type": "object",
"properties": {"path": {"type": "string"}, "content": {"type": "string"}},
"required": ["path", "content"],
},
},
{
"name": "edit_file",
"description": "Replace exact text in file.",
"input_schema": {
"type": "object",
"properties": {
"path": {"type": "string"},
"old_text": {"type": "string"},
"new_text": {"type": "string"},
},
"required": ["path", "old_text", "new_text"],
},
},
{
"name": "background_run",
"description": "Run command in background thread. Returns task_id immediately.",
"input_schema": {
"type": "object",
"properties": {"command": {"type": "string"}},
"required": ["command"],
},
},
{
"name": "check_background",
"description": "Check background task status. Omit task_id to list all.",
"input_schema": {
"type": "object",
"properties": {"task_id": {"type": "string"}},
},
},
]


def agent_loop(messages: list):
while True:
# Drain background notifications and inject as system message before LLM call
notifs = BG.drain_notifications()
if notifs and messages:
notif_text = "\n".join(
f"[bg:{n['task_id']}] {n['status']}: {n['result']}" for n in notifs
)
messages.append({"role": "user", "content": f"<background-results>\n{notif_text}\n</background-results>"})
inject_background_results(messages, BG.drain_notifications())
response = client.messages.create(
model=MODEL, system=SYSTEM, messages=messages,
tools=TOOLS, max_tokens=8000,
model=MODEL,
system=SYSTEM,
messages=messages,
tools=TOOLS,
max_tokens=8000,
)
messages.append({"role": "assistant", "content": response.content})
if response.stop_reason != "tool_use":
if BG.has_running_tasks() and inject_background_results(
messages, BG.wait_for_notifications()
):
continue
return
results = []
for block in response.content:
if block.type == "tool_use":
handler = TOOL_HANDLERS.get(block.name)
try:
output = handler(**block.input) if handler else f"Unknown tool: {block.name}"
output = (
handler(**block.input)
if handler
else f"Unknown tool: {block.name}"
)
except Exception as e:
output = f"Error: {e}"
print(f"> {block.name}:")
print(str(output)[:200])
results.append({"type": "tool_result", "tool_use_id": block.id, "content": str(output)})
results.append(
{
"type": "tool_result",
"tool_use_id": block.id,
"content": str(output),
}
)
messages.append({"role": "user", "content": results})


Expand Down
Loading