Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 20 additions & 27 deletions taskvine/src/manager/vine_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -3140,7 +3140,7 @@ If a file can be fetched from a substitute source,
this function modifies the file->substitute field to reflect that source.
*/

static int vine_manager_transfer_capacity_available(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t)
int vine_manager_transfer_capacity_available(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t)
{
struct vine_mount *m;

Expand Down Expand Up @@ -3321,55 +3321,41 @@ static int vine_manager_check_library_for_function_call(struct vine_manager *q,
}

/*
Consider if a task is eligible to run, and if so, find the best worker for it.
Consider if a task is eligible to run.
*/
static struct vine_worker_info *consider_task(struct vine_manager *q, struct vine_task *t)
int consider_task(struct vine_manager *q, struct vine_task *t)
{
timestamp_t now_usecs = timestamp_get();
double now_secs = ((double)now_usecs) / ONE_SECOND;

// Skip task if min requested start time not met.
if (t->resources_requested->start > now_secs) {
return NULL;
return 0;
}

// Skip if this task failed recently
if (t->time_when_last_failure + q->transient_error_interval > now_usecs) {
return NULL;
return 0;
}

// Skip if category already running maximum allowed tasks
struct category *c = vine_category_lookup_or_create(q, t->category);
if (c->max_concurrent > -1 && c->max_concurrent <= c->vine_stats->tasks_running) {
return NULL;
return 0;
}

// Skip task if temp input files have not been materialized.
if (!vine_manager_check_inputs_available(q, t)) {
return NULL;
return 0;
}

// Skip function call task if no suitable library template was installed
// Skip function call task if no suitable library template was installed.
if (!vine_manager_check_library_for_function_call(q, t)) {
return NULL;
}

// Find the best worker for the task
q->stats_measure->time_scheduling = timestamp_get();
struct vine_worker_info *w = vine_schedule_task_to_worker(q, t);
if (!w) {
return NULL;
}
q->stats->time_scheduling += timestamp_get() - q->stats_measure->time_scheduling;

// Check if there is transfer capacity available.
if (q->peer_transfers_enabled) {
if (!vine_manager_transfer_capacity_available(q, w, t))
return NULL;
return 0;
}

// All checks passed
return w;
// All checks passed, task is eligible to run.
return 1;
}

/*
Expand All @@ -3382,7 +3368,6 @@ static int send_one_task(struct vine_manager *q)
{
int t_idx;
struct vine_task *t;
struct vine_worker_info *w = NULL;

int iter_count = 0;
int iter_depth = MIN(priority_queue_size(q->ready_tasks), q->attempt_schedule_depth);
Expand All @@ -3403,7 +3388,15 @@ static int send_one_task(struct vine_manager *q)
// the priority queue data structure where also invokes priority_queue_rotate_reset.
PRIORITY_QUEUE_ROTATE_ITERATE(q->ready_tasks, t_idx, t, iter_count, iter_depth)
{
w = consider_task(q, t);
if (!consider_task(q, t)) {
continue;
}

// Find the best worker for the task
q->stats_measure->time_scheduling = timestamp_get();
struct vine_worker_info *w = vine_schedule_task_to_worker(q, t);
q->stats->time_scheduling += timestamp_get() - q->stats_measure->time_scheduling;

if (w) {
priority_queue_remove(q->ready_tasks, t_idx);
commit_task_to_worker(q, w, t);
Expand Down
3 changes: 3 additions & 0 deletions taskvine/src/manager/vine_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,9 @@ struct vine_task *vine_manager_no_wait(struct vine_manager *q, const char *tag,

void vine_manager_remove_worker(struct vine_manager *q, struct vine_worker_info *w, vine_worker_disconnect_reason_t reason);

/* Check if the worker is able to transfer the necessary files for this task. */
int vine_manager_transfer_capacity_available(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t);

/* The expected format of files created by the resource monitor.*/
#define RESOURCE_MONITOR_TASK_LOCAL_NAME "vine-task-%d"
#define RESOURCE_MONITOR_REMOTE_NAME "cctools-monitor"
Expand Down
5 changes: 5 additions & 0 deletions taskvine/src/manager/vine_schedule.c
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,11 @@ int check_worker_against_task(struct vine_manager *q, struct vine_worker_info *w
return 0;
}

/* If the worker has transfer capacity to get this task. */
if (q->peer_transfers_enabled && !vine_manager_transfer_capacity_available(q, w, t)) {
return 0;
}

/* If the worker doesn't have the features the task requires. */
if (t->feature_list) {
if (!w->features) {
Expand Down
Loading