Commit a90dc06

feat: add multi-source drain/MCP aggregation with dedup and source labels
Signed-off-by: Dennis Zhuang <killme2008@gmail.com>
1 parent 00d46d5 commit a90dc06

18 files changed: +1035 -88 lines changed

CLAUDE.md

Lines changed: 16 additions & 3 deletions
@@ -51,19 +51,23 @@ AI tool ← MCP server (get_build_errors) ← store.Drain() ← per
 ### Key Patterns

 - **Multi-adapter fan-out**: Writers discover adapters via `store.DiscoverAdapters()`, write to all. Each tool drains independently.
+- **Multi-source drain**: MCP server and drain command can read from up to 2 sources (local + configured remote). `resolveDrainSources()` in `cmd/devtap/storefactory.go` resolves sources and deduplicates when (backend, session) match. Messages are merged, deduplicated via `mcp.DedupMessages()`, and labeled with `[host session]` prefix in multi-source mode.
+- **Drain budget unit**: `Store.Drain(sessionID, maxLines)` treats `maxLines` as a **message count** (not line count). The multi-source loop tracks remaining budget in messages. Line-level truncation is a separate pass via `mcp.TruncateMessages()`.
+- **Host field**: `LogMessage.Host` is set via cached `os.Hostname()` at capture startup. Used for multi-source origin labeling. GreptimeDB uses `COALESCE(host, '')` for backward compat with pre-existing rows.
 - **File store IPC**: `pending.jsonl` → atomic rename to `pending.jsonl.draining` → read → delete. Leftover lines written back to prevent data loss.
 - **Config merge**: `.mcp.json` / `settings.json` / `opencode.json` reads existing → upserts devtap entry → writes back. Never overwrites other tools' config.
 - **Instruction injection**: Appends `<!-- devtap:start -->` / `<!-- devtap:end -->` block to project instruction files. Idempotent via marker detection.
 - **Session encoding**: `session.EncodeDir("/foo/bar")` → `"-foo-bar"`, shared across adapters.
 - **Capture modes**: `runner.go` (batch, flush every 50 lines) vs `longrun.go` (debounce timer, for dev servers).
 - **Scanner buffers**: 64KB initial / 1MB max (`internal/capture/errors.go`). On scanner error (line >1MB), pipe is drained to discard to prevent child process deadlock.
-- **Line-level truncation**: `mcp.TruncateMessages()` allocates line budget proportionally across messages. Applied in both MCP server and drain command.
+- **Line-level truncation**: `mcp.TruncateMessages()` allocates line budget proportionally across messages. Applied in both MCP server and drain command after all sources are drained.

 ### GreptimeDB Specifics

 - Composite `PRIMARY KEY (session_id, \`tag\`, stream, adapter)` clause (not inline per-column)
 - `TIMESTAMP(6)` microsecond precision to avoid PK collisions
 - `append_mode=true` allows duplicate PKs
+- `host STRING` column added via `ALTER TABLE ADD COLUMN` (silently ignores error if column already exists). Not a TAG column — TAG columns are part of the primary key and cannot be added via ALTER TABLE.
 - SQL injection protection via `validateFilterSQL` in drain (best-effort blocklist, not a security boundary)
 - Integration tests gated behind `//go:build integration`

@@ -72,10 +76,19 @@ AI tool ← MCP server (get_build_errors) ← store.Drain() ← per

 devtap captures build/dev output from a separate terminal and delivers it here via MCP.

-**You MUST call get_build_errors in these situations:**
+**You MUST check devtap in these situations:**
 - At the start of each new task or conversation turn
 - Before writing or editing any code
 - After being told that a build, test, or dev server has new output

-Do not assume there are no errors — always check. Build output may arrive at any time from a parallel terminal.
+**Use this call order:**
+1. Call "get_build_status" once at the start of the turn.
+2. Call "get_build_errors" when status reports pending messages, when the user says new output arrived, or before the first code edit in the turn.
+
+**Call limit per turn:** call "get_build_errors" at most once per turn unless new output is reported.
+
+**Multi-source mode:** when devtap drains from multiple sources, tags are prefixed with "host/label |" (for example, `[devtap: myhost/local | make]`). "host" is the machine name, "label" identifies the source. Show these prefixes as-is. If output includes source warnings (for example, source unreachable), show those warnings verbatim and continue with output from reachable sources.
+
+**Output format:** when "get_build_errors" returns content, present it verbatim in a fenced code block, then add one line:
+"Next action: <what you will do>".
 <!-- devtap:end -->
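
The "File store IPC" pattern listed above (rename `pending.jsonl` to `pending.jsonl.draining`, read, delete, write leftovers back) can be sketched roughly as follows. This is a minimal illustration, not devtap's implementation: `drainPending` and `runDemo` are hypothetical names, the write-back strategy (appending leftovers to a fresh `pending.jsonl`) is an assumption, and locking against concurrent writers is left out.

```go
package main

import (
	"bufio"
	"fmt"
	"os"
	"path/filepath"
	"strings"
)

// drainPending claims pending.jsonl by renaming it, returns up to max lines,
// and appends leftover lines back to pending.jsonl so they are not lost.
// Hypothetical helper; a real store would also coordinate with writers.
func drainPending(dir string, max int) ([]string, error) {
	pending := filepath.Join(dir, "pending.jsonl")
	draining := pending + ".draining"

	// A rename inside one directory is atomic on POSIX filesystems, so
	// writers that open pending.jsonl afterwards start a fresh file.
	if err := os.Rename(pending, draining); err != nil {
		if os.IsNotExist(err) {
			return nil, nil // nothing pending
		}
		return nil, err
	}

	f, err := os.Open(draining)
	if err != nil {
		return nil, err
	}
	var taken, leftover []string
	sc := bufio.NewScanner(f)
	for sc.Scan() {
		if len(taken) < max {
			taken = append(taken, sc.Text())
		} else {
			leftover = append(leftover, sc.Text())
		}
	}
	f.Close()
	if err := sc.Err(); err != nil {
		return nil, err // the .draining file is kept, so nothing is lost
	}

	if len(leftover) > 0 {
		out, err := os.OpenFile(pending, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
		if err != nil {
			return nil, err
		}
		_, werr := out.WriteString(strings.Join(leftover, "\n") + "\n")
		cerr := out.Close()
		if werr != nil {
			return nil, werr
		}
		if cerr != nil {
			return nil, cerr
		}
	}
	return taken, os.Remove(draining)
}

// runDemo writes three lines into a temp dir and drains them two at a time.
func runDemo() (first, second []string, err error) {
	dir, err := os.MkdirTemp("", "devtap-sketch")
	if err != nil {
		return nil, nil, err
	}
	defer os.RemoveAll(dir)
	data := "{\"n\":1}\n{\"n\":2}\n{\"n\":3}\n"
	if err = os.WriteFile(filepath.Join(dir, "pending.jsonl"), []byte(data), 0o644); err != nil {
		return nil, nil, err
	}
	if first, err = drainPending(dir, 2); err != nil {
		return nil, nil, err
	}
	second, err = drainPending(dir, 2)
	return first, second, err
}

func main() {
	first, second, err := runDemo()
	fmt.Println(first, second, err)
}
```

In this sketch the leftover line survives the first drain and is returned by the second one, which is the data-loss guarantee the bullet describes.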

README.md

Lines changed: 53 additions & 0 deletions
@@ -60,6 +60,19 @@ devtap -- npm run dev

 **Terminal B** — use your AI coding tool as usual. It will automatically call `get_build_errors` via MCP to fetch captured build errors.

+If you want to verify without MCP, run:
+
+```bash
+devtap drain
+```
+
+Typical output:
+
+```text
+[devtap: cargo check] Build failed (exit code 101):
+...
+```
+
 **Tip:** Since devtap captures stdout from any command, you can send arbitrary messages to your coding agent:

 ```bash
@@ -106,6 +119,13 @@ Your laptop CI / remote build server
 3. Each AI tool independently drains its own copy via `get_build_errors`
 4. AI sees the errors and fixes them

+When `mcp-serve`/`drain` is started with explicit `--session` or `--store`, devtap can merge output from two sources:
+
+- `local`: auto-detected project session from your default backend
+- `configured`: the explicit `--session`/`--store` target
+
+If both resolve to the same backend+session, devtap uses a single source. Otherwise it drains both, deduplicates identical messages, and prefixes tags with source info (for example `myhost/local |`).
+
 ## Supported Tools

 | Tool | Adapter | Integration | Config File | Instruction File |
@@ -218,6 +238,30 @@ devtap --store greptimedb --session myproject -- make

 Multiple build machines can write to the same session simultaneously — each entry is tagged with its source, and the AI tool drains them all.

+**Local + remote merged drain** — keep your local loop and a remote shared session visible at the same time:
+
+```bash
+# Laptop: local output (default source)
+devtap -- cargo check
+
+# Remote machine: shared session output (configured source)
+devtap --store greptimedb --session myproject -- make
+
+# Laptop: install MCP with explicit remote session
+devtap install --adapter claude-code --store greptimedb --session myproject
+
+# Optional manual check (without MCP client)
+devtap drain --store greptimedb --session myproject
+```
+
+`devtap drain` shows a merged header (`Draining from N sources`), emits source-unreachable warnings when partial failures occur, and still returns available output from reachable sources.
+
+Typical merged header:
+
+```text
+[devtap] Draining from 2 sources (2 reachable)
+```
+
 **Session auto-detection** — when `--session auto` (default), devtap resolves the project directory like this:
 1. Git root (nearest parent with `.git`)
 2. Project marker files (nearest parent with one of: `go.mod`, `package.json`, `pyproject.toml`, `Cargo.toml`, `pom.xml`, `build.gradle`, `build.gradle.kts`, `composer.json`, `Gemfile`, `setup.py`)
@@ -265,6 +309,15 @@ Subcommands:

 Lines exceeding `--max-lines` are smart-truncated: head and tail preserved with omission notice. Consecutive duplicate lines are merged.

+`devtap mcp-serve` and `devtap drain` can aggregate multiple sources (local + configured) as described above. `devtap drain --filter-sql` is a single-source mode and requires `--store greptimedb`.
+
+## Troubleshooting
+
+- No output but you expect logs: run `devtap status` first, then `devtap drain --max-lines 200`.
+- Using an unexpected session: run with `--session pick` once to confirm the target session.
+- Multi-source drain shows warnings: reachable sources are still returned; warnings indicate one source is unavailable.
+- MCP tool not being called: re-run `devtap install --adapter <adapter>` in the project root and restart the AI tool session.
+
 ## Security & Privacy

 - **All data stays local.** Build output is stored on your machine at `~/.devtap/` (file backend) or in a self-hosted GreptimeDB instance. Nothing is sent to external servers.
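
The README hunks above say that `local` and `configured` collapse into a single source when they resolve to the same backend+session. A minimal sketch of that rule follows. The `Source` type and `resolveSources` function are hypothetical stand-ins (the real logic lives in `resolveDrainSources()`, which this page does not show), and keeping the `local` entry when both match is an assumption.

```go
package main

import "fmt"

// Source is a hypothetical stand-in for one resolved drain target.
type Source struct {
	Label   string // "local" or "configured"
	Backend string // e.g. "file" or "greptimedb"
	Session string
}

// resolveSources returns one source when local and configured point at the
// same backend+session, and both otherwise. configured is nil when no
// explicit --session/--store was given.
func resolveSources(local Source, configured *Source) []Source {
	if configured == nil {
		return []Source{local}
	}
	if configured.Backend == local.Backend && configured.Session == local.Session {
		// Assumption: keep the local entry; the README only says "a single source".
		return []Source{local}
	}
	return []Source{local, *configured}
}

func main() {
	local := Source{Label: "local", Backend: "file", Session: "-foo-bar"}
	remote := Source{Label: "configured", Backend: "greptimedb", Session: "myproject"}
	for _, s := range resolveSources(local, &remote) {
		fmt.Printf("%s: %s/%s\n", s.Label, s.Backend, s.Session)
	}
}
```

Comparing on the (backend, session) pair rather than on the label means an explicit `--session` that happens to equal the auto-detected one does not cause the same store to be drained twice.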

cmd/devtap/drain.go

Lines changed: 101 additions & 15 deletions
@@ -3,6 +3,7 @@ package main
 import (
 	"encoding/json"
 	"fmt"
+	"strings"

 	"github.com/spf13/cobra"

@@ -33,14 +34,107 @@ For MCP-capable tools, use "devtap mcp-serve" instead.`,
 }

 func runDrain(cmd *cobra.Command, args []string) error {
-	adapterName, _ := cmd.Flags().GetString("adapter")
-	sessionFlag, _ := cmd.Flags().GetString("session")
 	event, _ := cmd.Flags().GetString("event")
 	autoLoop, _ := cmd.Flags().GetBool("auto-loop")
 	maxRetries, _ := cmd.Flags().GetInt("max-retries")
 	maxLines, _ := cmd.Flags().GetInt("max-lines")
 	filterSQL, _ := cmd.Flags().GetString("filter-sql")

+	// If --filter-sql is set, fall back to single-source drain against the configured store.
+	if filterSQL != "" {
+		return runDrainSingleSource(cmd, filterSQL, maxLines, event, autoLoop, maxRetries)
+	}
+
+	sources, cleanup, err := resolveDrainSources(cmd)
+	if err != nil {
+		return fmt.Errorf("resolve drain sources: %w", err)
+	}
+	defer cleanup()
+
+	multiSource := len(sources) > 1
+	var allMessages []store.LogMessage
+
+	// Track remaining message budget. Drain treats its limit as a message
+	// count, so the budget uses the same unit. Line-level truncation is
+	// handled afterward by TruncateMessages.
+	remaining := maxLines
+
+	for _, src := range sources {
+		if remaining <= 0 {
+			break
+		}
+
+		messages, err := src.Store.Drain(src.SessionID, remaining)
+		if err != nil {
+			if multiSource {
+				fmt.Fprintf(cmd.ErrOrStderr(), "devtap: source %q unreachable: %v\n", src.Label, err)
+				continue
+			}
+			return fmt.Errorf("drain: %w", err)
+		}
+
+		if multiSource {
+			for i := range messages {
+				host := messages[i].Host
+				if host == "" {
+					host = "unknown"
+				}
+				messages[i].Tag = fmt.Sprintf("%s/%s | %s", host, src.Label, messages[i].Tag)
+			}
+		}
+
+		allMessages = append(allMessages, messages...)
+		remaining -= len(messages)
+	}
+
+	if multiSource && len(allMessages) > 0 {
+		allMessages = mcp.DedupMessages(allMessages)
+	}
+
+	allMessages = mcp.TruncateMessages(allMessages, maxLines)
+
+	// Handle auto-loop Stop hook (Claude Code specific)
+	if event == "Stop" && autoLoop {
+		storeDir, err := defaultStoreDir()
+		if err != nil {
+			return fmt.Errorf("resolve store dir: %w", err)
+		}
+		// Use the first source's session for retry tracking
+		sessionID := sources[0].SessionID
+		return handleAutoLoopStop(storeDir, sessionID, allMessages, maxRetries)
+	}
+
+	// Reset retry counter on any non-Stop drain (e.g., user submitted a new prompt).
+	if event != "" && event != "Stop" {
+		if storeDir, err := defaultStoreDir(); err == nil {
+			tracker := store.NewRetryTracker(storeDir)
+			for _, src := range sources {
+				_ = tracker.Reset(src.SessionID)
+			}
+		}
+	}
+
+	if len(allMessages) == 0 {
+		return nil
+	}
+
+	// Plain text output
+	if multiSource {
+		var sourceLabels []string
+		for _, src := range sources {
+			sourceLabels = append(sourceLabels, src.Label)
+		}
+		fmt.Printf("[devtap] Draining from %d sources: %s\n\n", len(sourceLabels), strings.Join(sourceLabels, ", "))
+	}
+	fmt.Println(mcp.FormatMessages(allMessages))
+	return nil
+}
+
+// runDrainSingleSource handles drain with --filter-sql which only works with a single GreptimeDB store.
+func runDrainSingleSource(cmd *cobra.Command, filterSQL string, maxLines int, event string, autoLoop bool, maxRetries int) error {
+	adapterName, _ := cmd.Flags().GetString("adapter")
+	sessionFlag, _ := cmd.Flags().GetString("session")
+
 	sessionID, err := resolveSession(adapterName, sessionFlag)
 	if err != nil {
 		return fmt.Errorf("resolve session: %w", err)
@@ -52,24 +146,18 @@ func runDrain(cmd *cobra.Command, args []string) error {
 	}
 	defer func() { _ = s.Close() }()

-	// Drain messages (with SQL filter if using GreptimeDB)
-	var messages []store.LogMessage
-	if filterSQL != "" {
-		if gs, ok := s.(*greptimestore.Store); ok {
-			messages, err = gs.DrainSQL(sessionID, filterSQL, maxLines)
-		} else {
-			return fmt.Errorf("--filter-sql requires --store greptimedb")
-		}
-	} else {
-		messages, err = s.Drain(sessionID, maxLines)
+	gs, ok := s.(*greptimestore.Store)
+	if !ok {
+		return fmt.Errorf("--filter-sql requires --store greptimedb")
 	}
+
+	messages, err := gs.DrainSQL(sessionID, filterSQL, maxLines)
 	if err != nil {
 		return fmt.Errorf("drain: %w", err)
 	}

 	messages = mcp.TruncateMessages(messages, maxLines)

-	// Handle auto-loop Stop hook (Claude Code specific)
 	if event == "Stop" && autoLoop {
 		storeDir, err := defaultStoreDir()
 		if err != nil {
@@ -78,7 +166,6 @@ func runDrain(cmd *cobra.Command, args []string) error {
 		return handleAutoLoopStop(storeDir, sessionID, messages, maxRetries)
 	}

-	// Reset retry counter on any non-Stop drain (e.g., user submitted a new prompt).
 	if event != "" && event != "Stop" {
 		if storeDir, err := defaultStoreDir(); err == nil {
 			tracker := store.NewRetryTracker(storeDir)
@@ -90,7 +177,6 @@ func runDrain(cmd *cobra.Command, args []string) error {
 		return nil
 	}

-	// Plain text output
 	fmt.Println(mcp.FormatMessages(messages))
 	return nil
 }
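
The multi-source loop in `runDrain` above shares one message budget across sources, downgrades a failing source to a warning when more than one source is configured, and prefixes tags with `host/label | `. The standalone sketch below reproduces that control flow for experimentation. `Message`, `source`, `drainAll`, `fixed`, and `failing` are hypothetical stand-ins for `store.LogMessage` and the resolved sources; deduplication and line-level truncation are deliberately left out.

```go
package main

import (
	"errors"
	"fmt"
)

// Message and source are hypothetical stand-ins for store.LogMessage and
// the resolved drain sources in the diff.
type Message struct {
	Host, Tag, Text string
}

type source struct {
	Label string
	Drain func(limit int) ([]Message, error)
}

// drainAll spends one shared message budget across sources in order.
// With several sources, a failing source becomes a warning and each tag
// is prefixed with "host/label | ". With one source, the error is returned.
func drainAll(sources []source, budget int) ([]Message, []string, error) {
	multi := len(sources) > 1
	remaining := budget
	var all []Message
	var warnings []string
	for _, src := range sources {
		if remaining <= 0 {
			break
		}
		msgs, err := src.Drain(remaining)
		if err != nil {
			if multi {
				warnings = append(warnings, fmt.Sprintf("source %q unreachable: %v", src.Label, err))
				continue
			}
			return nil, nil, fmt.Errorf("drain: %w", err)
		}
		if multi {
			for i := range msgs {
				host := msgs[i].Host
				if host == "" {
					host = "unknown"
				}
				msgs[i].Tag = fmt.Sprintf("%s/%s | %s", host, src.Label, msgs[i].Tag)
			}
		}
		all = append(all, msgs...)
		remaining -= len(msgs)
	}
	return all, warnings, nil
}

// fixed builds a Drain func that hands out at most limit of the given messages.
func fixed(msgs ...Message) func(int) ([]Message, error) {
	return func(limit int) ([]Message, error) {
		if limit > len(msgs) {
			limit = len(msgs)
		}
		return append([]Message(nil), msgs[:limit]...), nil
	}
}

// failing simulates an unreachable source.
func failing(limit int) ([]Message, error) {
	return nil, errors.New("connection refused")
}

func main() {
	sources := []source{
		{Label: "local", Drain: fixed(Message{Host: "laptop", Tag: "make"})},
		{Label: "configured", Drain: failing},
	}
	msgs, warnings, err := drainAll(sources, 10)
	fmt.Println(msgs, warnings, err)
}
```

Because the budget is counted in messages, a source listed later can be skipped entirely once earlier sources have used it up, which is why the order of `sources` matters in this design.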

cmd/devtap/install.go

Lines changed: 9 additions & 0 deletions
@@ -59,6 +59,15 @@ func runInstall(cmd *cobra.Command, args []string) error {
 		return fmt.Errorf("install: %w", err)
 	}

+	// Print multi-source hint when explicit --session or --store is used.
+	if (sessionFlag != "" && sessionFlag != "auto") || storeFlag != "" {
+		targetSession := sessionFlag
+		if targetSession == "" {
+			targetSession = "auto"
+		}
+		fmt.Printf("MCP server will drain from both local builds and remote session %q.\n\n", targetSession)
+	}
+
 	switch adapterName {
 	case "claude-code":
 		fmt.Println("Installed devtap MCP server for Claude Code.")

cmd/devtap/mcpserve.go

Lines changed: 12 additions & 20 deletions
@@ -23,38 +23,30 @@ Exposed tools:
 get_build_status Get pending message counts per session`,
 	SilenceUsage: true,
 	RunE: func(cmd *cobra.Command, args []string) error {
-		s, err := openStore(cmd)
-		if err != nil {
-			return fmt.Errorf("open store: %w", err)
+		maxLines, _ := cmd.Flags().GetInt("max-lines")
+		if maxLines <= 0 {
+			maxLines = 10000
 		}
-		defer func() { _ = s.Close() }()

 		adapterName, _ := cmd.Flags().GetString("adapter")
 		if adapterName == "" {
 			adapterName = "claude-code"
 		}
-		sessionID, _ := cmd.Flags().GetString("session")
-		maxLines, _ := cmd.Flags().GetInt("max-lines")
-		if maxLines <= 0 {
-			maxLines = 10000
-		}

-		// If session is "auto", use a default based on cwd
-		if sessionID == "" || sessionID == "auto" {
-			resolved, err := resolveSession(adapterName, "auto")
-			if err != nil || resolved == "" {
-				sessionID = "default"
-			} else {
-				sessionID = resolved
-			}
+		sources, cleanup, err := resolveDrainSources(cmd)
+		if err != nil {
+			return fmt.Errorf("resolve drain sources: %w", err)
 		}
+		defer cleanup()

-		// Pre-register adapter dir so writers can discover and fan-out to us.
+		// Pre-register adapter dir for each source so writers can discover us.
 		if storeDir, err := defaultStoreDir(); err == nil {
-			_ = store.EnsureAdapterDir(storeDir, sessionID, adapterName)
+			for _, src := range sources {
+				_ = store.EnsureAdapterDir(storeDir, src.SessionID, adapterName)
+			}
 		}

-		srv := mcp.NewServer(s, sessionID, maxLines)
+		srv := mcp.NewMultiSourceServer(sources, maxLines)
 		return srv.Run()
 	},
 }

cmd/devtap/run.go

Lines changed: 18 additions & 4 deletions
@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"os"
 	"path/filepath"
+	"strings"

 	"github.com/spf13/cobra"

@@ -37,10 +38,9 @@ func runRun(cmd *cobra.Command, args []string) error {
 	debounce, _ := cmd.Flags().GetDuration("debounce")

 	if tag == "" {
-		if name := capture.CommandName(cmdArgs); name != "" {
-			tag = filepath.Base(name)
-		} else {
-			tag = filepath.Base(cmdArgs[0])
+		tag = shellJoin(cmdArgs)
+		if len(tag) > 80 {
+			tag = tag[:77] + "..."
 		}
 	}

@@ -175,3 +175,17 @@ func findMarkerRoot(start string) (string, bool) {
 	}
 	return "", false
 }
+
+// shellJoin joins command arguments into a display string,
+// quoting arguments that contain spaces or shell metacharacters.
+func shellJoin(args []string) string {
+	parts := make([]string, len(args))
+	for i, arg := range args {
+		if strings.ContainsAny(arg, " \t'\"\\$`!#&|;(){}") {
+			parts[i] = fmt.Sprintf("%q", arg)
+		} else {
+			parts[i] = arg
+		}
+	}
+	return strings.Join(parts, " ")
+}
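
The new default tag in `runRun` is cut with `tag[:77] + "..."`, which slices by bytes, so a command line containing multi-byte UTF-8 characters could be cut in the middle of a character. The sketch below shows one possible rune-based alternative. It is not part of the commit: `truncateTag` is a hypothetical helper, and it counts the limit in runes rather than bytes.

```go
package main

import (
	"fmt"
	"unicode/utf8"
)

// truncateTag shortens s to at most max runes, ending in "..." when it cuts.
// Strings that already fit, and limits too small to hold the ellipsis,
// are returned unchanged. Hypothetical helper, not from the commit.
func truncateTag(s string, max int) string {
	if max < 3 || utf8.RuneCountInString(s) <= max {
		return s
	}
	r := []rune(s)
	return string(r[:max-3]) + "..."
}

func main() {
	fmt.Println(truncateTag("make test", 80))
	fmt.Println(truncateTag("echo héllo wörld and more", 12))
}
```

Converting to `[]rune` costs an allocation, which is acceptable for a tag that is computed once per run.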
