@@ -24,6 +24,7 @@ See the file COPYING for details.
2424#include "vine_runtime_dir.h"
2525#include "vine_schedule.h"
2626#include "vine_task.h"
27+ #include "vine_task_groups.h"
2728#include "vine_task_info.h"
2829#include "vine_taskgraph_log.h"
2930#include "vine_txn_log.h"
@@ -2992,6 +2993,35 @@ static vine_result_code_t commit_task_to_worker(struct vine_manager *q, struct v
29922993 return result ;
29932994}
29942995
2996+ static vine_result_code_t commit_task_group_to_worker (struct vine_manager * q , struct vine_worker_info * w , struct vine_task * t )
2997+ {
2998+ vine_result_code_t result = VINE_SUCCESS ;
2999+
3000+ struct list * l = NULL ;
3001+ if (t -> group_id ) {
3002+ l = itable_lookup (q -> task_group_table , t -> group_id );
3003+ list_remove (l , t );
3004+ // decrement refcount
3005+ vine_task_delete (t );
3006+ }
3007+
3008+ int counter = 0 ;
3009+ do {
3010+
3011+ if (counter && (result == VINE_SUCCESS )) {
3012+ int t_idx = priority_queue_find_idx (q -> ready_tasks , t );
3013+ priority_queue_remove (q -> ready_tasks , t_idx );
3014+ // decrement refcount
3015+ vine_task_delete (t );
3016+ }
3017+ result = commit_task_to_worker (q , w , t );
3018+ counter ++ ;
3019+ } while ((l && (t = list_pop_head (l ))));
3020+
3021+ debug (D_VINE , "Sent batch of %d tasks to worker %s" , counter , w -> hostname );
3022+ return result ;
3023+ }
3024+
29953025/* 1 if task resubmitted, 0 otherwise */
29963026static int resubmit_task_on_exhaustion (struct vine_manager * q , struct vine_worker_info * w , struct vine_task * t )
29973027{
@@ -3300,6 +3330,9 @@ static void vine_manager_consider_recovery_task(struct vine_manager *q, struct v
33003330 if (!rt )
33013331 return ;
33023332
3333+ /* Do not try to group recovery tasks */
3334+ rt -> group_id = 0 ;
3335+
33033336 switch (rt -> state ) {
33043337 case VINE_TASK_INITIAL :
33053338 /* The recovery task has never been run, so submit it now. */
@@ -3441,7 +3474,26 @@ static int send_one_task(struct vine_manager *q)
34413474
34423475 if (w ) {
34433476 priority_queue_remove (q -> ready_tasks , t_idx );
3444- vine_result_code_t result = commit_task_to_worker (q , w , t );
3477+
3478+ // do not continue if this worker is running a group task
3479+ if (q -> task_groups_enabled ) {
3480+ struct vine_task * it ;
3481+ uint64_t taskid ;
3482+ ITABLE_ITERATE (w -> current_tasks , taskid , it )
3483+ {
3484+ if (it -> group_id ) {
3485+ return 0 ;
3486+ }
3487+ }
3488+ }
3489+
3490+ vine_result_code_t result ;
3491+ if (q -> task_groups_enabled ) {
3492+ result = commit_task_group_to_worker (q , w , t );
3493+ } else {
3494+ result = commit_task_to_worker (q , w , t );
3495+ }
3496+
34453497 switch (result ) {
34463498 case VINE_SUCCESS :
34473499 /* return on successful commit. */
@@ -3945,6 +3997,8 @@ struct vine_manager *vine_ssl_create(int port, const char *key, const char *cert
39453997
39463998 q -> factory_table = hash_table_create (0 , 0 );
39473999 q -> current_transfer_table = hash_table_create (0 , 0 );
4000+ q -> task_group_table = itable_create (0 );
4001+ q -> group_id_counter = 1 ;
39484002 q -> fetch_factory = 0 ;
39494003
39504004 q -> measured_local_resources = rmsummary_create (-1 );
@@ -4010,6 +4064,8 @@ struct vine_manager *vine_ssl_create(int port, const char *key, const char *cert
40104064 // peer transfers enabled by default
40114065 q -> peer_transfers_enabled = 1 ;
40124066
4067+ q -> task_groups_enabled = 0 ;
4068+
40134069 q -> load_from_shared_fs_enabled = 0 ;
40144070
40154071 q -> file_source_max_transfers = VINE_FILE_SOURCE_MAX_TRANSFERS ;
@@ -4287,6 +4343,9 @@ void vine_delete(struct vine_manager *q)
42874343 vine_current_transfers_clear (q );
42884344 hash_table_delete (q -> current_transfer_table );
42894345
4346+ vine_task_groups_clear (q );
4347+ itable_delete (q -> task_group_table );
4348+
42904349 itable_clear (q -> tasks , (void * )delete_task_at_exit );
42914350 itable_delete (q -> tasks );
42924351
@@ -4656,6 +4715,11 @@ int vine_submit(struct vine_manager *q, struct vine_task *t)
46564715 vine_task_set_scheduler (t , VINE_SCHEDULE_FILES );
46574716 }
46584717
4718+ /* Attempt to group this task based on temp dependencies. */
4719+ if (q -> task_groups_enabled ) {
4720+ vine_task_groups_assign_task (q , t );
4721+ }
4722+
46594723 /* If the task produces temporary files, create recovery tasks for those. */
46604724 vine_manager_create_recovery_tasks (q , t );
46614725
@@ -5472,6 +5536,15 @@ int vine_cancel_by_task_id(struct vine_manager *q, int task_id)
54725536 return 0 ;
54735537 }
54745538
5539+ if (task -> group_id ) {
5540+ struct list * l = itable_lookup (q -> task_group_table , task -> group_id );
5541+ if (l ) {
5542+ list_remove (l , task );
5543+ }
5544+ task -> group_id = 0 ;
5545+ vine_task_delete (task );
5546+ }
5547+
54755548 reset_task_to_state (q , task , VINE_TASK_RETRIEVED );
54765549
54775550 task -> result = VINE_RESULT_CANCELLED ;
@@ -5673,6 +5746,9 @@ int vine_tune(struct vine_manager *q, const char *name, double value)
56735746 } else if (!strcmp (name , "update-interval" )) {
56745747 q -> update_interval = MAX (1 , (int )value );
56755748
5749+ } else if (!strcmp (name , "task-groups" )) {
5750+ q -> task_groups_enabled = MIN (1 , (int )value );
5751+
56765752 } else if (!strcmp (name , "resource-management-interval" )) {
56775753 q -> resource_management_interval = MAX (1 , (int )value );
56785754
0 commit comments