Skip to content

Commit 47cf5d6

Browse files
committed
ai/mcp: geoip
1 parent 306cf4d commit 47cf5d6

File tree

27 files changed

+2865
-52
lines changed

27 files changed

+2865
-52
lines changed

telemetry/global-monitor/internal/gm/planner_dz_icmp.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ func (p *DoubleZeroUserICMPPlanner) recordResult(source *Source, user *dz.User,
223223
tags["target_geoip_region"] = geoIP.Region
224224
tags["target_geoip_city"] = geoIP.City
225225
tags["target_geoip_city_id"] = strconv.Itoa(geoIP.CityID)
226-
tags["target_geoip_metro"] = geoIP.Metro
226+
tags["target_geoip_metro"] = geoIP.MetroName
227227
tags["target_geoip_asn"] = strconv.Itoa(int(geoIP.ASN))
228228
tags["target_geoip_asn_org"] = geoIP.ASNOrg
229229
fields["target_geoip_latitude"] = geoIP.Latitude

telemetry/global-monitor/internal/gm/planner_dz_icmp_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ func TestGlobalMonitor_DoubleZeroUserICMPPlanner_Record_WritesExpectedInfluxPoin
166166
Region: "ON",
167167
City: "Toronto",
168168
CityID: 123,
169-
Metro: "YYZ",
169+
MetroName: "Yorkton",
170170
ASN: 64500,
171171
ASNOrg: "Example",
172172
Latitude: 43.7,

telemetry/global-monitor/internal/gm/planner_sol_icmp.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ func (p *SolanaValidatorICMPPlanner) recordResult(source *Source, val *sol.Valid
228228
tags["target_geoip_region"] = val.GeoIP.Region
229229
tags["target_geoip_city"] = val.GeoIP.City
230230
tags["target_geoip_city_id"] = strconv.Itoa(val.GeoIP.CityID)
231-
tags["target_geoip_metro"] = val.GeoIP.Metro
231+
tags["target_geoip_metro"] = val.GeoIP.MetroName
232232
tags["target_geoip_asn"] = strconv.Itoa(int(val.GeoIP.ASN))
233233
tags["target_geoip_asn_org"] = val.GeoIP.ASNOrg
234234
fields["target_geoip_latitude"] = val.GeoIP.Latitude

telemetry/global-monitor/internal/gm/planner_sol_icmp_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ func TestGlobalMonitor_SolanaValidatorICMPPlanner_Record_WritesExpectedInfluxPoi
165165
Region: "ON",
166166
City: "Toronto",
167167
CityID: 123,
168-
Metro: "YYZ",
168+
MetroName: "Yorkton",
169169
ASN: 64500,
170170
ASNOrg: "Example",
171171
Latitude: 43.7,

telemetry/global-monitor/internal/gm/planner_sol_tpuquic.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ func (p *SolanaValidatorTPUQUICPlanner) recordResult(source *Source, val *sol.Va
251251
tags["target_geoip_region"] = val.GeoIP.Region
252252
tags["target_geoip_city"] = val.GeoIP.City
253253
tags["target_geoip_city_id"] = strconv.Itoa(val.GeoIP.CityID)
254-
tags["target_geoip_metro"] = val.GeoIP.Metro
254+
tags["target_geoip_metro"] = val.GeoIP.MetroName
255255
tags["target_geoip_asn"] = strconv.Itoa(int(val.GeoIP.ASN))
256256
tags["target_geoip_asn_org"] = val.GeoIP.ASNOrg
257257
fields["target_geoip_latitude"] = val.GeoIP.Latitude

telemetry/global-monitor/internal/gm/planner_sol_tpuquic_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ func TestGlobalMonitor_SolanaValidatorTPUQUICPlanner_Record_WritesExpectedInflux
162162
Region: "ON",
163163
City: "Toronto",
164164
CityID: 123,
165-
Metro: "YYZ",
165+
MetroName: "Yorkton",
166166
ASN: 64500,
167167
ASNOrg: "Example",
168168
Latitude: 43.7,

tools/dz-ai/cmd/mcp-server/main.go

Lines changed: 73 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,10 @@ import (
2929
"github.com/malbeclabs/doublezero/tools/dz-ai/internal/mcp/duck"
3030
"github.com/malbeclabs/doublezero/tools/dz-ai/internal/mcp/metrics"
3131
"github.com/malbeclabs/doublezero/tools/dz-ai/internal/mcp/server"
32+
"github.com/malbeclabs/doublezero/tools/maxmind/pkg/geoip"
33+
"github.com/malbeclabs/doublezero/tools/maxmind/pkg/metrodb"
3234
"github.com/malbeclabs/doublezero/tools/solana/pkg/rpc"
35+
"github.com/oschwald/geoip2-golang"
3336
)
3437

3538
var (
@@ -45,10 +48,13 @@ const (
4548
defaultMaxConcurrency = 64
4649
defaultDZEnv = config.EnvMainnetBeta
4750
defaultMetricsAddr = "0.0.0.0:8080"
48-
// defaultDBPath = ".tmp/mcp.duckdb"
4951
defaultDBPath = ":memory:"
5052
defaultDuckDBSpillPath = ".tmp/duckdb-spill-tmp"
5153
defaultDBPathEnvVar = "MCP_DB_PATH"
54+
defaultGeoipCityDBPath = "/usr/share/GeoIP/GeoLite2-City.mmdb"
55+
defaultGeoipASNDBPath = "/usr/share/GeoIP/GeoLite2-ASN.mmdb"
56+
geoipCityDBPathEnvVar = "MCP_GEOIP_CITY_DB_PATH"
57+
geoipASNDBPathEnvVar = "MCP_GEOIP_ASN_DB_PATH"
5258
)
5359

5460
func main() {
@@ -69,6 +75,8 @@ func run() error {
6975
metricsAddrFlag := flag.String("metrics-addr", defaultMetricsAddr, "Address to listen on for prometheus metrics")
7076
dbPathFlag := flag.String("db-path", defaultDBPath, "Path to DuckDB database file (empty for in-memory, or set MCP_DB_PATH env var)")
7177
dbSpillDirFlag := flag.String("db-spill-dir", defaultDuckDBSpillPath, "Path to DuckDB temporary spill directory")
78+
geoipCityDBPathFlag := flag.String("geoip-city-db-path", defaultGeoipCityDBPath, "Path to MaxMind GeoIP2 City database file (or set MCP_GEOIP_CITY_DB_PATH env var)")
79+
geoipASNDBPathFlag := flag.String("geoip-asn-db-path", defaultGeoipASNDBPath, "Path to MaxMind GeoIP2 ASN database file (or set MCP_GEOIP_ASN_DB_PATH env var)")
7280
flag.Parse()
7381

7482
networkConfig, err := config.NetworkConfigForEnv(*envFlag)
@@ -137,8 +145,10 @@ func run() error {
137145

138146
// Determine database path: flag takes precedence, then env var, then default
139147
dbPath := *dbPathFlag
140-
if dbPath == "" {
141-
dbPath = os.Getenv(defaultDBPathEnvVar)
148+
if dbPath == "" || dbPath == defaultDBPath {
149+
if envPath := os.Getenv(defaultDBPathEnvVar); envPath != "" {
150+
dbPath = envPath
151+
}
142152
}
143153
if dbPath == "" {
144154
dbPath = defaultDBPath
@@ -151,6 +161,28 @@ func run() error {
151161
}
152162
defer dbCloseFn()
153163

164+
// Determine GeoIP database paths: flag takes precedence, then env var, then default
165+
geoipCityDBPath := *geoipCityDBPathFlag
166+
if geoipCityDBPath == defaultGeoipCityDBPath {
167+
if envPath := os.Getenv(geoipCityDBPathEnvVar); envPath != "" {
168+
geoipCityDBPath = envPath
169+
}
170+
}
171+
172+
geoipASNDBPath := *geoipASNDBPathFlag
173+
if geoipASNDBPath == defaultGeoipASNDBPath {
174+
if envPath := os.Getenv(geoipASNDBPathEnvVar); envPath != "" {
175+
geoipASNDBPath = envPath
176+
}
177+
}
178+
179+
// Initialize GeoIP resolver
180+
geoIPResolver, geoIPCloseFn, err := initializeGeoIP(geoipCityDBPath, geoipASNDBPath, log)
181+
if err != nil {
182+
return fmt.Errorf("failed to initialize GeoIP: %w", err)
183+
}
184+
defer geoIPCloseFn()
185+
154186
// Parse allowed tokens from environment variable (comma-separated)
155187
// Auth can be explicitly disabled with MCP_AUTH_DISABLED=true
156188
var allowedTokens []string
@@ -188,6 +220,7 @@ func run() error {
188220
InternetLatencyAgentPK: networkConfig.InternetLatencyCollectorPK,
189221
InternetDataProviders: telemetryconfig.InternetTelemetryDataProviders,
190222
AllowedTokens: allowedTokens,
223+
GeoIPResolver: geoIPResolver,
191224
})
192225
if err != nil {
193226
return fmt.Errorf("failed to create server: %w", err)
@@ -273,3 +306,40 @@ func initializeDuckDB(dbPath string, spillDir string, log *slog.Logger) (duck.DB
273306
return nil
274307
}, nil
275308
}
309+
310+
func initializeGeoIP(cityDBPath, asnDBPath string, log *slog.Logger) (geoip.Resolver, func() error, error) {
311+
cityDB, err := geoip2.Open(cityDBPath)
312+
if err != nil {
313+
return nil, nil, fmt.Errorf("failed to open GeoIP city database: %w", err)
314+
}
315+
316+
asnDB, err := geoip2.Open(asnDBPath)
317+
if err != nil {
318+
cityDB.Close()
319+
return nil, nil, fmt.Errorf("failed to open GeoIP ASN database: %w", err)
320+
}
321+
322+
metroDB, err := metrodb.New()
323+
if err != nil {
324+
cityDB.Close()
325+
asnDB.Close()
326+
return nil, nil, fmt.Errorf("failed to create metro database: %w", err)
327+
}
328+
329+
resolver, err := geoip.NewResolver(log, cityDB, asnDB, metroDB)
330+
if err != nil {
331+
cityDB.Close()
332+
asnDB.Close()
333+
return nil, nil, fmt.Errorf("failed to create GeoIP resolver: %w", err)
334+
}
335+
336+
return resolver, func() error {
337+
if err := cityDB.Close(); err != nil {
338+
return fmt.Errorf("failed to close city database: %w", err)
339+
}
340+
if err := asnDB.Close(); err != nil {
341+
return fmt.Errorf("failed to close ASN database: %w", err)
342+
}
343+
return nil
344+
}, nil
345+
}

tools/dz-ai/internal/agent/system_prompt.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ ANSWERING RULES:
2626
- Latency units: display in milliseconds (ms) by default; use microseconds (µs) only when values are < 0.1 ms.
2727
- Drain semantics: treat dz_links.delay_override_ns = 1000000000 as soft-drained when interpreting link state.
2828
- Link health: consider drained, telemetry packet loss, and delay delta from committed delay when interpreting link health.
29+
- User location: use geoip data and connected devices but tell the user that's how it was determined.
2930
3031
OUTPUT STYLE (MANDATORY):
3132
- Always structure responses using section headers, even for short answers.
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package duck
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"encoding/csv"
7+
"errors"
8+
"fmt"
9+
"log/slog"
10+
"os"
11+
"time"
12+
)
13+
14+
func UpsertTableViaCSV(ctx context.Context, log *slog.Logger, db DB, tableName string, count int, writeCSVFn func(*csv.Writer, int) error) error {
15+
upsertStart := time.Now()
16+
defer func() {
17+
duration := time.Since(upsertStart)
18+
log.Debug("upserting to table completed", "table", tableName, "rows", count, "duration", duration.String())
19+
}()
20+
21+
if count == 0 {
22+
return nil
23+
}
24+
25+
// Create a temporary CSV file for COPY FROM (much faster than individual INSERTs)
26+
tmpFile, err := os.CreateTemp("", fmt.Sprintf("%s_upsert_*.csv", tableName))
27+
if err != nil {
28+
return fmt.Errorf("failed to create temp file: %w", err)
29+
}
30+
defer os.Remove(tmpFile.Name())
31+
defer tmpFile.Close()
32+
33+
// Write CSV data
34+
csvWriter := csv.NewWriter(tmpFile)
35+
csvWriter.Comma = ','
36+
37+
// Log progress every 5 seconds for long-running operations
38+
progressLogInterval := 5 * time.Second
39+
lastProgressLog := time.Now()
40+
41+
for i := range count {
42+
// Check for context cancellation during long-running write operations
43+
select {
44+
case <-ctx.Done():
45+
return fmt.Errorf("context cancelled while writing CSV for %s: %w", tableName, ctx.Err())
46+
default:
47+
}
48+
49+
if err := writeCSVFn(csvWriter, i); err != nil {
50+
log.Error("failed to write CSV record", "table", tableName, "row", i, "total", count, "error", err)
51+
return fmt.Errorf("failed to write CSV record for %s: %w", tableName, err)
52+
}
53+
54+
// Log progress periodically for large operations
55+
if count > 1000 {
56+
now := time.Now()
57+
if now.Sub(lastProgressLog) >= progressLogInterval {
58+
log.Debug("write progress", "table", tableName, "written", i+1, "total", count)
59+
lastProgressLog = now
60+
}
61+
}
62+
}
63+
64+
csvWriter.Flush()
65+
if err := csvWriter.Error(); err != nil {
66+
return fmt.Errorf("CSV writer error: %w", err)
67+
}
68+
if err := tmpFile.Sync(); err != nil {
69+
return fmt.Errorf("failed to sync temp file: %w", err)
70+
}
71+
72+
// Close file before COPY (DuckDB needs to open it)
73+
tmpFile.Close()
74+
75+
// Check for context cancellation before starting transaction
76+
select {
77+
case <-ctx.Done():
78+
return fmt.Errorf("context cancelled before transaction for %s: %w", tableName, ctx.Err())
79+
default:
80+
}
81+
82+
tx, err := db.Begin()
83+
if err != nil {
84+
return fmt.Errorf("failed to begin transaction for %s: %w", tableName, err)
85+
}
86+
defer func() {
87+
if err := tx.Rollback(); err != nil && !errors.Is(err, sql.ErrTxDone) {
88+
log.Error("failed to rollback transaction", "table", tableName, "error", err)
89+
}
90+
}()
91+
92+
// Create temporary table with same structure
93+
// Use DROP IF EXISTS to handle case where temp table already exists from previous failed transaction
94+
tempTableName := fmt.Sprintf("%s_temp_upsert", tableName)
95+
dropTempSQL := fmt.Sprintf(`DROP TABLE IF EXISTS %s`, tempTableName)
96+
if _, err := tx.Exec(dropTempSQL); err != nil {
97+
return fmt.Errorf("failed to drop temp table for %s: %w", tableName, err)
98+
}
99+
createTempSQL := fmt.Sprintf(`CREATE TEMP TABLE %s AS SELECT * FROM %s WHERE 1=0`, tempTableName, tableName)
100+
if _, err := tx.Exec(createTempSQL); err != nil {
101+
return fmt.Errorf("failed to create temp table for %s: %w", tableName, err)
102+
}
103+
104+
// Load CSV into temporary table
105+
copySQL := fmt.Sprintf("COPY %s FROM '%s' (FORMAT CSV, HEADER false)", tempTableName, tmpFile.Name())
106+
if _, err := tx.Exec(copySQL); err != nil {
107+
return fmt.Errorf("failed to COPY FROM CSV for %s: %w", tableName, err)
108+
}
109+
110+
// Use INSERT OR REPLACE to merge temp table into main table
111+
upsertSQL := fmt.Sprintf(`INSERT OR REPLACE INTO %s SELECT * FROM %s`, tableName, tempTableName)
112+
if _, err := tx.Exec(upsertSQL); err != nil {
113+
return fmt.Errorf("failed to upsert from temp table for %s: %w", tableName, err)
114+
}
115+
116+
if err := tx.Commit(); err != nil {
117+
log.Error("transaction commit failed", "table", tableName, "error", err)
118+
return fmt.Errorf("failed to commit transaction for %s: %w", tableName, err)
119+
}
120+
return nil
121+
}

0 commit comments

Comments
 (0)