Skip to content

Commit a09c990

Browse files
merge vine_task.c
1 parent 872de31 commit a09c990

File tree

9 files changed

+217
-2
lines changed

9 files changed

+217
-2
lines changed

taskvine/src/manager/Makefile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ SOURCES = \
2727
vine_current_transfers.c \
2828
vine_file_replica_table.c \
2929
vine_fair.c \
30-
vine_runtime_dir.c
30+
vine_runtime_dir.c \
31+
vine_task_groups.c
3132

3233
PUBLIC_HEADERS = taskvine.h
3334

taskvine/src/manager/taskvine.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ typedef enum {
7272
VINE_SCHEDULE_UNSET = 0, /**< Internal use only. */
7373
VINE_SCHEDULE_FCFS, /**< Select worker on a first-come-first-serve basis. */
7474
VINE_SCHEDULE_FILES, /**< Select worker that has the most data required by the task. (default) */
75+
VINE_SCHEDULE_GROUPS, /**< Select a worker running a task in the task group */
7576
VINE_SCHEDULE_TIME, /**< Select worker that has the fastest execution time on previous tasks. */
7677
VINE_SCHEDULE_RAND, /**< Select a random worker. */
7778
VINE_SCHEDULE_WORST /**< Select the worst fit worker (the worker with more unused resources). */

taskvine/src/manager/vine_manager.c

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ See the file COPYING for details.
2424
#include "vine_runtime_dir.h"
2525
#include "vine_schedule.h"
2626
#include "vine_task.h"
27+
#include "vine_task_groups.h"
2728
#include "vine_task_info.h"
2829
#include "vine_taskgraph_log.h"
2930
#include "vine_txn_log.h"
@@ -2709,6 +2710,41 @@ static vine_result_code_t start_one_task(struct vine_manager *q, struct vine_wor
27092710
return result;
27102711
}
27112712

2713+
/*
2714+
Start one task on a given worker by specializing the task to the worker,
2715+
sending the appropriate input files, and then sending the details of the task.
2716+
Note that the "infile" and "outfile" components of the task refer to
2717+
files that have already been uploaded into the worker's cache by the manager.
2718+
*/
2719+
2720+
static vine_result_code_t start_group_task(
2721+
struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t, struct list *l)
2722+
{
2723+
vine_result_code_t result = 0;
2724+
struct vine_task *lt;
2725+
LIST_ITERATE(l, lt)
2726+
{
2727+
struct rmsummary *limits = vine_manager_choose_resources_for_task(q, w, t);
2728+
char *command_line;
2729+
2730+
if (q->monitor_mode && !t->needs_library) {
2731+
command_line = vine_monitor_wrap(q, w, t, limits);
2732+
} else {
2733+
command_line = xxstrdup(t->command_line);
2734+
}
2735+
result = vine_manager_put_task(q, w, lt, command_line, limits, 0);
2736+
free(command_line);
2737+
if (result == VINE_SUCCESS) {
2738+
t->current_resource_box = limits;
2739+
rmsummary_merge_override_basic(t->resources_allocated, limits);
2740+
debug(D_VINE, "%s (%s) busy on group '%s'", w->hostname, w->addrport, t->command_line);
2741+
} else {
2742+
rmsummary_delete(limits);
2743+
}
2744+
}
2745+
return result;
2746+
}
2747+
27122748
static void count_worker_resources(struct vine_manager *q, struct vine_worker_info *w)
27132749
{
27142750
w->resources->cores.inuse = 0;
@@ -2816,7 +2852,16 @@ static vine_result_code_t commit_task_to_worker(struct vine_manager *q, struct v
28162852
t->addrport = xxstrdup(w->addrport);
28172853

28182854
t->time_when_commit_start = timestamp_get();
2819-
vine_result_code_t result = start_one_task(q, w, t);
2855+
vine_result_code_t result;
2856+
struct list *l = 0;
2857+
if (t->group_id) {
2858+
l = hash_table_lookup(q->task_group_table, t->group_id);
2859+
}
2860+
if (l && list_size(l) > 1) {
2861+
result = start_group_task(q, w, t, l);
2862+
} else {
2863+
result = start_one_task(q, w, t);
2864+
}
28202865
t->time_when_commit_end = timestamp_get();
28212866

28222867
itable_insert(w->current_tasks, t->task_id, t);
@@ -3779,6 +3824,7 @@ struct vine_manager *vine_ssl_create(int port, const char *key, const char *cert
37793824

37803825
q->factory_table = hash_table_create(0, 0);
37813826
q->current_transfer_table = hash_table_create(0, 0);
3827+
q->task_group_table = hash_table_create(0, 0);
37823828
q->fetch_factory = 0;
37833829

37843830
q->measured_local_resources = rmsummary_create(-1);
@@ -4525,6 +4571,9 @@ int vine_submit(struct vine_manager *q, struct vine_task *t)
45254571
/* Ensure category structure is created. */
45264572
vine_category_lookup_or_create(q, t->category);
45274573

4574+
/* Attemp to group this task based on temp dependencies. */
4575+
vine_task_groups_assign_task(q, t);
4576+
45284577
change_task_state(q, t, VINE_TASK_READY);
45294578

45304579
t->time_when_submitted = timestamp_get();

taskvine/src/manager/vine_manager.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ struct vine_manager {
117117
struct hash_table *factory_table; /* Maps factory_name -> vine_factory_info */
118118
struct hash_table *workers_with_available_results; /* Maps link -> vine_worker_info */
119119
struct hash_table *current_transfer_table; /* Maps uuid -> struct transfer_pair */
120+
struct hash_table *task_group_table; /* Maps uuid -> list vine_task */
120121

121122
/* Primary data structures for tracking files. */
122123

taskvine/src/manager/vine_schedule.c

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,61 @@ static struct vine_worker_info *find_worker_by_files(struct vine_manager *q, str
303303
return best_worker;
304304
}
305305

306+
/*
307+
Find the worker that has the largest quantity of cached data needed
308+
by this task, so as to minimize transfer work that must be done
309+
by the manager.
310+
*/
311+
312+
static struct vine_worker_info *find_worker_by_task_groups(struct vine_manager *q, struct vine_task *t)
313+
{
314+
char *key;
315+
struct vine_worker_info *w;
316+
struct vine_worker_info *best_worker = 0;
317+
int offset_bookkeep;
318+
int64_t most_task_cached_bytes = 0;
319+
int64_t task_cached_bytes;
320+
uint8_t has_all_files;
321+
struct vine_file_replica *replica;
322+
struct vine_mount *m;
323+
324+
int ramp_down = vine_schedule_in_ramp_down(q);
325+
326+
HASH_TABLE_ITERATE_RANDOM_START(q->worker_table, offset_bookkeep, key, w)
327+
{
328+
/* Careful: If check_worker_against task fails, then w may no longer be valid. */
329+
if (check_worker_against_task(q, w, t)) {
330+
task_cached_bytes = 0;
331+
has_all_files = 1;
332+
333+
LIST_ITERATE(t->input_mounts, m)
334+
{
335+
replica = hash_table_lookup(w->current_files, m->file->cached_name);
336+
337+
if (replica && m->file->type == VINE_FILE) {
338+
task_cached_bytes += replica->size;
339+
} else if (m->file->cache_level > VINE_CACHE_LEVEL_TASK) {
340+
has_all_files = 0;
341+
}
342+
}
343+
344+
/* Return the worker if it was in possession of all cacheable files */
345+
if (has_all_files && !ramp_down) {
346+
return w;
347+
}
348+
349+
if (!best_worker || task_cached_bytes > most_task_cached_bytes ||
350+
(ramp_down && task_cached_bytes == most_task_cached_bytes &&
351+
candidate_has_worse_fit(best_worker, w))) {
352+
best_worker = w;
353+
most_task_cached_bytes = task_cached_bytes;
354+
}
355+
}
356+
}
357+
358+
return best_worker;
359+
}
360+
306361
/*
307362
Find the first available worker in first-come, first-served order.
308363
Since the order of workers in the hashtable is somewhat arbitrary,
@@ -444,6 +499,8 @@ struct vine_worker_info *vine_schedule_task_to_worker(struct vine_manager *q, st
444499
return find_worker_by_worst_fit(q, t);
445500
case VINE_SCHEDULE_FCFS:
446501
return find_worker_by_fcfs(q, t);
502+
case VINE_SCHEDULE_GROUPS:
503+
return find_worker_by_task_groups(q, t);
447504
case VINE_SCHEDULE_RAND:
448505
default:
449506
return find_worker_by_random(q, t);

taskvine/src/manager/vine_task.c

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ struct vine_task *vine_task_create(const char *command_line)
7575
t->refcount = 1;
7676

7777
vine_counters.task.created++;
78+
t->group_id = 0;
7879

7980
return t;
8081
}
@@ -248,6 +249,11 @@ struct vine_task *vine_task_copy(const struct vine_task *task)
248249
new->resources_requested = rmsummary_copy(task->resources_requested, 0);
249250
}
250251

252+
/* Group ID is copied. */
253+
if (task->group_id) {
254+
new->group_id = strdup(task->group_id);
255+
}
256+
251257
return new;
252258
}
253259

taskvine/src/manager/vine_task.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ End user may only use the API described in taskvine.h
1818

1919
#include "list.h"
2020
#include "category.h"
21+
#include "uuid.h"
2122

2223
#include <stdint.h>
2324

@@ -118,6 +119,7 @@ struct vine_task {
118119

119120
int has_fixed_locations; /**< Whether at least one file was added with the VINE_FIXED_LOCATION flag. Task fails immediately if no
120121
worker can satisfy all the strict inputs of the task. */
122+
char *group_id;
121123

122124
int refcount; /**< Number of remaining references to this object. */
123125
};
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
Copyright (C) 2022- The University of Notre Dame
3+
This software is distributed under the GNU General Public License.
4+
See the file COPYING for details.
5+
*/
6+
7+
#include "vine_task_groups.h"
8+
#include "debug.h"
9+
#include "vine_mount.h"
10+
#include "vine_task.h"
11+
12+
// create a new task group for this task based on the temp mount file
13+
static int vine_task_groups_create_group(struct vine_manager *q, struct vine_task *t, struct vine_mount *m)
14+
{
15+
cctools_uuid_t uuid;
16+
cctools_uuid_create(&uuid);
17+
char *id = strdup(uuid.str);
18+
struct list *l = list_create();
19+
20+
t->group_id = id;
21+
22+
struct vine_task *tc = vine_task_copy(t);
23+
24+
list_push_head(l, tc);
25+
hash_table_insert(q->task_group_table, id, l);
26+
return 1;
27+
}
28+
29+
// locate the group with the task which outputs the desired file, and add the new task
30+
static int vine_task_groups_add_to_group(struct vine_manager *q, struct vine_task *t, struct vine_mount *m)
31+
{
32+
struct list *l;
33+
char *id;
34+
HASH_TABLE_ITERATE(q->task_group_table, id, l)
35+
{
36+
struct vine_file *f;
37+
LIST_ITERATE(l, f)
38+
{
39+
if (f == m->file) {
40+
struct vine_task *tc = vine_task_copy(t);
41+
list_push_tail(l, tc);
42+
return 1;
43+
}
44+
}
45+
}
46+
return 0;
47+
}
48+
49+
/*
50+
When a task comes in through vine_submit, look for temp files in its inputs/outputs
51+
If there is a temp file on the input there is already a task group it should be assigned to.
52+
If there is only a temp output it would be the first of a new group.
53+
*/
54+
int vine_task_groups_assign_task(struct vine_manager *q, struct vine_task *t)
55+
{
56+
struct vine_mount *input_mount;
57+
struct vine_mount *output_mount;
58+
59+
int inputs_present = 0;
60+
int outputs_present = 0;
61+
62+
LIST_ITERATE(t->input_mounts, input_mount)
63+
{
64+
if (input_mount->file->type == VINE_TEMP) {
65+
inputs_present++;
66+
break;
67+
}
68+
}
69+
70+
LIST_ITERATE(t->output_mounts, output_mount)
71+
{
72+
if (output_mount->file->type == VINE_TEMP) {
73+
outputs_present++;
74+
break;
75+
}
76+
}
77+
78+
// could also be inputs_present && outputs_present
79+
if (inputs_present) {
80+
vine_task_groups_add_to_group(q, t, input_mount);
81+
} else if (outputs_present) {
82+
vine_task_groups_create_group(q, t, output_mount);
83+
}
84+
85+
return inputs_present || outputs_present;
86+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
/*
2+
Copyright (C) 2022- The University of Notre Dame
3+
This software is distributed under the GNU General Public License.
4+
See the file COPYING for details.
5+
*/
6+
7+
#include "taskvine.h"
8+
#include "vine_manager.h"
9+
#include "uuid.h"
10+
11+
12+
int vine_task_groups_assign_task(struct vine_manager *q, struct vine_task *t);

0 commit comments

Comments
 (0)