Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions Sources/CodexBar/UsageStore+Refresh.swift
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,33 @@ extension UsageStore {
let fetchContext = spec.makeFetchContext()
let descriptor = spec.descriptor
// Keep provider fetch work off MainActor so slow keychain/process reads don't stall menu/UI responsiveness.
// We use a withTaskGroup with a 30-second timeout leg to ensure that a single hanging provider
// does not block the entire refresh task group permanently.
let outcome = await withTaskGroup(
of: ProviderFetchOutcome.self,
of: ProviderFetchOutcome?.self,
returning: ProviderFetchOutcome.self)
{ group in
group.addTask {
await descriptor.fetchOutcome(context: fetchContext)
}
return await group.next()!
group.addTask {
do {
try await Task.sleep(for: .seconds(30))
} catch {
return nil
}
self.providerLogger.warning("Provider refresh timed out", metadata: ["provider": provider.rawValue])
return nil
}
let first = await group.next()
group.cancelAll()
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we double-check whether canceling the task group here guarantees this method returns in ~30s when a provider fetch is stuck in non-cooperative work?

if let first, let outcome = first {
return outcome
} else {
return ProviderFetchOutcome(
result: .failure(SubprocessRunnerError.timedOut("\(provider.rawValue) fetch")),
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we create this timeout failure, how are we distinguishing a true timeout from parent-task cancellation?

attempts: [])
}
}
if provider == .claude,
ClaudeOAuthCredentialsStore.invalidateCacheIfCredentialsFileChanged()
Expand Down
13 changes: 12 additions & 1 deletion Sources/CodexBar/UsageStore.swift
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,18 @@ final class UsageStore {
self.timerTask = Task.detached(priority: .utility) { [weak self] in
while !Task.isCancelled {
try? await Task.sleep(for: .seconds(wait))
await self?.refresh()
// We use a 60-second window to allow all providers ample time to finish under normal conditions.
guard let store = self else { return }
try? await withThrowingTaskGroup(of: Void.self) { group in
group.addTask { await store.refresh() }
group.addTask {
try await Task.sleep(for: .seconds(60))
store.providerLogger.error("GLOBAL REFRESH HANG DETECTED: reached 60s safety timeout")
throw SubprocessRunnerError.timedOut("global refresh")
}
_ = try await group.next()
group.cancelAll()
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we know this cancellation path always lets the timer loop move forward, even if refresh work does not respond to cancellation quickly?

}
}
}
}
Expand Down
152 changes: 82 additions & 70 deletions Sources/CodexBarCore/Host/Process/SubprocessRunner.swift
Original file line number Diff line number Diff line change
Expand Up @@ -59,111 +59,123 @@ public enum SubprocessRunner {
process.arguments = arguments
process.environment = environment

var processGroup: pid_t? = nil
var exitCodeTask: Task<Int32, Never>? = nil

let stdoutPipe = Pipe()
let stderrPipe = Pipe()
process.standardOutput = stdoutPipe
process.standardError = stderrPipe
process.standardInput = nil

// Create asynchronous tasks to read from stdout and stderr.
// Using readToEnd() in a separate task ensures we capture all output without blocking the main execution.
let stdoutTask = Task<Data, Never> {
stdoutPipe.fileHandleForReading.readDataToEndOfFile()
(try? stdoutPipe.fileHandleForReading.readToEnd()) ?? Data()
}
let stderrTask = Task<Data, Never> {
stderrPipe.fileHandleForReading.readDataToEndOfFile()
(try? stderrPipe.fileHandleForReading.readToEnd()) ?? Data()
}

defer {
// CRITICAL: Ensure the process is actually killed on exit/error/timeout.
if process.isRunning {
self.log.debug("Subprocess cleanup: terminating running process", metadata: ["label": label])
process.terminate()
if let pgid = processGroup {
kill(-pgid, SIGTERM)
}

// Give it a brief window to exit gracefully before SIGKILL.
let killDeadline = Date().addingTimeInterval(0.4)
while process.isRunning, Date() < killDeadline {
usleep(50000)
}

if process.isRunning {
self.log.warning("Subprocess cleanup: process resisted SIGTERM, sending SIGKILL", metadata: ["label": label])
if let pgid = processGroup {
kill(-pgid, SIGKILL)
}
kill(process.processIdentifier, SIGKILL)
}
}

// Cancel tasks to avoid leaking resources.
exitCodeTask?.cancel()
stdoutTask.cancel()
stderrTask.cancel()

// Ensure pipes are closed on exit to unblock any pending read operations.
try? stdoutPipe.fileHandleForReading.close()
try? stderrPipe.fileHandleForReading.close()
}

do {
try process.run()
} catch {
stdoutTask.cancel()
stderrTask.cancel()
stdoutPipe.fileHandleForReading.closeFile()
stderrPipe.fileHandleForReading.closeFile()
self.log.error("Subprocess launch failed", metadata: ["label": label, "error": error.localizedDescription])
throw SubprocessRunnerError.launchFailed(error.localizedDescription)
}

var processGroup: pid_t?
let pid = process.processIdentifier
if setpgid(pid, pid) == 0 {
processGroup = pid
}

let exitCodeTask = Task<Int32, Never> {
// Wait for the process to exit or the timeout to fire.
let task = Task<Int32, Never> {
process.waitUntilExit()
return process.terminationStatus
}

do {
let exitCode = try await withThrowingTaskGroup(of: Int32.self) { group in
group.addTask { await exitCodeTask.value }
group.addTask {
try await Task.sleep(for: .seconds(timeout))
throw SubprocessRunnerError.timedOut(label)
}
let code = try await group.next()!
group.cancelAll()
return code
exitCodeTask = task

let exitCode = try await withThrowingTaskGroup(of: Int32.self) { group in
group.addTask { await task.value }
group.addTask {
try await Task.sleep(for: .seconds(timeout))
self.log.warning("Subprocess timed out", metadata: ["label": label, "timeout": "\(timeout)"])
throw SubprocessRunnerError.timedOut(label)
}
let code = try await group.next()!
group.cancelAll()
return code
}

let stdoutData = await stdoutTask.value
let stderrData = await stderrTask.value
let stdout = String(data: stdoutData, encoding: .utf8) ?? ""
let stderr = String(data: stderrData, encoding: .utf8) ?? ""

if exitCode != 0 {
let duration = Date().timeIntervalSince(start)
self.log.warning(
"Subprocess failed",
metadata: [
"label": label,
"binary": binaryName,
"status": "\(exitCode)",
"duration_ms": "\(Int(duration * 1000))",
])
throw SubprocessRunnerError.nonZeroExit(code: exitCode, stderr: stderr)
}
// IMPORTANT: We close the pipes BEFORE awaiting the reading tasks.
// readToEnd() can block indefinitely if the underlying process is dead but the pipe is still "open"
// in a zombie state or if a child process inherited it. Closing the handle explicitly triggers EOF
// in the reading task, allowing stdoutTask.value to complete.
try? stdoutPipe.fileHandleForReading.close()
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any chance closing the read handle here could race the reader task and cause us to miss some stdout/stderr data?

try? stderrPipe.fileHandleForReading.close()

let duration = Date().timeIntervalSince(start)
self.log.debug(
"Subprocess exit",
metadata: [
"label": label,
"binary": binaryName,
"status": "\(exitCode)",
"duration_ms": "\(Int(duration * 1000))",
])
return SubprocessResult(stdout: stdout, stderr: stderr)
} catch {
let stdoutData = await stdoutTask.value
let stderrData = await stderrTask.value
let stdout = String(data: stdoutData, encoding: .utf8) ?? ""
let stderr = String(data: stderrData, encoding: .utf8) ?? ""

if exitCode != 0 {
let duration = Date().timeIntervalSince(start)
self.log.warning(
"Subprocess error",
"Subprocess failed",
metadata: [
"label": label,
"binary": binaryName,
"status": "\(exitCode)",
"duration_ms": "\(Int(duration * 1000))",
])
if process.isRunning {
process.terminate()
if let pgid = processGroup {
kill(-pgid, SIGTERM)
}
let killDeadline = Date().addingTimeInterval(0.4)
while process.isRunning, Date() < killDeadline {
usleep(50000)
}
if process.isRunning {
if let pgid = processGroup {
kill(-pgid, SIGKILL)
}
kill(process.processIdentifier, SIGKILL)
}
}
exitCodeTask.cancel()
stdoutTask.cancel()
stderrTask.cancel()
stdoutPipe.fileHandleForReading.closeFile()
stderrPipe.fileHandleForReading.closeFile()
throw error
throw SubprocessRunnerError.nonZeroExit(code: exitCode, stderr: stderr)
}

let duration = Date().timeIntervalSince(start)
self.log.debug(
"Subprocess exit",
metadata: [
"label": label,
"binary": binaryName,
"status": "\(exitCode)",
"duration_ms": "\(Int(duration * 1000))",
])
return SubprocessResult(stdout: stdout, stderr: stderr)
}
}