Skip to content

Commit fe66357

Browse files
committed
feat: improve strategy when sources unavailable
Signed-off-by: Dennis Zhuang <killme2008@gmail.com>
1 parent a90dc06 commit fe66357

File tree

2 files changed

+60
-4
lines changed

2 files changed

+60
-4
lines changed

cmd/devtap/drain.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ func runDrain(cmd *cobra.Command, args []string) error {
4040
maxLines, _ := cmd.Flags().GetInt("max-lines")
4141
filterSQL, _ := cmd.Flags().GetString("filter-sql")
4242

43+
if maxLines <= 0 {
44+
maxLines = 10000
45+
}
46+
4347
// If --filter-sql is set, fall back to single-source drain against the configured store.
4448
if filterSQL != "" {
4549
return runDrainSingleSource(cmd, filterSQL, maxLines, event, autoLoop, maxRetries)
@@ -53,6 +57,8 @@ func runDrain(cmd *cobra.Command, args []string) error {
5357

5458
multiSource := len(sources) > 1
5559
var allMessages []store.LogMessage
60+
var drainErrors []string
61+
successCount := 0
5662

5763
// Track remaining message budget. Drain treats its limit as a message
5864
// count, so the budget uses the same unit. Line-level truncation is
@@ -67,11 +73,13 @@ func runDrain(cmd *cobra.Command, args []string) error {
6773
messages, err := src.Store.Drain(src.SessionID, remaining)
6874
if err != nil {
6975
if multiSource {
76+
drainErrors = append(drainErrors, fmt.Sprintf("source %q: %v", src.Label, err))
7077
fmt.Fprintf(cmd.ErrOrStderr(), "devtap: source %q unreachable: %v\n", src.Label, err)
7178
continue
7279
}
7380
return fmt.Errorf("drain: %w", err)
7481
}
82+
successCount++
7583

7684
if multiSource {
7785
for i := range messages {
@@ -87,6 +95,15 @@ func runDrain(cmd *cobra.Command, args []string) error {
8795
remaining -= len(messages)
8896
}
8997

98+
// All sources failed — return error rather than silently succeeding.
99+
// Exception: Stop+autoLoop should fall through so the user isn't blocked.
100+
if multiSource && successCount == 0 && len(drainErrors) > 0 {
101+
if event != "Stop" || !autoLoop {
102+
return fmt.Errorf("all sources failed: %s", strings.Join(drainErrors, "; "))
103+
}
104+
fmt.Fprintf(cmd.ErrOrStderr(), "devtap: all sources failed, allowing stop\n")
105+
}
106+
90107
if multiSource && len(allMessages) > 0 {
91108
allMessages = mcp.DedupMessages(allMessages)
92109
}

cmd/devtap/storefactory.go

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,31 @@ func openStoreByBackend(cfg *config.Config, backend, storeDir, adapter string) (
6666
}
6767
}
6868

69+
// openStoreStrict opens a store without fallback. Unlike openStoreByBackend,
70+
// a greptimedb failure returns an error instead of silently falling back to
71+
// the file store. This is appropriate for reader paths (drain / MCP) where
72+
// falling back would silently drain the wrong data.
//
// backend selects the implementation: "" or "file" opens the file store,
// "greptimedb" opens a GreptimeDB store configured from cfg.Store.GreptimeDB,
// and any other value is rejected with an "unknown store backend" error.
// storeDir and adapter are passed through unchanged to the store constructor.
73+
func openStoreStrict(cfg *config.Config, backend, storeDir, adapter string) (store.Store, error) {
74+
switch backend {
75+
// An empty backend name is handled the same way as "file".
case "", "file":
76+
return filestore.New(storeDir, adapter)
77+
case "greptimedb":
78+
gs, err := greptimestore.New(cfg.Store.GreptimeDB, storeDir, adapter)
79+
if err != nil {
80+
return nil, fmt.Errorf("greptimedb: %w", err)
81+
}
82+
// Ping under a 3-second deadline before handing the store back, so a
// failing ping is reported here as an open error.
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
83+
defer cancel()
84+
if err := gs.Ping(ctx); err != nil {
85+
// The store is closed before returning; the Close error is deliberately
// discarded because the ping failure is the error being reported.
_ = gs.Close()
86+
return nil, fmt.Errorf("greptimedb ping: %w", err)
87+
}
88+
return gs, nil
89+
default:
90+
return nil, fmt.Errorf("unknown store backend: %q", backend)
91+
}
92+
}
93+
6994
// resolveDrainSources returns 1 or 2 drain sources for the MCP server and drain command.
7095
//
7196
// - "configured" source: from CLI --store/--session flags (or global config defaults)
@@ -108,10 +133,10 @@ func resolveDrainSources(cmd *cobra.Command) ([]mcp.DrainSource, func(), error)
108133
}
109134
}
110135

111-
configuredStore, err := openStoreByBackend(cfg, configuredBackend, storeDir, adapterName)
112-
if err != nil {
113-
return nil, nil, fmt.Errorf("open configured store: %w", err)
114-
}
136+
// Use strict open for the configured store so that a greptimedb failure
137+
// is surfaced rather than silently falling back to the local file store
138+
// (which would drain wrong data). On failure, degrade to local-only.
139+
configuredStore, configuredErr := openStoreStrict(cfg, configuredBackend, storeDir, adapterName)
115140

116141
// Resolve "local" source: always use global config backend + auto-detected session.
117142
localBackend := cfg.Store.Backend
@@ -128,6 +153,20 @@ func resolveDrainSources(cmd *cobra.Command) ([]mcp.DrainSource, func(), error)
128153
return b
129154
}
130155

156+
// Configured store unavailable — fall back to local-only single source.
157+
if configuredErr != nil {
158+
fmt.Fprintf(os.Stderr, "devtap: configured store %q unavailable (%v), using local only\n",
159+
configuredBackend, configuredErr)
160+
localStore, err := openStoreByBackend(cfg, localBackend, storeDir, adapterName)
161+
if err != nil {
162+
return nil, nil, fmt.Errorf("open local store: %w", err)
163+
}
164+
cleanup := func() { _ = localStore.Close() }
165+
return []mcp.DrainSource{
166+
{Store: localStore, SessionID: localSession, Label: localSession},
167+
}, cleanup, nil
168+
}
169+
131170
// If both sources resolve to the same (backend, session), single source.
132171
if normBackend(configuredBackend) == normBackend(localBackend) && configuredSession == localSession {
133172
cleanup := func() { _ = configuredStore.Close() }

0 commit comments

Comments
 (0)