Skip to content

Commit 3abe87b

Browse files
author
Jin Zhou
committed
add compute makespan
1 parent 3036a0d commit 3abe87b

File tree

8 files changed

+68
-30
lines changed

8 files changed

+68
-30
lines changed

taskvine/src/bindings/python3/ndcctools/taskvine/compat/dask_executor.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -138,8 +138,10 @@ def get(self, dsk, keys, *,
138138
hoisting_modules=None, # Deprecated, use lib_modules
139139
import_modules=None, # Deprecated, use lib_modules
140140
lazy_transfers=True, # Deprecated, use worker_transfers
141+
extra_serialize_time_sec=0,
141142
):
142143
try:
144+
self.extra_serialize_time_sec = extra_serialize_time_sec
143145
self.set_property("framework", "dask")
144146
if retries and retries < 1:
145147
raise ValueError("retries should be larger than 0")
@@ -213,10 +215,13 @@ def __call__(self, *args, **kwargs):
213215
return self.get(*args, **kwargs)
214216

215217
def _dask_execute(self, dsk, keys):
218+
216219
indices = {k: inds for (k, inds) in find_dask_keys(keys)}
217220
keys_flatten = indices.keys()
218221

222+
time_start = time.time()
219223
dag = DaskVineDag(dsk, low_memory_mode=self.low_memory_mode, prune_depth=self.prune_depth)
224+
print(f"Time taken to enqueue tasks: {time.time() - time_start:.6f} seconds")
220225
tag = f"dag-{id(dag)}"
221226

222227
# create Library if using 'function-calls' task mode.
@@ -437,7 +442,8 @@ def _enqueue_dask_calls(self, dag, tag, rs, retries, enqueued_calls):
437442
extra_files=self.extra_files,
438443
retries=retries,
439444
worker_transfers=lazy,
440-
wrapper=self.wrapper)
445+
wrapper=self.wrapper,
446+
extra_serialize_time_sec=self.extra_serialize_time_sec)
441447

442448
t.set_priority(priority)
443449
t.set_tag(tag) # tag that identifies this dag
@@ -557,7 +563,9 @@ def __init__(self, m,
557563
env_vars=None,
558564
retries=5,
559565
worker_transfers=False,
560-
wrapper=None):
566+
wrapper=None,
567+
extra_serialize_time_sec=0):
568+
time.sleep(extra_serialize_time_sec)
561569
self._key = key
562570
self._sexpr = sexpr
563571

@@ -658,7 +666,9 @@ def __init__(self, m,
658666
extra_files=None,
659667
retries=5,
660668
worker_transfers=False,
661-
wrapper=None):
669+
wrapper=None,
670+
extra_serialize_time_sec=0):
671+
time.sleep(extra_serialize_time_sec)
662672

663673
self._key = key
664674
self.resources = resources

taskvine/src/graph/dagvine/dagvine.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import signal
1717
import json
1818
import random
19+
import time
1920

2021

2122
def context_loader_func(graph_pkl):
@@ -69,7 +70,7 @@ def __init__(self):
6970
"checkpoint-dir": "./checkpoints",
7071
"checkpoint-fraction": 0,
7172
"progress-bar-update-interval-sec": 0.1,
72-
"time-metrics-filename": "time_metrics.csv",
73+
"time-metrics-filename": 0,
7374
"enable-debug-log": 1,
7475
"auto-recovery": 1,
7576
"max-retry-attempts": 15,
@@ -258,6 +259,8 @@ def create_proxy_library(self, py_graph, vine_graph, hoisting_modules, env_files
258259

259260
def run(self, task_dict, target_keys=[], params={}, hoisting_modules=[], env_files={}, adapt_dask=False):
260261
"""High-level entry point: normalise input, build graphs, ship the library, execute, and return results."""
262+
time_start = time.time()
263+
261264
# first update the params so that they can be used for the following construction
262265
self.update_params(params)
263266

@@ -278,13 +281,18 @@ def run(self, task_dict, target_keys=[], params={}, hoisting_modules=[], env_fil
278281

279282
try:
280283
print(f"=== Library serialized size: {color_text(proxy_library.get_context_size(), 92)} MB")
284+
print(f"Time taken to initialize the graph in Python: {time.time() - time_start:.6f} seconds")
281285
vine_graph.execute()
282286
results = {}
283287
for k in target_keys:
284288
if k not in py_graph.task_dict:
285289
continue
286290
outfile_path = os.path.join(self.param("output-dir"), py_graph.outfile_remote_name[k])
287291
results[k] = TaskOutputWrapper.load_from_path(outfile_path)
292+
makespan_s = round(vine_graph.get_makespan_us() / 1e6, 6)
293+
throughput_tps = round(len(py_graph.task_dict) / makespan_s, 6)
294+
print(f"Makespan: {color_text(makespan_s, 92)} seconds")
295+
print(f"Throughput: {color_text(throughput_tps, 92)} tasks/second")
288296
return results
289297
finally:
290298
try:

taskvine/src/graph/dagvine/vine_graph/vine_graph.c

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -563,13 +563,13 @@ static void print_time_metrics(struct vine_graph *vg, const char *filename)
563563
debug(D_ERROR, "failed to open file %s", filename);
564564
return;
565565
}
566-
fprintf(fp, "node_id,submission_time_us,scheduling_time_us,commit_time_us,execution_time_us,retrieval_time_us,postprocessing_time_us\n");
566+
fprintf(fp, "node_id,submission_time_us,commit_time_us,execution_time_us,retrieval_time_us,postprocessing_time_us\n");
567567

568568
uint64_t nid;
569569
struct vine_node *node;
570570
ITABLE_ITERATE(vg->nodes, nid, node)
571571
{
572-
fprintf(fp, "%" PRIu64 "," TIMESTAMP_FORMAT "," TIMESTAMP_FORMAT "," TIMESTAMP_FORMAT "," TIMESTAMP_FORMAT "," TIMESTAMP_FORMAT "," TIMESTAMP_FORMAT "\n", node->node_id, node->submission_time, node->scheduling_time, node->commit_time, node->execution_time, node->retrieval_time, node->postprocessing_time);
572+
fprintf(fp, "%" PRIu64 "," TIMESTAMP_FORMAT "," TIMESTAMP_FORMAT "," TIMESTAMP_FORMAT "," TIMESTAMP_FORMAT "," TIMESTAMP_FORMAT "\n", node->node_id, node->submission_time, node->commit_time, node->execution_time, node->retrieval_time, node->postprocessing_time);
573573
}
574574
fclose(fp);
575575

@@ -738,7 +738,7 @@ int vine_graph_tune(struct vine_graph *vg, const char *name, const char *value)
738738
vg->progress_bar_update_interval_sec = (val > 0.0) ? val : 0.1;
739739

740740
} else if (strcmp(name, "time-metrics-filename") == 0) {
741-
if (strcmp(value, "0") == 0) {
741+
if (value == NULL || strcmp(value, "0") == 0) {
742742
return 0;
743743
}
744744

@@ -1255,6 +1255,10 @@ struct vine_graph *vine_graph_create(struct vine_manager *q)
12551255
* we can be in control of when to recreate what lost data. */
12561256
vg->auto_recovery = 0;
12571257

1258+
vg->time_first_task_dispatched = UINT64_MAX;
1259+
vg->time_last_task_retrieved = 0;
1260+
vg->makespan_us = 0;
1261+
12581262
return vg;
12591263
}
12601264

@@ -1295,6 +1299,15 @@ void vine_graph_add_dependency(struct vine_graph *vg, uint64_t parent_id, uint64
12951299
return;
12961300
}
12971301

1302+
uint64_t vine_graph_get_makespan_us(const struct vine_graph *vg)
1303+
{
1304+
if (!vg) {
1305+
return 0;
1306+
}
1307+
1308+
return (uint64_t)vg->makespan_us;
1309+
}
1310+
12981311
/**
12991312
* Execute the vine graph. This must be called after all nodes and dependencies are added and the topology metrics are computed.
13001313
* @param vg Reference to the vine graph.
@@ -1305,6 +1318,8 @@ void vine_graph_execute(struct vine_graph *vg)
13051318
return;
13061319
}
13071320

1321+
timestamp_t time_start = timestamp_get();
1322+
13081323
void (*previous_sigint_handler)(int) = signal(SIGINT, handle_sigint);
13091324

13101325
debug(D_VINE, "start executing vine graph");
@@ -1363,6 +1378,9 @@ void vine_graph_execute(struct vine_graph *vg)
13631378
next_failure_threshold = vg->failure_injection_step_percent / 100.0;
13641379
}
13651380

1381+
timestamp_t time_end = timestamp_get();
1382+
printf("Time taken to initialize the graph in C: %.6f seconds\n", (double)(time_end - time_start) / 1e6);
1383+
13661384
struct ProgressBar *pbar = progress_bar_init("Executing Tasks");
13671385
progress_bar_set_update_interval(pbar, vg->progress_bar_update_interval_sec);
13681386

@@ -1425,6 +1443,15 @@ void vine_graph_execute(struct vine_graph *vg)
14251443
continue;
14261444
}
14271445

1446+
/* update time metrics */
1447+
vg->time_first_task_dispatched = MIN(vg->time_first_task_dispatched, task->time_when_commit_end);
1448+
vg->time_last_task_retrieved = MAX(vg->time_last_task_retrieved, task->time_when_retrieval);
1449+
if (vg->time_last_task_retrieved < vg->time_first_task_dispatched) {
1450+
debug(D_ERROR, "task %d time_last_task_retrieved < time_first_task_dispatched: %" PRIu64 " < %" PRIu64, task->task_id, vg->time_last_task_retrieved, vg->time_first_task_dispatched);
1451+
vg->time_last_task_retrieved = vg->time_first_task_dispatched;
1452+
}
1453+
vg->makespan_us = vg->time_last_task_retrieved - vg->time_first_task_dispatched;
1454+
14281455
/* if the outfile is set to save on the sharedfs, stat to get the size of the file */
14291456
switch (node->outfile_type) {
14301457
case NODE_OUTFILE_TYPE_SHARED_FILE_SYSTEM: {
@@ -1450,10 +1477,9 @@ void vine_graph_execute(struct vine_graph *vg)
14501477
* Only the first completion should advance the "User" progress. */
14511478
int first_completion = !node->completed;
14521479
node->completed = 1;
1453-
node->scheduling_time = task->time_when_scheduling_end - task->time_when_scheduling_start;
14541480
node->commit_time = task->time_when_commit_end - task->time_when_commit_start;
14551481
node->execution_time = task->time_workers_execute_last;
1456-
node->retrieval_time = task->time_when_get_result_end - task->time_when_get_result_start;
1482+
node->retrieval_time = task->time_when_done - task->time_when_retrieval;
14571483

14581484
/* prune nodes on task completion */
14591485
prune_ancestors_of_node(vg, node);

taskvine/src/graph/dagvine/vine_graph/vine_graph.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include "set.h"
1212
#include "vine_node.h"
1313
#include "taskvine.h"
14+
#include "timestamp.h"
1415

1516
/** The task priority algorithm used for vine graph scheduling. */
1617
typedef enum {
@@ -72,6 +73,10 @@ struct vine_graph {
7273

7374
int max_retry_attempts; /* the maximum number of times to retry a task */
7475
double retry_interval_sec; /* the interval between retries in seconds, 0 means no retry interval */
76+
77+
timestamp_t time_first_task_dispatched; /* the time when the first task is dispatched */
78+
timestamp_t time_last_task_retrieved; /* the time when the last task is retrieved */
79+
timestamp_t makespan_us; /* the makespan of the vine graph in microseconds */
7580
};
7681

7782
/* Public APIs for operating the vine graph */
@@ -171,4 +176,10 @@ void vine_graph_set_proxy_function_name(struct vine_graph *vg, const char *proxy
171176
*/
172177
int vine_graph_tune(struct vine_graph *vg, const char *name, const char *value);
173178

179+
/** Get the makespan of the vine graph in microseconds.
180+
@param vg Reference to the vine graph.
181+
@return The makespan in microseconds (last task retrieval time minus first task dispatch time), or 0 if vg is NULL.
182+
*/
183+
uint64_t vine_graph_get_makespan_us(const struct vine_graph *vg);
184+
174185
#endif // VINE_GRAPH_H

taskvine/src/graph/dagvine/vine_graph/vine_graph_client.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,10 @@ def execute(self):
8484
"""Kick off execution; runs through SWIG down into the C orchestration loop."""
8585
vine_graph_capi.vine_graph_execute(self._c_graph)
8686

87+
def get_makespan_us(self):
88+
"""Get the makespan of the vine graph in microseconds."""
89+
return vine_graph_capi.vine_graph_get_makespan_us(self._c_graph)
90+
8791
def delete(self):
8892
"""Release the C resources and clear the client."""
8993
vine_graph_capi.vine_graph_delete(self._c_graph)

taskvine/src/manager/vine_manager.c

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -595,8 +595,6 @@ static vine_result_code_t get_completion_result(struct vine_manager *q, struct v
595595
return VINE_SUCCESS;
596596
}
597597

598-
t->time_when_get_result_start = timestamp_get();
599-
600598
if (task_status != VINE_RESULT_SUCCESS) {
601599
w->last_failure_time = timestamp_get();
602600
t->time_when_last_failure = w->last_failure_time;

taskvine/src/manager/vine_task.c

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -89,15 +89,9 @@ struct vine_task *vine_task_create(const char *command_line)
8989

9090
void vine_task_clean(struct vine_task *t)
9191
{
92-
t->time_when_scheduling_start = 0;
93-
t->time_when_scheduling_end = 0;
94-
9592
t->time_when_commit_start = 0;
9693
t->time_when_commit_end = 0;
9794

98-
t->time_when_get_result_start = 0;
99-
t->time_when_get_result_end = 0;
100-
10195
t->time_when_retrieval = 0;
10296
t->time_when_done = 0;
10397

@@ -161,15 +155,9 @@ void vine_task_reset(struct vine_task *t)
161155
t->time_workers_execute_exhaustion = 0;
162156
t->time_workers_execute_failure = 0;
163157

164-
t->time_when_scheduling_start = 0;
165-
t->time_when_scheduling_end = 0;
166-
167158
t->time_when_commit_start = 0;
168159
t->time_when_commit_end = 0;
169160

170-
t->time_when_get_result_start = 0;
171-
t->time_when_get_result_end = 0;
172-
173161
rmsummary_delete(t->resources_measured);
174162
rmsummary_delete(t->resources_allocated);
175163
t->resources_measured = rmsummary_create(-1);

taskvine/src/manager/vine_task.h

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -117,20 +117,13 @@ struct vine_task {
117117
timestamp_t time_when_submitted; /**< The time at which this task was added to the queue. */
118118
timestamp_t time_when_done; /**< The time at which the task is marked as retrieved, after transferring output files and other final processing. */
119119

120-
timestamp_t time_when_scheduling_start; /**< The time when the task starts to be considered for scheduling. */
121-
timestamp_t time_when_scheduling_end; /**< The time when the task is mapped to a worker and ready to be committed. */
122-
123120
timestamp_t time_when_commit_start; /**< The time when the task starts to be transferred to a worker. */
124121
timestamp_t time_when_commit_end; /**< The time when the task is completely transferred to a worker. */
125122

126-
timestamp_t time_when_get_result_start; /**< The time when the task starts to get the result from the worker. */
127-
timestamp_t time_when_get_result_end; /**< The time when the task gets the result from the worker. */
128-
129123
timestamp_t time_when_retrieval; /**< The time when output files start to be transferred back to the manager. time_when_done - time_when_retrieval is the time taken to transfer output files. */
130124

131125
timestamp_t time_when_last_failure; /**< If larger than 0, the time at which the last task failure was detected. */
132126

133-
134127
timestamp_t time_workers_execute_last_start; /**< The time when the last complete execution for this task started at a worker. */
135128
timestamp_t time_workers_execute_last_end; /**< The time when the last complete execution for this task ended at a worker. */
136129

0 commit comments

Comments
 (0)