Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ All notable changes to this project will be documented in this file.
- Add observability to BGP handleUpdate: log withdrawal/NLRI counts per batch and track processing duration via `doublezero_bgp_handle_update_duration_seconds` histogram
- E2E tests
- The QA alldevices test now skips devices that are not calling the controller
- Device controller
- Record successful GetConfig gRPC calls to ClickHouse for device telemetry tracking

## [v0.8.4](https://github.com/malbeclabs/doublezero/compare/client/v0.8.3...client/v0.8.4) – 2026-01-28

Expand Down
22 changes: 22 additions & 0 deletions controlplane/controller/cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,28 @@ func (c *ControllerCommand) Run() error {

options = append(options, controller.WithDeviceLocalASN(deviceLocalASN))

if chAddr := os.Getenv("CLICKHOUSE_ADDR"); chAddr != "" {
chDB := os.Getenv("CLICKHOUSE_DB")
if chDB == "" {
chDB = "default"
}
chUser := os.Getenv("CLICKHOUSE_USER")
if chUser == "" {
chUser = "default"
}
chPass := os.Getenv("CLICKHOUSE_PASS")
chTLSDisabled := os.Getenv("CLICKHOUSE_TLS_DISABLED") == "true"
cw, err := controller.NewClickhouseWriter(log, chAddr, chDB, chUser, chPass, chTLSDisabled)
if err != nil {
log.Error("error creating clickhouse writer", "error", err)
os.Exit(1)
}
options = append(options, controller.WithClickhouse(cw))
log.Info("clickhouse enabled", "addr", chAddr, "db", chDB, "user", chUser)
} else {
log.Info("clickhouse disabled (CLICKHOUSE_ADDR not set)")
}

if c.noHardware {
options = append(options, controller.WithNoHardware())
}
Expand Down
127 changes: 127 additions & 0 deletions controlplane/controller/internal/controller/clickhouse.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package controller

import (
"context"
"crypto/tls"
"fmt"
"log/slog"
"sync"
"time"

"github.com/ClickHouse/clickhouse-go/v2"
)

// getConfigEvent is a single buffered row destined for the
// controller_grpc_getconfig_success ClickHouse table.
type getConfigEvent struct {
	// Timestamp is written to the table's timestamp column.
	Timestamp time.Time

	// DevicePubkey is written to the table's device_pubkey column.
	DevicePubkey string
}

// ClickhouseWriter buffers getConfigEvent values in memory and writes them to
// ClickHouse in batches. It contains a sync.Mutex, so it must be used through
// a pointer and never copied.
type ClickhouseWriter struct {
	// conn is the ClickHouse connection used for DDL and batch inserts.
	conn clickhouse.Conn
	// log receives flush diagnostics and errors.
	log *slog.Logger
	// db is the database name interpolated into the table's qualified name.
	db string

	// mu guards events.
	mu sync.Mutex
	// events holds the rows recorded since the last flush.
	events []getConfigEvent
}

// NewClickhouseWriter opens a ClickHouse connection to addr, authenticating as
// user/pass against database db, and verifies it with a ping before returning.
//
// TLS is enabled with a default tls.Config unless disableTLS is true.
//
// It returns an error if the connection cannot be opened or the ping fails. On
// a failed ping the freshly opened connection is closed before returning, so
// the caller never has to clean up after an error.
func NewClickhouseWriter(log *slog.Logger, addr, db, user, pass string, disableTLS bool) (*ClickhouseWriter, error) {
	chOpts := &clickhouse.Options{
		Addr: []string{addr},
		Auth: clickhouse.Auth{
			Database: db,
			Username: user,
			Password: pass,
		},
	}
	if !disableTLS {
		chOpts.TLS = &tls.Config{}
	}
	conn, err := clickhouse.Open(chOpts)
	if err != nil {
		return nil, fmt.Errorf("error opening clickhouse connection: %w", err)
	}
	if err := conn.Ping(context.Background()); err != nil {
		// Best-effort cleanup: nothing else references conn, and the ping
		// error is the one the caller needs to see.
		_ = conn.Close()
		return nil, fmt.Errorf("error pinging clickhouse: %w", err)
	}
	return &ClickhouseWriter{
		conn: conn,
		log:  log,
		db:   db,
	}, nil
}

// CreateTable issues a CREATE TABLE IF NOT EXISTS statement for the
// controller_grpc_getconfig_success table in the writer's database. Any
// failure from the server is wrapped and returned.
func (cw *ClickhouseWriter) CreateTable(ctx context.Context) error {
	ddl := fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s.controller_grpc_getconfig_success (
timestamp DateTime64(3),
device_pubkey LowCardinality(String)
) ENGINE = MergeTree
PARTITION BY toYYYYMM(timestamp)
ORDER BY (timestamp, device_pubkey)
`, cw.db)

	if execErr := cw.conn.Exec(ctx, ddl); execErr != nil {
		return fmt.Errorf("error creating table: %w", execErr)
	}
	return nil
}

// Record adds event to the in-memory buffer under the writer's mutex. It does
// not contact ClickHouse; buffered events are written out by flush.
func (cw *ClickhouseWriter) Record(event getConfigEvent) {
	cw.mu.Lock()
	defer cw.mu.Unlock()

	cw.events = append(cw.events, event)
}

// Run flushes buffered events every ten seconds until ctx is cancelled. On
// cancellation it performs one last flush with a background context (ctx is
// already done at that point) and then returns. Run blocks, so callers start
// it in its own goroutine.
func (cw *ClickhouseWriter) Run(ctx context.Context) {
	const flushInterval = 10 * time.Second

	tick := time.NewTicker(flushInterval)
	defer tick.Stop()

	done := ctx.Done()
	for {
		select {
		case <-tick.C:
			cw.flush(ctx)
		case <-done:
			cw.flush(context.Background())
			return
		}
	}
}

// flush takes ownership of all currently buffered events and writes them to
// the controller_grpc_getconfig_success table as a single batch insert.
//
// The buffer is swapped out under the mutex and the network I/O happens
// outside it, so Record is never blocked on ClickHouse. Events are not
// re-queued on failure: if the batch cannot be prepared or sent they are
// discarded, and the number discarded is logged. Errors are logged rather than
// returned because flush runs from a background loop.
func (cw *ClickhouseWriter) flush(ctx context.Context) {
	cw.mu.Lock()
	if len(cw.events) == 0 {
		cw.mu.Unlock()
		return
	}
	events := cw.events
	cw.events = nil
	cw.mu.Unlock()

	batch, err := cw.conn.PrepareBatch(ctx, fmt.Sprintf(
		`INSERT INTO %s.controller_grpc_getconfig_success (timestamp, device_pubkey)`, cw.db,
	))
	if err != nil {
		cw.log.Error("error preparing clickhouse batch", "error", err, "dropped", len(events))
		return
	}

	// Count only rows the batch accepted so the success log reflects what was
	// actually sent rather than what was buffered.
	appended := 0
	for _, e := range events {
		if err := batch.Append(e.Timestamp, e.DevicePubkey); err != nil {
			cw.log.Error("error appending to clickhouse batch", "error", err)
			continue
		}
		appended++
	}

	if err := batch.Send(); err != nil {
		// Best-effort release of the batch; the send error is the one worth
		// reporting.
		_ = batch.Close()
		cw.log.Error("error sending clickhouse batch", "error", err, "dropped", len(events))
		return
	}
	if err := batch.Close(); err != nil {
		cw.log.Error("error closing clickhouse batch", "error", err)
		return
	}
	cw.log.Debug("flushed getconfig events to clickhouse", "count", appended)
}

// Close writes out any events still buffered and then closes the underlying
// ClickHouse connection. A failure to close is logged, not returned.
func (cw *ClickhouseWriter) Close() {
	cw.flush(context.Background())

	closeErr := cw.conn.Close()
	if closeErr == nil {
		return
	}
	cw.log.Error("error closing clickhouse connection", "error", closeErr)
}
20 changes: 20 additions & 0 deletions controlplane/controller/internal/controller/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type Controller struct {
tlsConfig *tls.Config
environment string
deviceLocalASN uint32
clickhouse *ClickhouseWriter
}

type Option func(*Controller)
Expand Down Expand Up @@ -151,6 +152,12 @@ func WithDeviceLocalASN(asn uint32) Option {
}
}

// WithClickhouse returns an Option that stores cw on the Controller's
// clickhouse field.
func WithClickhouse(cw *ClickhouseWriter) Option {
	attach := func(ctrl *Controller) {
		ctrl.clickhouse = cw
	}
	return attach
}

// processDeviceInterfacesAndPeers processes a device's interfaces and extracts BGP peer information.
// It returns the candidate VPNv4 and IPv4 BGP peers found from the device's loopback interfaces.
func (c *Controller) processDeviceInterfacesAndPeers(device serviceability.Device, d *Device, devicePubKey string) (candidateVpnv4BgpPeer, candidateIpv4BgpPeer BgpPeer) {
Expand Down Expand Up @@ -526,6 +533,13 @@ func (c *Controller) Run(ctx context.Context) error {
http.ListenAndServe("127.0.0.1:2112", mux) //nolint
}()

if c.clickhouse != nil {
if err := c.clickhouse.CreateTable(ctx); err != nil {
return fmt.Errorf("error creating clickhouse table: %w", err)
}
go c.clickhouse.Run(ctx)
}

// start on-chain fetcher
go func() {
c.log.Info("starting fetch of on-chain data", "program-id", c.serviceability.ProgramID())
Expand Down Expand Up @@ -697,6 +711,12 @@ func (c *Controller) GetConfig(ctx context.Context, req *pb.ConfigRequest) (*pb.
resp := &pb.ConfigResponse{Config: config}
getConfigMsgSize.Observe(float64(proto.Size(resp)))
getConfigDuration.Observe(float64(time.Since(reqStart).Seconds()))
if c.clickhouse != nil {
c.clickhouse.Record(getConfigEvent{
Timestamp: reqStart,
DevicePubkey: req.GetPubkey(),
})
}
return resp, nil
}

Expand Down