-
Notifications
You must be signed in to change notification settings - Fork 1
Write server stats to postgres for catabalancer #1389
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,6 +2,8 @@ package catabalancer | |
|
|
||
| import ( | ||
| "context" | ||
| "database/sql" | ||
| "encoding/json" | ||
| "fmt" | ||
| "math/rand" | ||
| "sort" | ||
|
|
@@ -23,6 +25,7 @@ type CataBalancer struct { | |
| NodeMetrics map[string]NodeMetrics // Node name -> NodeMetrics | ||
| metricTimeout time.Duration | ||
| ingestStreamTimeout time.Duration | ||
| NodeStatsDB *sql.DB | ||
| } | ||
|
|
||
| type Streams map[string]Stream // Stream ID -> Stream | ||
|
|
@@ -79,7 +82,35 @@ func (s ScoredNode) String() string { | |
| ) | ||
| } | ||
|
|
||
| func NewBalancer(nodeName string, metricTimeout time.Duration, ingestStreamTimeout time.Duration) *CataBalancer { | ||
| // JSON representation is deliberately truncated to keep the message size small | ||
| type NodeUpdateEvent struct { | ||
| Resource string `json:"resource,omitempty"` | ||
| NodeID string `json:"n,omitempty"` | ||
| NodeMetrics NodeMetrics `json:"nm,omitempty"` | ||
| Streams string `json:"s,omitempty"` | ||
| } | ||
|
|
||
| func (n *NodeUpdateEvent) SetStreams(streamIDs []string, ingestStreamIDs []string) { | ||
| n.Streams = strings.Join(streamIDs, "|") + "~" + strings.Join(ingestStreamIDs, "|") | ||
| } | ||
|
|
||
| func (n *NodeUpdateEvent) GetStreams() []string { | ||
| before, _, _ := strings.Cut(n.Streams, "~") | ||
| if len(before) > 0 { | ||
| return strings.Split(before, "|") | ||
| } | ||
| return []string{} | ||
| } | ||
|
|
||
| func (n *NodeUpdateEvent) GetIngestStreams() []string { | ||
| _, after, _ := strings.Cut(n.Streams, "~") | ||
| if len(after) > 0 { | ||
| return strings.Split(after, "|") | ||
| } | ||
| return []string{} | ||
| } | ||
|
|
||
| func NewBalancer(nodeName string, metricTimeout time.Duration, ingestStreamTimeout time.Duration, nodeStatsDB *sql.DB) *CataBalancer { | ||
| return &CataBalancer{ | ||
| NodeName: nodeName, | ||
| Nodes: make(map[string]*Node), | ||
|
|
@@ -88,6 +119,7 @@ func NewBalancer(nodeName string, metricTimeout time.Duration, ingestStreamTimeo | |
| NodeMetrics: make(map[string]NodeMetrics), | ||
| metricTimeout: metricTimeout, | ||
| ingestStreamTimeout: ingestStreamTimeout, | ||
| NodeStatsDB: nodeStatsDB, | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -134,6 +166,10 @@ func (c *CataBalancer) UpdateMembers(ctx context.Context, members []cluster.Memb | |
| } | ||
|
|
||
| func (c *CataBalancer) GetBestNode(ctx context.Context, redirectPrefixes []string, playbackID, lat, lon, fallbackPrefix string, isStudioReq bool) (string, string, error) { | ||
| if err := c.RefreshNodes(); err != nil { | ||
| return "", "", fmt.Errorf("error refreshing nodes: %w", err) | ||
| } | ||
|
|
||
| var err error | ||
| latf := 0.0 | ||
| if lat != "" { | ||
|
|
@@ -291,7 +327,58 @@ func truncateReturned(scoredNodes []ScoredNode, numNodes int) []ScoredNode { | |
| return scoredNodes[:numNodes] | ||
| } | ||
|
|
||
| func (c *CataBalancer) RefreshNodes() error { | ||
| log.LogNoRequestID("catabalancer refreshing nodes") | ||
| if c.NodeStatsDB == nil { | ||
| return fmt.Errorf("node stats DB was nil") | ||
| } | ||
|
|
||
| query := "SELECT stats FROM node_stats" | ||
| rows, err := c.NodeStatsDB.Query(query) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to query node stats: %w", err) | ||
| } | ||
| defer rows.Close() | ||
|
|
||
| // Process the result set | ||
| for rows.Next() { | ||
| var statsBytes []byte | ||
| if err := rows.Scan(&statsBytes); err != nil { | ||
| return fmt.Errorf("failed to scan node stats row: %w", err) | ||
| } | ||
|
|
||
| var event NodeUpdateEvent | ||
| err = json.Unmarshal(statsBytes, &event) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to unmarshal node update event: %w", err) | ||
| } | ||
|
|
||
| if isStale(event.NodeMetrics.Timestamp, c.metricTimeout) { | ||
| log.LogNoRequestID("catabalancer skipping stale data while refreshing", "nodeID", event.NodeID, "timestamp", event.NodeMetrics.Timestamp) | ||
| continue | ||
| } | ||
|
|
||
| c.UpdateNodes(event.NodeID, event.NodeMetrics) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Do we actually need to keep any internal state for nodes? If we query the DB on every playback redirect request, couldn't we derive everything from there?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yep, sorry, I totally forgot to mention that refactoring this is planned for another PR. I was in two minds about whether to do it in this PR, but since this one has already been approved, I think I'll just do it now, in a fresh PR. |
||
| for _, stream := range event.GetStreams() { | ||
| c.UpdateStreams(event.NodeID, stream, false) | ||
| } | ||
| for _, stream := range event.GetIngestStreams() { | ||
| c.UpdateStreams(event.NodeID, stream, true) | ||
| } | ||
| } | ||
|
|
||
| // Check for errors after iterating through rows | ||
| if err := rows.Err(); err != nil { | ||
| return err | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| func (c *CataBalancer) MistUtilLoadSource(ctx context.Context, streamID, lat, lon string) (string, error) { | ||
| if err := c.RefreshNodes(); err != nil { | ||
| return "", fmt.Errorf("error refreshing nodes: %w", err) | ||
| } | ||
|
|
||
| c.nodesLock.Lock() | ||
| defer c.nodesLock.Unlock() | ||
| for nodeName := range c.Nodes { | ||
|
|
@@ -318,13 +405,13 @@ func (c *CataBalancer) checkAndCreateNode(nodeName string) { | |
| } | ||
| } | ||
|
|
||
| func (c *CataBalancer) UpdateNodes(id string, nodeMetrics NodeMetrics) { | ||
| func (c *CataBalancer) UpdateNodes(nodeName string, nodeMetrics NodeMetrics) { | ||
| c.nodesLock.Lock() | ||
| defer c.nodesLock.Unlock() | ||
|
|
||
| c.checkAndCreateNode(id) | ||
| c.checkAndCreateNode(nodeName) | ||
| nodeMetrics.Timestamp = time.Now() | ||
| c.NodeMetrics[id] = nodeMetrics | ||
| c.NodeMetrics[nodeName] = nodeMetrics | ||
| } | ||
|
|
||
| var UpdateNodeStatsEvery = 5 * time.Second | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just moved this code from the events package to simplify the dependencies.