Skip to content

Commit 1c21ada

Browse files
committed
feat: sse stats
1 parent b9e0a86 commit 1c21ada

File tree

4 files changed

+334
-0
lines changed

4 files changed

+334
-0
lines changed

api_v1.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,13 @@ func v1_claim_task(c *gin.Context) {
115115
return
116116
}
117117
c.JSON(200, task)
118+
archiveEventChan <- ArchiveEvent{
119+
ProjectID: project.Meta.Identifier,
120+
Archivist: archivist,
121+
Message: fmt.Sprintf("claimed task:%v", (*task)["id"]),
122+
Tasks: 1,
123+
Archived: false,
124+
}
118125
}
119126

120127
func v1_update_task(c *gin.Context) {
@@ -186,6 +193,14 @@ func v1_update_task(c *gin.Context) {
186193
"_id": updated_doc["_id"],
187194
"msg": "Task updated successfully",
188195
})
196+
197+
archiveEventChan <- ArchiveEvent{
198+
ProjectID: project.Meta.Identifier,
199+
Archivist: archivist,
200+
Message: fmt.Sprintf("set task:%s to status:%s", task_id_str, status),
201+
Tasks: 1,
202+
Archived: false,
203+
}
189204
}
190205

191206
func v1_insert_many(c *gin.Context) {
@@ -291,6 +306,13 @@ func v1_insert_many(c *gin.Context) {
291306
"Labels": len(bulkWriteException.Labels),
292307
"WriteConcernError": bulkWriteException.WriteConcernError,
293308
})
309+
archiveEventChan <- ArchiveEvent{
310+
ProjectID: project.Meta.Identifier,
311+
Archivist: archivist,
312+
Archived: true,
313+
Message: fmt.Sprintf("tried %d items, inserted %d items", len(topItems), len(result.InsertedIDs)),
314+
Tasks: len(result.InsertedIDs),
315+
}
294316
}
295317

296318
func v1_insert_item(c *gin.Context) {
@@ -408,4 +430,11 @@ func v1_insert_item(c *gin.Context) {
408430
"_id": result.InsertedID,
409431
"msg": "Item inserted successfully",
410432
})
433+
archiveEventChan <- ArchiveEvent{
434+
ProjectID: project.Meta.Identifier,
435+
Archivist: archivist,
436+
Archived: true,
437+
Message: fmt.Sprintf("inserted item:%v", document[ID]),
438+
Tasks: 1,
439+
}
411440
}

main.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ func init() {
2525

2626
func main() {
2727
r := gin.Default()
28+
r.LoadHTMLGlob("templates/*")
2829
r.Use(gzip.Gzip(gzip.NoCompression, gzip.WithDecompressOnly(), gzip.WithDecompressFn(gzip.DefaultDecompressHandle)))
2930

3031
r.GET("/ping", ping)
@@ -54,5 +55,11 @@ func main() {
5455
adminApi.DELETE("/banlist/:type/:id", admin_unbanlist)
5556
}
5657

58+
panel := r.Group("/stats")
59+
{
60+
panel.GET("/live/:identifier", stats_panel)
61+
panel.GET("/live/:identifier/sse", SSEHeadersMiddleware(), sse_stream)
62+
}
63+
5764
r.Run()
5865
}

streaming.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"io"
7+
"log"
8+
9+
"github.com/gin-gonic/gin"
10+
"github.com/valkey-io/valkey-go"
11+
)
12+
13+
func SSEHeadersMiddleware() gin.HandlerFunc {
14+
return func(c *gin.Context) {
15+
c.Writer.Header().Set("Content-Type", "text/event-stream")
16+
c.Writer.Header().Set("Cache-Control", "no-cache")
17+
c.Writer.Header().Set("Connection", "keep-alive")
18+
c.Writer.Header().Set("Transfer-Encoding", "chunked")
19+
c.Next()
20+
}
21+
}
22+
23+
type ArchiveEvent struct {
24+
ProjectID string `json:"project_id"`
25+
Tasks int `json:"tasks"`
26+
Archived bool `json:"archived"` // whether the event is an archive completion
27+
Archivist string `json:"archivist"`
28+
Message string `json:"message"`
29+
}
30+
31+
var archiveEventChan = make(chan ArchiveEvent, 100)
32+
33+
func init() {
34+
go eventDispatcher()
35+
}
36+
37+
func eventDispatcher() {
38+
for event := range archiveEventChan {
39+
go publishArchiveEvent(event)
40+
}
41+
}
42+
43+
func publishArchiveEvent(event ArchiveEvent) {
44+
eventStr, err := json.Marshal(event)
45+
if err != nil {
46+
log.Println("Error marshalling archive event:", err)
47+
return
48+
}
49+
if err := valkeyClient.Do(context.Background(), valkeyClient.B().Publish().Channel("sse-"+event.ProjectID).Message(string(eventStr)).Build()).Error(); err != nil {
50+
log.Println("Error publishing archive event:", err)
51+
}
52+
}
53+
54+
func stats_panel(c *gin.Context) {
55+
identifier := c.Param("identifier")
56+
// project := GetProject(identifier)
57+
// if !project.Status.Public {
58+
// c.JSON(403, gin.H{"error": "Project is not public"})
59+
// return
60+
// }
61+
c.HTML(200, "stats.tmpl", gin.H{
62+
"identifier": identifier,
63+
})
64+
}
65+
66+
func sse_stream(c *gin.Context) {
67+
identifier := c.Param("identifier")
68+
returnsChan := make(chan valkey.PubSubMessage)
69+
70+
go func() {
71+
defer close(returnsChan)
72+
valkeyClient.Receive(c,
73+
valkeyClient.B().Subscribe().Channel("sse-"+identifier).Build(), func(msg valkey.PubSubMessage) {
74+
returnsChan <- msg
75+
})
76+
}()
77+
78+
c.Stream(func(w io.Writer) bool {
79+
for msg := range returnsChan {
80+
c.SSEvent("message", msg.Message)
81+
return true
82+
}
83+
return false
84+
})
85+
}

templates/stats.tmpl

Lines changed: 213 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
<!DOCTYPE html>
2+
<html lang="en">
3+
<head>
4+
<meta charset="UTF-8">
5+
<meta name="viewport" content="width=device-width, initial-scale=1.0">
6+
<title>Project Stats - {{.identifier}}</title>
7+
<style>
8+
body {
9+
font-family: Arial, sans-serif;
10+
margin: 20px;
11+
background-color: #f4f4f4;
12+
}
13+
.container {
14+
max-width: 1200px;
15+
margin: 0 auto;
16+
background: white;
17+
padding: 20px;
18+
border-radius: 8px;
19+
box-shadow: 0 0 10px rgba(0,0,0,0.1);
20+
}
21+
h1 {
22+
text-align: center;
23+
color: #333;
24+
}
25+
.stats {
26+
display: flex;
27+
justify-content: space-around;
28+
margin: 20px 0;
29+
flex-wrap: wrap;
30+
}
31+
.stat {
32+
text-align: center;
33+
padding: 20px;
34+
border: 1px solid #ddd;
35+
border-radius: 8px;
36+
background: #f9f9f9;
37+
}
38+
.stat h2 {
39+
margin: 0;
40+
font-size: 2em;
41+
color: #e26331ff;
42+
}
43+
.stat p {
44+
margin: 5px 0 0 0;
45+
color: #666;
46+
}
47+
.events {
48+
margin-top: 30px;
49+
}
50+
.events h2 {
51+
color: #333;
52+
}
53+
#event-log {
54+
border: 1px solid #ddd;
55+
height: 300px;
56+
overflow-y: auto;
57+
padding: 10px;
58+
background: #fafafa;
59+
border-radius: 4px;
60+
}
61+
.event {
62+
margin: 5px 0;
63+
padding: 5px;
64+
border-left: 3px solid #b8590bff;
65+
background: white;
66+
display: flex;
67+
justify-content: space-between;
68+
align-items: center;
69+
}
70+
.event .time {
71+
font-size: 0.8em;
72+
color: #999;
73+
}
74+
.leaderboard {
75+
margin-top: 30px;
76+
}
77+
.leaderboard h2 {
78+
color: #333;
79+
}
80+
.leaderboard ul {
81+
list-style: none;
82+
padding: 0;
83+
}
84+
.leaderboard li {
85+
padding: 10px;
86+
border-bottom: 1px solid #ddd;
87+
border-left: 3px solid;
88+
display: flex;
89+
justify-content: space-between;
90+
}
91+
.leaderboard li:nth-child(odd) {
92+
background: #f9f9f9;
93+
}
94+
</style>
95+
</head>
96+
<body>
97+
<div class="container">
98+
<h1>Project Stats - {{.identifier}}</h1>
99+
<div class="stats">
100+
<div class="stat">
101+
<h2 id="total-tasks">0</h2>
102+
<p>Archived Items</p>
103+
</div>
104+
<div class="stat">
105+
<h2 id="active-archivists">0</h2>
106+
<p>Active Archivists</p>
107+
</div>
108+
<div class="stat">
109+
<h2 id="recent-events">0</h2>
110+
<p>Recent Events</p>
111+
</div>
112+
<div class="stat">
113+
<h2 id="uptime">00:00:00</h2>
114+
<p>Elapsed Time</p>
115+
</div>
116+
</div>
117+
<div class="events">
118+
<h2>Live Events</h2>
119+
<div id="event-log"></div>
120+
</div>
121+
<div class="leaderboard">
122+
<h2>Leaderboard</h2>
123+
<ul id="leaderboard-list"></ul>
124+
</div>
125+
</div>
126+
127+
<script>
128+
const identifier = '{{.identifier}}';
129+
const eventSource = new EventSource(`/stats/live/${identifier}/sse`);
130+
let totalTasks = 0;
131+
let activeArchivists = new Set();
132+
let recentEvents = 0;
133+
let archivistTasks = {};
134+
const startTime = Date.now();
135+
136+
// Uptime timer
137+
setInterval(() => {
138+
const elapsed = Date.now() - startTime;
139+
const seconds = Math.floor(elapsed / 1000) % 60;
140+
const minutes = Math.floor(elapsed / 60000) % 60;
141+
const hours = Math.floor(elapsed / 3600000);
142+
document.getElementById('uptime').textContent =
143+
`${hours.toString().padStart(2, '0')}:${minutes.toString().padStart(2, '0')}:${seconds.toString().padStart(2, '0')}`;
144+
}, 1000);
145+
146+
function getArchivistColor(archivist) {
147+
let hash = 0;
148+
for (let i = 0; i < archivist.length; i++) {
149+
hash = ((hash << 5) - hash) + archivist.charCodeAt(i);
150+
hash = hash & hash; // Convert to 32bit integer
151+
}
152+
const hue = Math.abs(hash) % 360;
153+
return `hsl(${hue}, 70%, 50%)`;
154+
}
155+
156+
function updateLeaderboard() {
157+
const leaderboardList = document.getElementById('leaderboard-list');
158+
const sortedArchivists = Object.entries(archivistTasks)
159+
.sort((a, b) => b[1] - a[1])
160+
.slice(0, 10); // Top 10
161+
162+
leaderboardList.innerHTML = '';
163+
sortedArchivists.forEach(([archivist, tasks]) => {
164+
const li = document.createElement('li');
165+
li.style.borderLeftColor = getArchivistColor(archivist);
166+
li.innerHTML = `<span>${archivist}</span><span>${tasks} items</span>`;
167+
leaderboardList.appendChild(li);
168+
});
169+
}
170+
171+
eventSource.onmessage = function(event) {
172+
const data = JSON.parse(event.data);
173+
console.log('Received event:', data);
174+
175+
// Update total archived tasks
176+
if (data.archived) {
177+
totalTasks += data.tasks;
178+
}
179+
document.getElementById('total-tasks').textContent = totalTasks;
180+
181+
// Update active archivists
182+
activeArchivists.add(data.archivist);
183+
document.getElementById('active-archivists').textContent = activeArchivists.size;
184+
185+
// Update recent events count
186+
recentEvents++;
187+
document.getElementById('recent-events').textContent = recentEvents;
188+
189+
// Update archivist tasks
190+
if (data.archived) {
191+
archivistTasks[data.archivist] = (archivistTasks[data.archivist] || 0) + data.tasks;
192+
updateLeaderboard();
193+
}
194+
195+
// Add event to log
196+
const eventLog = document.getElementById('event-log');
197+
const eventDiv = document.createElement('div');
198+
eventDiv.className = 'event';
199+
eventDiv.style.borderLeftColor = getArchivistColor(data.archivist);
200+
eventDiv.innerHTML = `
201+
<span class="message">${data.archivist}: ${data.message}</span>
202+
<span class="time">${new Date().toLocaleString()}</span>
203+
`;
204+
eventLog.appendChild(eventDiv);
205+
eventLog.scrollTop = eventLog.scrollHeight;
206+
};
207+
208+
eventSource.onerror = function(error) {
209+
console.error('SSE error:', error);
210+
};
211+
</script>
212+
</body>
213+
</html>

0 commit comments

Comments
 (0)