From 5c7e6acb124145b17ac96e066ac07acda2d8c38e Mon Sep 17 00:00:00 2001 From: Nik Weidenbacher Date: Tue, 3 Feb 2026 19:52:09 +0000 Subject: [PATCH] controller: publish getconfig metrics to clickhouse --- CHANGELOG.md | 2 + .../controller/cmd/controller/main.go | 22 +++ .../internal/controller/clickhouse.go | 127 ++++++++++++++++++ .../controller/internal/controller/server.go | 20 +++ 4 files changed, 171 insertions(+) create mode 100644 controlplane/controller/internal/controller/clickhouse.go diff --git a/CHANGELOG.md b/CHANGELOG.md index e8b108698..c9062ec1a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,8 @@ All notable changes to this project will be documented in this file. - Add observability to BGP handleUpdate: log withdrawal/NLRI counts per batch and track processing duration via `doublezero_bgp_handle_update_duration_seconds` histogram - E2E tests - The QA alldevices test now skips devices that are not calling the controller +- Device controller + - Record successful GetConfig gRPC calls to ClickHouse for device telemetry tracking ## [v0.8.4](https://github.com/malbeclabs/doublezero/compare/client/v0.8.3...client/v0.8.4) – 2026-01-28 diff --git a/controlplane/controller/cmd/controller/main.go b/controlplane/controller/cmd/controller/main.go index 06c36706b..ff185446b 100644 --- a/controlplane/controller/cmd/controller/main.go +++ b/controlplane/controller/cmd/controller/main.go @@ -222,6 +222,28 @@ func (c *ControllerCommand) Run() error { options = append(options, controller.WithDeviceLocalASN(deviceLocalASN)) + if chAddr := os.Getenv("CLICKHOUSE_ADDR"); chAddr != "" { + chDB := os.Getenv("CLICKHOUSE_DB") + if chDB == "" { + chDB = "default" + } + chUser := os.Getenv("CLICKHOUSE_USER") + if chUser == "" { + chUser = "default" + } + chPass := os.Getenv("CLICKHOUSE_PASS") + chTLSDisabled := os.Getenv("CLICKHOUSE_TLS_DISABLED") == "true" + cw, err := controller.NewClickhouseWriter(log, chAddr, chDB, chUser, chPass, chTLSDisabled) + if err != nil { + log.Error("error creating clickhouse writer", "error", err) + os.Exit(1) + } + options = append(options, controller.WithClickhouse(cw)) + log.Info("clickhouse enabled", "addr", chAddr, "db", chDB, "user", chUser) + } else { + log.Info("clickhouse disabled (CLICKHOUSE_ADDR not set)") + } + if c.noHardware { options = append(options, controller.WithNoHardware()) } diff --git a/controlplane/controller/internal/controller/clickhouse.go b/controlplane/controller/internal/controller/clickhouse.go new file mode 100644 index 000000000..03973480e --- /dev/null +++ b/controlplane/controller/internal/controller/clickhouse.go @@ -0,0 +1,127 @@ +package controller + +import ( + "context" + "crypto/tls" + "fmt" + "log/slog" + "sync" + "time" + + "github.com/ClickHouse/clickhouse-go/v2" +) + +type getConfigEvent struct { + Timestamp time.Time + DevicePubkey string +} + +type ClickhouseWriter struct { + conn clickhouse.Conn + log *slog.Logger + db string + mu sync.Mutex + events []getConfigEvent +} + +func NewClickhouseWriter(log *slog.Logger, addr, db, user, pass string, disableTLS bool) (*ClickhouseWriter, error) { + chOpts := &clickhouse.Options{ + Addr: []string{addr}, + Auth: clickhouse.Auth{ + Database: db, + Username: user, + Password: pass, + }, + } + if !disableTLS { + chOpts.TLS = &tls.Config{} + } + conn, err := clickhouse.Open(chOpts) + if err != nil { + return nil, fmt.Errorf("error opening clickhouse connection: %w", err) + } + if err := conn.Ping(context.Background()); err != nil { + return nil, fmt.Errorf("error pinging clickhouse: %w", err) + } + return &ClickhouseWriter{ + conn: conn, + log: log, + db: db, + }, nil +} + +func (cw *ClickhouseWriter) CreateTable(ctx context.Context) error { + err := cw.conn.Exec(ctx, fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s.controller_grpc_getconfig_success ( + timestamp DateTime64(3), + device_pubkey LowCardinality(String) + ) ENGINE = MergeTree + PARTITION BY toYYYYMM(timestamp) + ORDER BY (timestamp, device_pubkey) + `, cw.db)) + if err != nil { + return fmt.Errorf("error creating table: %w", err) + } + return nil +} + +func (cw *ClickhouseWriter) Record(event getConfigEvent) { + cw.mu.Lock() + cw.events = append(cw.events, event) + cw.mu.Unlock() +} + +func (cw *ClickhouseWriter) Run(ctx context.Context) { + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + cw.flush(context.Background()) + return + case <-ticker.C: + cw.flush(ctx) + } + } +} + +func (cw *ClickhouseWriter) flush(ctx context.Context) { + cw.mu.Lock() + if len(cw.events) == 0 { + cw.mu.Unlock() + return + } + events := cw.events + cw.events = nil + cw.mu.Unlock() + + batch, err := cw.conn.PrepareBatch(ctx, fmt.Sprintf( + `INSERT INTO %s.controller_grpc_getconfig_success (timestamp, device_pubkey)`, cw.db, + )) + if err != nil { + cw.log.Error("error preparing clickhouse batch", "error", err) + return + } + for _, e := range events { + if err := batch.Append(e.Timestamp, e.DevicePubkey); err != nil { + cw.log.Error("error appending to clickhouse batch", "error", err) + } + } + if err := batch.Send(); err != nil { + _ = batch.Close() + cw.log.Error("error sending clickhouse batch", "error", err) + return + } + if err := batch.Close(); err != nil { + cw.log.Error("error closing clickhouse batch", "error", err) + return + } + cw.log.Debug("flushed getconfig events to clickhouse", "count", len(events)) +} + +func (cw *ClickhouseWriter) Close() { + cw.flush(context.Background()) + if err := cw.conn.Close(); err != nil { + cw.log.Error("error closing clickhouse connection", "error", err) + } +} diff --git a/controlplane/controller/internal/controller/server.go b/controlplane/controller/internal/controller/server.go index 02fa19d2b..9b29d4c06 100644 --- a/controlplane/controller/internal/controller/server.go +++ b/controlplane/controller/internal/controller/server.go @@ -69,6 +69,7 @@ type Controller struct { tlsConfig *tls.Config environment string deviceLocalASN uint32 + clickhouse *ClickhouseWriter } type Option func(*Controller) @@ -151,6 +152,12 @@ func WithDeviceLocalASN(asn uint32) Option { } } +func WithClickhouse(cw *ClickhouseWriter) Option { + return func(c *Controller) { + c.clickhouse = cw + } +} + // processDeviceInterfacesAndPeers processes a device's interfaces and extracts BGP peer information. // It returns the candidate VPNv4 and IPv4 BGP peers found from the device's loopback interfaces. func (c *Controller) processDeviceInterfacesAndPeers(device serviceability.Device, d *Device, devicePubKey string) (candidateVpnv4BgpPeer, candidateIpv4BgpPeer BgpPeer) { @@ -526,6 +533,13 @@ func (c *Controller) Run(ctx context.Context) error { http.ListenAndServe("127.0.0.1:2112", mux) //nolint }() + if c.clickhouse != nil { + if err := c.clickhouse.CreateTable(ctx); err != nil { + return fmt.Errorf("error creating clickhouse table: %w", err) + } + go c.clickhouse.Run(ctx) + } + // start on-chain fetcher go func() { c.log.Info("starting fetch of on-chain data", "program-id", c.serviceability.ProgramID()) @@ -697,6 +711,12 @@ func (c *Controller) GetConfig(ctx context.Context, req *pb.ConfigRequest) (*pb. resp := &pb.ConfigResponse{Config: config} getConfigMsgSize.Observe(float64(proto.Size(resp))) getConfigDuration.Observe(float64(time.Since(reqStart).Seconds())) + if c.clickhouse != nil { + c.clickhouse.Record(getConfigEvent{ + Timestamp: reqStart, + DevicePubkey: req.GetPubkey(), + }) + } return resp, nil }