Skip to content

Commit 0c9dfff

Browse files
committed
unbuffer
1 parent 964932b commit 0c9dfff

File tree

2 files changed

+24
-6
lines changed

2 files changed

+24
-6
lines changed

api_v1.go

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -115,12 +115,17 @@ func v1_claim_task(c *gin.Context) {
115115
return
116116
}
117117
c.JSON(200, task)
118-
archiveEventChan <- ArchiveEvent{
118+
119+
select {
120+
case archiveEventChan <- ArchiveEvent{
119121
ProjectID: project.Meta.Identifier,
120122
Archivist: archivist,
121123
Message: fmt.Sprintf("claimed task:%v", (*task)["id"]),
122124
Tasks: 1,
123125
Archived: false,
126+
}:
127+
default:
128+
fmt.Println("archiveEventChan is full, dropping event")
124129
}
125130
}
126131

@@ -194,12 +199,16 @@ func v1_update_task(c *gin.Context) {
194199
"msg": "Task updated successfully",
195200
})
196201

197-
archiveEventChan <- ArchiveEvent{
202+
select {
203+
case archiveEventChan <- ArchiveEvent{
198204
ProjectID: project.Meta.Identifier,
199205
Archivist: archivist,
200206
Message: fmt.Sprintf("set task:%s to status:%s", task_id_str, status),
201207
Tasks: 1,
202208
Archived: false,
209+
}:
210+
default:
211+
fmt.Println("archiveEventChan is full, dropping event")
203212
}
204213
}
205214

@@ -306,12 +315,16 @@ func v1_insert_many(c *gin.Context) {
306315
"Labels": len(bulkWriteException.Labels),
307316
"WriteConcernError": bulkWriteException.WriteConcernError,
308317
})
309-
archiveEventChan <- ArchiveEvent{
318+
319+
select {
320+
case archiveEventChan <- ArchiveEvent{
310321
ProjectID: project.Meta.Identifier,
311322
Archivist: archivist,
312323
Archived: true,
313324
Message: fmt.Sprintf("tried %d items, inserted %d items", len(topItems), len(result.InsertedIDs)),
314-
Tasks: len(result.InsertedIDs),
325+
}:
326+
default:
327+
fmt.Println("archiveEventChan is full, dropping event")
315328
}
316329
}
317330

@@ -430,11 +443,16 @@ func v1_insert_item(c *gin.Context) {
430443
"_id": result.InsertedID,
431444
"msg": "Item inserted successfully",
432445
})
433-
archiveEventChan <- ArchiveEvent{
446+
447+
select {
448+
case archiveEventChan <- ArchiveEvent{
434449
ProjectID: project.Meta.Identifier,
435450
Archivist: archivist,
436451
Archived: true,
437452
Message: fmt.Sprintf("inserted item:%v", document[ID]),
438453
Tasks: 1,
454+
}:
455+
default:
456+
fmt.Println("archiveEventChan is full, dropping event")
439457
}
440458
}

streaming.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ func sse_stream(c *gin.Context) {
7272
return
7373
}
7474

75-
returnsChan := make(chan valkey.PubSubMessage)
75+
returnsChan := make(chan valkey.PubSubMessage, 100)
7676

7777
go func() {
7878
defer close(returnsChan)

0 commit comments

Comments
 (0)