Skip to content

Commit 032049b

Browse files
committed
Add asCompleted
1 parent ff98750 commit 032049b

File tree

2 files changed

+109
-5
lines changed

2 files changed

+109
-5
lines changed

src/asynchronous/locks.d

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ final class Lock
102102
auto waiter = new Waiter(eventLoop);
103103
scope (exit)
104104
{
105-
waiters.remove!(w => w == waiter);
105+
waiters = waiters.remove!(w => w is waiter);
106106
}
107107

108108
waiter.addDoneCallback(&TaskHandle.currentTask(eventLoop).scheduleStep);
@@ -220,7 +220,7 @@ final class Event
220220
auto waiter = new Waiter(eventLoop);
221221
scope (exit)
222222
{
223-
waiters.remove!(w => w == waiter);
223+
waiters = waiters.remove!(w => w is waiter);
224224
}
225225

226226
waiter.addDoneCallback(&TaskHandle.currentTask(eventLoop).scheduleStep);
@@ -370,7 +370,7 @@ final class Condition
370370
auto waiter = new Waiter(eventLoop);
371371
scope (exit)
372372
{
373-
waiters.remove!(w => w == waiter);
373+
waiters = waiters.remove!(w => w is waiter);
374374
}
375375

376376
waiter.addDoneCallback(&TaskHandle.currentTask(eventLoop).scheduleStep);
@@ -459,7 +459,7 @@ class Semaphore
459459
auto waiter = new Waiter(eventLoop);
460460
scope (exit)
461461
{
462-
waiters.remove!(w => w == waiter);
462+
waiters = waiters.remove!(w => w is waiter);
463463
}
464464

465465
waiter.addDoneCallback(&TaskHandle.currentTask(eventLoop).scheduleStep);

src/asynchronous/tasks.d

Lines changed: 105 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ package class TaskRepository
6767
}
6868
body
6969
{
70-
tasks[eventLoop].remove!(a => a is taskHandle);
70+
tasks[eventLoop] = tasks[eventLoop].remove!(a => a is taskHandle);
7171
}
7272
}
7373

@@ -372,6 +372,110 @@ class Task(Coroutine, Args...) : Future!(ReturnType!Coroutine), TaskHandle
372372
}
373373
}
374374

375+
/**
376+
* Return an generator whose values, when waited for, are Future instances in
377+
* the order in which and as soon as they complete.
378+
*
379+
* Raises $(D_PSYMBOL TimeoutException) if the timeout occurs before all futures
380+
* are done.
381+
*
382+
* Example:
383+
*
384+
* foreach (f; getEventLoop.asCompleted(fs))
385+
* // use f.result
386+
*
387+
* Note: The futures $(D_PSYMBOL f) are not necessarily members of $(D_PSYMBOL
388+
* fs).
389+
*/
390+
auto asCompleted(EventLoop eventLoop, FutureHandle[] futures,
391+
Duration timeout = Duration.zero)
392+
in
393+
{
394+
futures.all!"a !is null";
395+
}
396+
body
397+
{
398+
if (eventLoop is null)
399+
eventLoop = getEventLoop;
400+
401+
import asynchronous.queues : Queue;
402+
auto done = new Queue!FutureHandle;
403+
CallbackHandle timeoutCallback = null;
404+
Tuple!(FutureHandle, void delegate())[] todo;
405+
406+
foreach (future; futures)
407+
{
408+
void delegate() doneCallback = (f => {
409+
if (todo.empty)
410+
return; // timeoutCallback was here first
411+
todo = todo.remove!(a => a[0] is f);
412+
done.putNowait(f);
413+
if (todo.empty && timeoutCallback !is null)
414+
timeoutCallback.cancel;
415+
})(future); // workaround bug #2043
416+
417+
todo ~= tuple(future, doneCallback);
418+
future.addDoneCallback(doneCallback);
419+
}
420+
421+
if (!todo.empty && timeout > Duration.zero)
422+
{
423+
timeoutCallback = eventLoop.callLater(timeout, {
424+
foreach (future_callback; todo)
425+
{
426+
future_callback[0].removeDoneCallback(future_callback[1]);
427+
future_callback[0].cancel;
428+
}
429+
if (!todo.empty)
430+
done.putNowait(null);
431+
todo = null;
432+
});
433+
}
434+
435+
return new GeneratorTask!FutureHandle(eventLoop, {
436+
while (!todo.empty || !done.empty)
437+
{
438+
auto f = done.get;
439+
if (f is null)
440+
throw new TimeoutException;
441+
yieldValue(f);
442+
}
443+
});
444+
}
445+
446+
unittest
447+
{
448+
auto eventLoop = getEventLoop;
449+
450+
auto task1 = eventLoop.createTask({
451+
eventLoop.sleep(3.msecs);
452+
});
453+
auto task2 = eventLoop.createTask({
454+
eventLoop.sleep(2.msecs);
455+
});
456+
auto testTask = eventLoop.createTask({
457+
auto tasks = asCompleted(eventLoop, [task1, task2]).array;
458+
459+
assert(tasks[0] is cast(FutureHandle) task2);
460+
assert(tasks[1] is cast(FutureHandle) task1);
461+
});
462+
eventLoop.runUntilComplete(testTask);
463+
464+
task1 = eventLoop.createTask({
465+
eventLoop.sleep(10.msecs);
466+
});
467+
task2 = eventLoop.createTask({
468+
eventLoop.sleep(2.msecs);
469+
});
470+
testTask = eventLoop.createTask({
471+
auto tasks = asCompleted(eventLoop, [task1, task2], 5.msecs).array;
472+
473+
assert(tasks.length == 1);
474+
assert(tasks[0] is cast(FutureHandle) task2);
475+
});
476+
eventLoop.runUntilComplete(testTask);
477+
}
478+
375479
/**
376480
* Schedule the execution of a coroutine: wrap it in a future. Return a
377481
* $(D_PSYMBOL Task) object.

0 commit comments

Comments
 (0)