Skip to content

Commit bb3553a

Browse files
committed
Avoid ThreadMessageQueue callback clashes when emptying queue
If a callback cleans up the queue, the next while-trypop loop could be problematic This should also lessen mutex contention slightly
1 parent af405a8 commit bb3553a

File tree

2 files changed

+17
-4
lines changed

2 files changed

+17
-4
lines changed

gf16/threadqueue.h

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,19 @@ class ThreadMessageQueue {
125125
return notEmpty;
126126
}
127127

128+
// NOTE: unlike 'pop', this doesn't wait - it's more like trypop
129+
std::vector<T> popall() {
130+
std::vector<T> copy;
131+
mutex_lock(mutex);
132+
copy.reserve(q.size());
133+
while(!q.empty()) {
134+
copy.emplace_back(std::move(q.front()));
135+
q.pop();
136+
}
137+
mutex_unlock(mutex);
138+
return copy;
139+
}
140+
128141
size_t size() const {
129142
mutex_lock(mutex);
130143
size_t s = q.size();
@@ -159,8 +172,8 @@ class ThreadNotifyQueue {
159172
#endif
160173
) {
161174
auto self = static_cast<ThreadNotifyQueue*>(handle->data);
162-
void* notification;
163-
while(self->q.trypop(&notification))
175+
auto notifications = self->q.popall();
176+
for(void* notification : notifications)
164177
(self->o->*(self->cb))(notification);
165178
}
166179
public:

src/gf.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -956,8 +956,8 @@ class HasherInput : public node::ObjectWrap {
956956
}
957957
}
958958
void after_process() {
959-
struct input_work_data* data;
960-
while(hashesDone.trypop(&data)) {
959+
auto doneData = hashesDone.popall();
960+
for(struct input_work_data* data : doneData) {
961961
static_cast<HasherInput*>(data->self)->queueCount--;
962962
data->cb->call();
963963
delete data->cb;

0 commit comments

Comments
 (0)