Skip to content

Commit 35d3e30

Browse files
add worker code
1 parent 1ca5af3 commit 35d3e30

File tree

5 files changed

+67
-61
lines changed

5 files changed

+67
-61
lines changed

taskvine/src/manager/vine_manager.c

Lines changed: 40 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -2724,41 +2724,6 @@ static vine_result_code_t start_one_task(struct vine_manager *q, struct vine_wor
27242724
return result;
27252725
}
27262726

2727-
/*
2728-
Start one task on a given worker by specializing the task to the worker,
2729-
sending the appropriate input files, and then sending the details of the task.
2730-
Note that the "infile" and "outfile" components of the task refer to
2731-
files that have already been uploaded into the worker's cache by the manager.
2732-
*/
2733-
2734-
static vine_result_code_t start_group_task(
2735-
struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t, struct list *l)
2736-
{
2737-
vine_result_code_t result = 0;
2738-
struct vine_task *lt;
2739-
LIST_ITERATE(l, lt)
2740-
{
2741-
struct rmsummary *limits = vine_manager_choose_resources_for_task(q, w, t);
2742-
char *command_line;
2743-
2744-
if (q->monitor_mode && !t->needs_library) {
2745-
command_line = vine_monitor_wrap(q, w, t, limits);
2746-
} else {
2747-
command_line = xxstrdup(t->command_line);
2748-
}
2749-
result = vine_manager_put_task(q, w, lt, command_line, limits, 0);
2750-
free(command_line);
2751-
if (result == VINE_SUCCESS) {
2752-
t->current_resource_box = limits;
2753-
rmsummary_merge_override_basic(t->resources_allocated, limits);
2754-
debug(D_VINE, "%s (%s) busy on group '%s'", w->hostname, w->addrport, t->command_line);
2755-
} else {
2756-
rmsummary_delete(limits);
2757-
}
2758-
}
2759-
return result;
2760-
}
2761-
27622727
static void count_worker_resources(struct vine_manager *q, struct vine_worker_info *w)
27632728
{
27642729
w->resources->cores.inuse = 0;
@@ -2865,6 +2830,7 @@ assignment and the new task state.
28652830

28662831
static vine_result_code_t commit_task_to_worker(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t)
28672832
{
2833+
<<<<<<< HEAD
28682834
vine_result_code_t result = VINE_SUCCESS;
28692835

28702836
/* Kill unused libraries on this worker to reclaim resources. */
@@ -2894,31 +2860,52 @@ static vine_result_code_t commit_task_to_worker(struct vine_manager *q, struct v
28942860

28952861
t->time_when_commit_start = timestamp_get();
28962862
vine_result_code_t result;
2863+
=======
2864+
vine_result_code_t result = 0;
2865+
>>>>>>> 5ab8af678 (add worker code)
28972866
struct list *l = 0;
2898-
if (t->group_id) {
2899-
l = hash_table_lookup(q->task_group_table, t->group_id);
2900-
}
2901-
if (l && list_size(l) > 1) {
2902-
result = start_group_task(q, w, t, l);
2903-
} else {
2867+
l = hash_table_lookup(q->task_group_table, t->group_id);
2868+
int counter = 0;
2869+
do {
2870+
/* Kill empty libraries to reclaim resources. Match the assumption of
2871+
* @vine_schedule.c:check_worker_have_enough_resources() */
2872+
kill_empty_libraries_on_worker(q, w, t);
2873+
t->hostname = xxstrdup(w->hostname);
2874+
t->addrport = xxstrdup(w->addrport);
2875+
2876+
t->time_when_commit_start = timestamp_get();
29042877
result = start_one_task(q, w, t);
2905-
}
2906-
t->time_when_commit_end = timestamp_get();
2878+
t->time_when_commit_end = timestamp_get();
2879+
2880+
itable_insert(w->current_tasks, t->task_id, t);
2881+
t->worker = w;
2882+
2883+
/* Increment the function count if this is a function task.
2884+
* If the manager fails to send this function task to the worker however,
2885+
* then the count will be decremented properly in @handle_failure() below. */
2886+
if (t->needs_library) {
2887+
t->library_task = find_library_on_worker_for_task(w, t->needs_library);
2888+
t->library_task->function_slots_inuse++;
2889+
vine_txn_log_write_library_update(q, w, t->task_id, VINE_LIBRARY_SENT);
2890+
}
29072891

2908-
itable_insert(w->current_tasks, t->task_id, t);
2909-
t->worker = w;
2892+
change_task_state(q, t, VINE_TASK_RUNNING);
29102893

2911-
change_task_state(q, t, VINE_TASK_RUNNING);
2894+
t->try_count += 1;
2895+
q->stats->tasks_dispatched += 1;
29122896

2913-
t->try_count += 1;
2914-
q->stats->tasks_dispatched += 1;
2897+
count_worker_resources(q, w);
29152898

2916-
count_worker_resources(q, w);
2899+
if (result != VINE_SUCCESS) {
2900+
debug(D_VINE, "Failed to send task %d to worker %s (%s).", t->task_id, w->hostname, w->addrport);
2901+
handle_failure(q, w, t, result);
2902+
}
29172903

2918-
if (result != VINE_SUCCESS) {
2919-
debug(D_VINE, "Failed to send task %d to worker %s (%s).", t->task_id, w->hostname, w->addrport);
2920-
handle_failure(q, w, t, result);
2921-
}
2904+
counter++;
2905+
2906+
} while((t = list_next_item(l)));
2907+
2908+
debug(D_VINE, "Sent batch of %d tasks to worker %s", counter, w->hostname);
29222909

29232910
return result;
29242911
}

taskvine/src/manager/vine_manager_put.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -560,6 +560,10 @@ vine_result_code_t vine_manager_put_task(
560560
}
561561
}
562562

563+
if(t->group_id) {
564+
vine_manager_send(q, w, "groupid %s\n", t->group_id);
565+
}
566+
563567
// vine_manager_send returns the number of bytes sent, or a number less than
564568
// zero to indicate errors. We are lazy here, we only check the last
565569
// message we sent to the worker (other messages may have failed above).

taskvine/src/manager/vine_task_groups.c

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ static int vine_task_groups_create_group(struct vine_manager *q, struct vine_tas
1919

2020
t->group_id = id;
2121

22-
struct vine_task *tc = vine_task_copy(t);
22+
struct vine_task *tc = vine_task_clone(t);
2323

2424
list_push_head(l, tc);
2525
hash_table_insert(q->task_group_table, id, l);
@@ -33,13 +33,18 @@ static int vine_task_groups_add_to_group(struct vine_manager *q, struct vine_tas
3333
char *id;
3434
HASH_TABLE_ITERATE(q->task_group_table, id, l)
3535
{
36-
struct vine_file *f;
37-
LIST_ITERATE(l, f)
36+
struct vine_task *lt;
37+
LIST_ITERATE(l, lt)
3838
{
39-
if (f == m->file) {
40-
struct vine_task *tc = vine_task_copy(t);
41-
list_push_tail(l, tc);
42-
return 1;
39+
struct vine_mount *lm;
40+
LIST_ITERATE(lt->output_mounts, lm)
41+
{
42+
if (m->file == lm->file) {
43+
t->group_id = lt->group_id;
44+
struct vine_task *tc = vine_task_clone(t);
45+
list_push_tail(l, tc);
46+
return 1;
47+
}
4348
}
4449
}
4550
}
@@ -78,8 +83,10 @@ int vine_task_groups_assign_task(struct vine_manager *q, struct vine_task *t)
7883
// could also be inputs_present && outputs_present
7984
if (inputs_present) {
8085
vine_task_groups_add_to_group(q, t, input_mount);
86+
debug(D_VINE, "Assigned task to group %s", t->group_id);
8187
} else if (outputs_present) {
8288
vine_task_groups_create_group(q, t, output_mount);
89+
debug(D_VINE, "Create task with group %s", t->group_id);
8390
}
8491

8592
return inputs_present || outputs_present;

taskvine/src/worker/vine_sandbox.c

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ vine_cache_status_t vine_sandbox_ensure(struct vine_process *p, struct vine_cach
4343
LIST_ITERATE(p->task->input_mounts, m)
4444
{
4545
vine_cache_status_t cache_status = vine_cache_ensure(cache, m->file->cached_name);
46-
46+
4747
switch (cache_status) {
4848
case VINE_CACHE_STATUS_PENDING:
4949
case VINE_CACHE_STATUS_PROCESSING:
@@ -54,6 +54,11 @@ vine_cache_status_t vine_sandbox_ensure(struct vine_process *p, struct vine_cach
5454
break;
5555
case VINE_CACHE_STATUS_UNKNOWN:
5656
case VINE_CACHE_STATUS_FAILED:
57+
if(p->task->group_id)
58+
{
59+
processing++;
60+
break;
61+
}
5762
return VINE_CACHE_STATUS_FAILED;
5863
}
5964
}

taskvine/src/worker/vine_worker.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -834,6 +834,7 @@ static struct vine_task *do_task_body(struct link *manager, int task_id, time_t
834834
char taskname_encoded[VINE_LINE_MAX];
835835
char library_name[VINE_LINE_MAX];
836836
char category[VINE_LINE_MAX];
837+
char groupid[VINE_LINE_MAX];
837838
int flags, length;
838839
int64_t n;
839840

@@ -879,6 +880,8 @@ static struct vine_task *do_task_body(struct link *manager, int task_id, time_t
879880
vine_task_set_disk(task, n);
880881
} else if (sscanf(line, "gpus %" PRId64, &n)) {
881882
vine_task_set_gpus(task, n);
883+
} else if (sscanf(line, "groupid %s", groupid)) {
884+
task->group_id = xxstrdup(groupid);
882885
} else if (sscanf(line, "wall_time %" PRIu64, &nt)) {
883886
vine_task_set_time_max(task, nt);
884887
} else if (sscanf(line, "end_time %" PRIu64, &nt)) {

0 commit comments

Comments
 (0)