-
Notifications
You must be signed in to change notification settings - Fork 736
fix(polling): Prevent hanging providers from permanently blocking background usage refresh #414
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -45,14 +45,33 @@ extension UsageStore { | |
| let fetchContext = spec.makeFetchContext() | ||
| let descriptor = spec.descriptor | ||
| // Keep provider fetch work off MainActor so slow keychain/process reads don't stall menu/UI responsiveness. | ||
| // We use a withTaskGroup with a 30-second timeout leg to ensure that a single hanging provider | ||
| // does not block the entire refresh task group permanently. | ||
| let outcome = await withTaskGroup( | ||
| of: ProviderFetchOutcome.self, | ||
| of: ProviderFetchOutcome?.self, | ||
| returning: ProviderFetchOutcome.self) | ||
| { group in | ||
| group.addTask { | ||
| await descriptor.fetchOutcome(context: fetchContext) | ||
| } | ||
| return await group.next()! | ||
| group.addTask { | ||
| do { | ||
| try await Task.sleep(for: .seconds(30)) | ||
| } catch { | ||
| return nil | ||
| } | ||
| self.providerLogger.warning("Provider refresh timed out", metadata: ["provider": provider.rawValue]) | ||
| return nil | ||
| } | ||
| let first = await group.next() | ||
| group.cancelAll() | ||
| if let first, let outcome = first { | ||
| return outcome | ||
| } else { | ||
| return ProviderFetchOutcome( | ||
| result: .failure(SubprocessRunnerError.timedOut("\(provider.rawValue) fetch")), | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. When we create this timeout failure, how are we distinguishing a true timeout from parent-task cancellation? |
||
| attempts: []) | ||
| } | ||
| } | ||
| if provider == .claude, | ||
| ClaudeOAuthCredentialsStore.invalidateCacheIfCredentialsFileChanged() | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -494,7 +494,18 @@ final class UsageStore { | |
| self.timerTask = Task.detached(priority: .utility) { [weak self] in | ||
| while !Task.isCancelled { | ||
| try? await Task.sleep(for: .seconds(wait)) | ||
| await self?.refresh() | ||
| // We use a 60-second window to allow all providers ample time to finish under normal conditions. | ||
| guard let store = self else { return } | ||
| try? await withThrowingTaskGroup(of: Void.self) { group in | ||
| group.addTask { await store.refresh() } | ||
| group.addTask { | ||
| try await Task.sleep(for: .seconds(60)) | ||
| store.providerLogger.error("GLOBAL REFRESH HANG DETECTED: reached 60s safety timeout") | ||
| throw SubprocessRunnerError.timedOut("global refresh") | ||
| } | ||
| _ = try await group.next() | ||
| group.cancelAll() | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Do we know this cancellation path always lets the timer loop move forward, even if refresh work does not respond to cancellation quickly? |
||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -59,111 +59,123 @@ public enum SubprocessRunner { | |
| process.arguments = arguments | ||
| process.environment = environment | ||
|
|
||
| var processGroup: pid_t? = nil | ||
| var exitCodeTask: Task<Int32, Never>? = nil | ||
|
|
||
| let stdoutPipe = Pipe() | ||
| let stderrPipe = Pipe() | ||
| process.standardOutput = stdoutPipe | ||
| process.standardError = stderrPipe | ||
| process.standardInput = nil | ||
|
|
||
| // Create asynchronous tasks to read from stdout and stderr. | ||
| // Using readToEnd() in a separate task ensures we capture all output without blocking the main execution. | ||
| let stdoutTask = Task<Data, Never> { | ||
| stdoutPipe.fileHandleForReading.readDataToEndOfFile() | ||
| (try? stdoutPipe.fileHandleForReading.readToEnd()) ?? Data() | ||
| } | ||
| let stderrTask = Task<Data, Never> { | ||
| stderrPipe.fileHandleForReading.readDataToEndOfFile() | ||
| (try? stderrPipe.fileHandleForReading.readToEnd()) ?? Data() | ||
| } | ||
|
|
||
| defer { | ||
| // CRITICAL: Ensure the process is actually killed on exit/error/timeout. | ||
| if process.isRunning { | ||
| self.log.debug("Subprocess cleanup: terminating running process", metadata: ["label": label]) | ||
| process.terminate() | ||
| if let pgid = processGroup { | ||
| kill(-pgid, SIGTERM) | ||
| } | ||
|
|
||
| // Give it a brief window to exit gracefully before SIGKILL. | ||
| let killDeadline = Date().addingTimeInterval(0.4) | ||
| while process.isRunning, Date() < killDeadline { | ||
| usleep(50000) | ||
| } | ||
|
|
||
| if process.isRunning { | ||
| self.log.warning("Subprocess cleanup: process resisted SIGTERM, sending SIGKILL", metadata: ["label": label]) | ||
| if let pgid = processGroup { | ||
| kill(-pgid, SIGKILL) | ||
| } | ||
| kill(process.processIdentifier, SIGKILL) | ||
| } | ||
| } | ||
|
|
||
| // Cancel tasks to avoid leaking resources. | ||
| exitCodeTask?.cancel() | ||
| stdoutTask.cancel() | ||
| stderrTask.cancel() | ||
|
|
||
| // Ensure pipes are closed on exit to unblock any pending read operations. | ||
| try? stdoutPipe.fileHandleForReading.close() | ||
| try? stderrPipe.fileHandleForReading.close() | ||
| } | ||
|
|
||
| do { | ||
| try process.run() | ||
| } catch { | ||
| stdoutTask.cancel() | ||
| stderrTask.cancel() | ||
| stdoutPipe.fileHandleForReading.closeFile() | ||
| stderrPipe.fileHandleForReading.closeFile() | ||
| self.log.error("Subprocess launch failed", metadata: ["label": label, "error": error.localizedDescription]) | ||
| throw SubprocessRunnerError.launchFailed(error.localizedDescription) | ||
| } | ||
|
|
||
| var processGroup: pid_t? | ||
| let pid = process.processIdentifier | ||
| if setpgid(pid, pid) == 0 { | ||
| processGroup = pid | ||
| } | ||
|
|
||
| let exitCodeTask = Task<Int32, Never> { | ||
| // Wait for the process to exit or the timeout to fire. | ||
| let task = Task<Int32, Never> { | ||
| process.waitUntilExit() | ||
| return process.terminationStatus | ||
| } | ||
|
|
||
| do { | ||
| let exitCode = try await withThrowingTaskGroup(of: Int32.self) { group in | ||
| group.addTask { await exitCodeTask.value } | ||
| group.addTask { | ||
| try await Task.sleep(for: .seconds(timeout)) | ||
| throw SubprocessRunnerError.timedOut(label) | ||
| } | ||
| let code = try await group.next()! | ||
| group.cancelAll() | ||
| return code | ||
| exitCodeTask = task | ||
|
|
||
| let exitCode = try await withThrowingTaskGroup(of: Int32.self) { group in | ||
| group.addTask { await task.value } | ||
| group.addTask { | ||
| try await Task.sleep(for: .seconds(timeout)) | ||
| self.log.warning("Subprocess timed out", metadata: ["label": label, "timeout": "\(timeout)"]) | ||
| throw SubprocessRunnerError.timedOut(label) | ||
| } | ||
| let code = try await group.next()! | ||
| group.cancelAll() | ||
| return code | ||
| } | ||
|
|
||
| let stdoutData = await stdoutTask.value | ||
| let stderrData = await stderrTask.value | ||
| let stdout = String(data: stdoutData, encoding: .utf8) ?? "" | ||
| let stderr = String(data: stderrData, encoding: .utf8) ?? "" | ||
|
|
||
| if exitCode != 0 { | ||
| let duration = Date().timeIntervalSince(start) | ||
| self.log.warning( | ||
| "Subprocess failed", | ||
| metadata: [ | ||
| "label": label, | ||
| "binary": binaryName, | ||
| "status": "\(exitCode)", | ||
| "duration_ms": "\(Int(duration * 1000))", | ||
| ]) | ||
| throw SubprocessRunnerError.nonZeroExit(code: exitCode, stderr: stderr) | ||
| } | ||
| // IMPORTANT: We close the pipes BEFORE awaiting the reading tasks. | ||
| // readToEnd() can block indefinitely if the underlying process is dead but the pipe is still "open" | ||
| // in a zombie state or if a child process inherited it. Closing the handle explicitly triggers EOF | ||
| // in the reading task, allowing stdoutTask.value to complete. | ||
| try? stdoutPipe.fileHandleForReading.close() | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is there any chance closing the read handle here could race the reader task and cause us to miss some stdout/stderr data? |
||
| try? stderrPipe.fileHandleForReading.close() | ||
|
|
||
| let duration = Date().timeIntervalSince(start) | ||
| self.log.debug( | ||
| "Subprocess exit", | ||
| metadata: [ | ||
| "label": label, | ||
| "binary": binaryName, | ||
| "status": "\(exitCode)", | ||
| "duration_ms": "\(Int(duration * 1000))", | ||
| ]) | ||
| return SubprocessResult(stdout: stdout, stderr: stderr) | ||
| } catch { | ||
| let stdoutData = await stdoutTask.value | ||
| let stderrData = await stderrTask.value | ||
| let stdout = String(data: stdoutData, encoding: .utf8) ?? "" | ||
| let stderr = String(data: stderrData, encoding: .utf8) ?? "" | ||
|
|
||
| if exitCode != 0 { | ||
| let duration = Date().timeIntervalSince(start) | ||
| self.log.warning( | ||
| "Subprocess error", | ||
| "Subprocess failed", | ||
| metadata: [ | ||
| "label": label, | ||
| "binary": binaryName, | ||
| "status": "\(exitCode)", | ||
| "duration_ms": "\(Int(duration * 1000))", | ||
| ]) | ||
| if process.isRunning { | ||
| process.terminate() | ||
| if let pgid = processGroup { | ||
| kill(-pgid, SIGTERM) | ||
| } | ||
| let killDeadline = Date().addingTimeInterval(0.4) | ||
| while process.isRunning, Date() < killDeadline { | ||
| usleep(50000) | ||
| } | ||
| if process.isRunning { | ||
| if let pgid = processGroup { | ||
| kill(-pgid, SIGKILL) | ||
| } | ||
| kill(process.processIdentifier, SIGKILL) | ||
| } | ||
| } | ||
| exitCodeTask.cancel() | ||
| stdoutTask.cancel() | ||
| stderrTask.cancel() | ||
| stdoutPipe.fileHandleForReading.closeFile() | ||
| stderrPipe.fileHandleForReading.closeFile() | ||
| throw error | ||
| throw SubprocessRunnerError.nonZeroExit(code: exitCode, stderr: stderr) | ||
| } | ||
|
|
||
| let duration = Date().timeIntervalSince(start) | ||
| self.log.debug( | ||
| "Subprocess exit", | ||
| metadata: [ | ||
| "label": label, | ||
| "binary": binaryName, | ||
| "status": "\(exitCode)", | ||
| "duration_ms": "\(Int(duration * 1000))", | ||
| ]) | ||
| return SubprocessResult(stdout: stdout, stderr: stderr) | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we double-check whether canceling the task group here guarantees this method returns in ~30s when a provider fetch is stuck in non-cooperative work?