Skip to content

Commit ec2741e

Browse files
authored
vine: properly replicate temp files (#4004)
* print rep count * init * update * up * up * up * lint * up * limit the max number of rep attempts * update
1 parent baaf877 commit ec2741e

File tree

7 files changed

+124
-89
lines changed

7 files changed

+124
-89
lines changed

taskvine/src/bindings/python3/ndcctools/taskvine/manager.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1766,6 +1766,14 @@ def log_txn_app(self, entry):
17661766
def log_debug_app(self, entry):
17671767
cvine.vine_log_debug_app(self._taskvine, entry)
17681768

1769+
##
1770+
# Gets the number of replicas of a file.
1771+
#
1772+
# @param self The manager to register this file
1773+
# @param file The File object
1774+
def get_file_replica_count(self, file):
1775+
return cvine.vine_file_replica_count(self._taskvine, file._file)
1776+
17691777

17701778
##
17711779
# @class ndcctools.taskvine.manager.Factory

taskvine/src/manager/taskvine.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -730,6 +730,13 @@ const char *vine_file_source(struct vine_file *f);
730730
*/
731731
vine_file_type_t vine_file_type(struct vine_file *f);
732732

733+
/** Get the number of replicas of a file.
734+
@param m A manager object
735+
@param f A file object.
736+
@return The number of replicas of the file.
737+
*/
738+
int vine_file_replica_count(struct vine_manager *m, struct vine_file *f);
739+
733740
/** Declare a file object from a local file
734741
@param m A manager object
735742
@param source The path of the file on the local filesystem

taskvine/src/manager/vine_current_transfers.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ See the file COPYING for details.
88
#include "uuid.h"
99

1010
#define VINE_FILE_SOURCE_MAX_TRANSFERS 1
11-
#define VINE_WORKER_SOURCE_MAX_TRANSFERS 3 // static 1 until if/when multiple transfer ports are opened up on worker transfer server
11+
#define VINE_WORKER_SOURCE_MAX_TRANSFERS 10
1212

1313
char *vine_current_transfers_add(struct vine_manager *q, struct vine_worker_info *to, struct vine_worker_info *source_worker, const char *source_url);
1414

@@ -24,7 +24,6 @@ int vine_current_transfers_url_in_use(struct vine_manager *q, const char *source
2424

2525
int vine_current_transfers_dest_in_use(struct vine_manager *q,struct vine_worker_info *w);
2626

27-
2827
int vine_current_transfers_wipe_worker(struct vine_manager *q, struct vine_worker_info *w);
2928

3029
void vine_current_transfers_print_table(struct vine_manager *q);

taskvine/src/manager/vine_file_replica_table.c

Lines changed: 44 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,12 @@ struct vine_file_replica *vine_file_replica_table_lookup(struct vine_worker_info
7777
return hash_table_lookup(w->current_files, cachename);
7878
}
7979

80+
// count the number of in-cluster replicas of a file
81+
int vine_file_replica_count(struct vine_manager *m, struct vine_file *f)
82+
{
83+
return set_size(hash_table_lookup(m->file_worker_table, f->cached_name));
84+
}
85+
8086
// find a worker (randomly) in posession of a specific file, and is ready to transfer it.
8187
struct vine_worker_info *vine_file_replica_table_find_worker(struct vine_manager *q, const char *cachename)
8288
{
@@ -124,40 +130,21 @@ struct vine_worker_info *vine_file_replica_table_find_worker(struct vine_manager
124130
}
125131

126132
// trigger replications of file to satisfy temp_replica_count
127-
int vine_file_replica_table_replicate(struct vine_manager *m, struct vine_file *f)
133+
int vine_file_replica_table_replicate(struct vine_manager *m, struct vine_file *f, struct set *sources, int to_find)
128134
{
129-
/* the number of replicated copies in this round */
130-
int round_replication_count = 0;
131-
132-
if (vine_current_transfers_get_table_size(m) >= hash_table_size(m->worker_table) * m->worker_source_max_transfers) {
133-
return round_replication_count;
134-
}
135-
136-
struct set *sources = hash_table_lookup(m->file_worker_table, f->cached_name);
137-
if (!sources) {
138-
return round_replication_count;
139-
}
140-
141135
int nsources = set_size(sources);
142-
int to_find = MIN(m->temp_replica_count - nsources, m->transfer_replica_per_cycle);
143-
if (to_find < 1) {
144-
return round_replication_count;
145-
}
146-
147-
debug(D_VINE, "Found %d workers holding %s, %d replicas needed", nsources, f->cached_name, to_find);
136+
int round_replication_request_sent = 0;
148137

149138
/* get the elements of set so we can insert new replicas to sources */
150139
struct vine_worker_info **sources_frozen = (struct vine_worker_info **)set_values(sources);
151140
struct vine_worker_info *source;
152141

153-
int i = 0;
154-
for (source = sources_frozen[i]; i < nsources; i++) {
155-
if (round_replication_count >= to_find) {
156-
break;
157-
}
142+
for (int i = 0; i < nsources; i++) {
158143

159-
int found_per_source = 0;
144+
source = sources_frozen[i];
145+
int dest_found = 0;
160146

147+
// skip if the file on the source is not ready to transfer
161148
struct vine_file_replica *replica = hash_table_lookup(source->current_files, f->cached_name);
162149
if (!replica || replica->state != VINE_FILE_REPLICA_STATE_READY) {
163150
continue;
@@ -166,51 +153,60 @@ int vine_file_replica_table_replicate(struct vine_manager *m, struct vine_file *
166153
char *source_addr = string_format("%s/%s", source->transfer_url, f->cached_name);
167154
int source_in_use = vine_current_transfers_source_in_use(m, source);
168155

156+
// skip if the source is busy with other transfers
157+
if (source_in_use >= m->worker_source_max_transfers) {
158+
continue;
159+
}
160+
169161
char *id;
170-
struct vine_worker_info *peer;
162+
struct vine_worker_info *dest;
171163
int offset_bookkeep;
172-
HASH_TABLE_ITERATE_RANDOM_START(m->worker_table, offset_bookkeep, id, peer)
173-
{
174164

175-
if (found_per_source >= MIN(m->file_source_max_transfers, to_find)) {
176-
break;
177-
}
178-
179-
if (source_in_use >= m->worker_source_max_transfers) {
180-
break;
181-
}
182-
183-
if (!peer->transfer_port_active) {
165+
HASH_TABLE_ITERATE_RANDOM_START(m->worker_table, offset_bookkeep, id, dest)
166+
{
167+
// skip if the source and destination are on the same host
168+
if (set_lookup(sources, dest) || strcmp(source->hostname, dest->hostname) == 0) {
184169
continue;
185170
}
186171

187-
if (set_lookup(sources, peer)) {
172+
// skip if the destination is not ready to transfer
173+
if (!dest->transfer_port_active) {
188174
continue;
189175
}
190176

191-
if (vine_current_transfers_dest_in_use(m, peer) >= m->worker_source_max_transfers) {
177+
// skip if the destination is busy with other transfers
178+
if (vine_current_transfers_dest_in_use(m, dest) >= m->worker_source_max_transfers) {
192179
continue;
193180
}
194181

195-
if (strcmp(source->hostname, peer->hostname) == 0) {
196-
continue;
197-
}
182+
debug(D_VINE, "replicating %s from %s to %s", f->cached_name, source->addrport, dest->addrport);
198183

199-
debug(D_VINE, "replicating %s from %s to %s", f->cached_name, source->addrport, peer->addrport);
184+
vine_manager_put_url_now(m, dest, source_addr, f);
200185

201-
vine_manager_put_url_now(m, peer, source_addr, f);
186+
round_replication_request_sent++;
202187

203-
source_in_use++;
204-
found_per_source++;
205-
round_replication_count++;
188+
// break if we have found enough destinations for this source
189+
if (++dest_found >= MIN(m->file_source_max_transfers, to_find)) {
190+
break;
191+
}
192+
193+
// break if the source becomes busy with transfers
194+
if (++source_in_use >= m->worker_source_max_transfers) {
195+
break;
196+
}
206197
}
207198

208199
free(source_addr);
200+
201+
// break if we have sent enough replication requests for this file
202+
if (round_replication_request_sent >= to_find) {
203+
break;
204+
}
209205
}
210206

211207
free(sources_frozen);
212208

213-
return round_replication_count;
209+
return round_replication_request_sent;
214210
}
215211

216212
/*

taskvine/src/manager/vine_file_replica_table.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ See the file COPYING for details.
1414
#define VINE_FILE_REPLICA_TABLE_H
1515

1616
#include "taskvine.h"
17+
#include "set.h"
1718
#include "vine_file_replica.h"
1819
#include "vine_worker_info.h"
1920

@@ -27,7 +28,7 @@ struct vine_file_replica *vine_file_replica_table_lookup(struct vine_worker_info
2728

2829
struct vine_worker_info *vine_file_replica_table_find_worker(struct vine_manager *q, const char *cachename);
2930

30-
int vine_file_replica_table_replicate(struct vine_manager *q, struct vine_file *f);
31+
int vine_file_replica_table_replicate(struct vine_manager *q, struct vine_file *f, struct set *sources, int to_find);
3132

3233
int vine_file_replica_table_exists_somewhere( struct vine_manager *q, const char *cachename );
3334

taskvine/src/manager/vine_manager.c

Lines changed: 61 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -934,12 +934,15 @@ static void cleanup_worker(struct vine_manager *q, struct vine_worker_info *w)
934934
}
935935

936936
/* Start replicating files that may need replication */
937-
938-
static int recover_temp_files(struct vine_manager *q)
937+
static int consider_tempfile_replications(struct vine_manager *q)
939938
{
939+
if (hash_table_size(q->temp_files_to_replicate) <= 0) {
940+
return 0;
941+
}
942+
940943
char *cached_name = NULL;
941944
void *empty_val = NULL;
942-
int total_replication_count = 0;
945+
int total_replication_request_sent = 0;
943946

944947
static char key_start[PATH_MAX] = "random init";
945948
int iter_control;
@@ -949,32 +952,55 @@ static int recover_temp_files(struct vine_manager *q)
949952
{
950953
struct vine_file *f = hash_table_lookup(q->file_table, cached_name);
951954

952-
if (f) {
953-
int round_replication_count = vine_file_replica_table_replicate(q, f);
954-
955-
/* Worker busy or no replicas found */
956-
if (round_replication_count < 1) {
957-
/*
958-
If no replicas are found, it indicates that the file doesn't exist, either pruned or lost.
959-
Because a pruned file is removed from the recovery queue, so it definitely indicates that the file is lost.
960-
*/
961-
if (!vine_file_replica_table_exists_somewhere(q, f->cached_name) && q->transfer_temps_recovery) {
962-
vine_manager_consider_recovery_task(q, f, f->recovery_task);
963-
}
964-
hash_table_remove(q->temp_files_to_replicate, cached_name);
965-
} else {
966-
if (iter_count_var > q->attempt_schedule_depth) {
967-
strncpy(key_start, cached_name, PATH_MAX - 1);
968-
key_start[PATH_MAX - 1] = '\0';
969-
break;
970-
}
955+
if (!f) {
956+
continue;
957+
}
958+
959+
/* are there any available sources? */
960+
struct set *sources = hash_table_lookup(q->file_worker_table, f->cached_name);
961+
if (!sources) {
962+
/* If no sources found, it indicates that the file doesn't exist, either pruned or lost.
963+
Because a pruned file is removed from the recovery queue, so it definitely indicates that the file is lost. */
964+
if (q->transfer_temps_recovery) {
965+
vine_manager_consider_recovery_task(q, f, f->recovery_task);
966+
}
967+
hash_table_remove(q->temp_files_to_replicate, f->cached_name);
968+
continue;
969+
}
970+
971+
/* at least one source is able to transfer? */
972+
int has_valid_source = 0;
973+
struct vine_worker_info *s;
974+
SET_ITERATE(sources, s)
975+
{
976+
if (s->transfer_port_active && vine_current_transfers_source_in_use(q, s) < q->worker_source_max_transfers) {
977+
has_valid_source = 1;
978+
break;
971979
}
980+
}
981+
if (!has_valid_source) {
982+
continue;
983+
}
984+
985+
/* has this file been fully replicated? */
986+
int nsources = set_size(sources);
987+
int to_find = MIN(q->temp_replica_count - nsources, q->transfer_replica_per_cycle);
988+
if (to_find <= 0) {
989+
hash_table_remove(q->temp_files_to_replicate, f->cached_name);
990+
continue;
991+
}
992+
993+
debug(D_VINE, "Found %d workers holding %s, %d replicas needed", nsources, f->cached_name, to_find);
994+
995+
int round_replication_request_sent = vine_file_replica_table_replicate(q, f, sources, to_find);
996+
total_replication_request_sent += round_replication_request_sent;
972997

973-
total_replication_count += round_replication_count;
998+
if (total_replication_request_sent >= q->attempt_schedule_depth) {
999+
break;
9741000
}
9751001
}
9761002

977-
return total_replication_count;
1003+
return total_replication_request_sent;
9781004
}
9791005

9801006
/* Insert into hashtable temp files that may need replication. */
@@ -5118,6 +5144,16 @@ static struct vine_task *vine_wait_internal(struct vine_manager *q, int timeout,
51185144
}
51195145
}
51205146

5147+
// Check if any temp files need replication and start replicating
5148+
BEGIN_ACCUM_TIME(q, time_internal);
5149+
result = consider_tempfile_replications(q);
5150+
END_ACCUM_TIME(q, time_internal);
5151+
if (result) {
5152+
// recovered at least one temp file
5153+
events++;
5154+
continue;
5155+
}
5156+
51215157
// send keepalives to appropriate workers
51225158
BEGIN_ACCUM_TIME(q, time_status_msgs);
51235159
ask_for_workers_updates(q);
@@ -5142,16 +5178,6 @@ static struct vine_task *vine_wait_internal(struct vine_manager *q, int timeout,
51425178
continue;
51435179
}
51445180

5145-
// Check if any temp files need replication and start replicating
5146-
BEGIN_ACCUM_TIME(q, time_internal);
5147-
result = recover_temp_files(q);
5148-
END_ACCUM_TIME(q, time_internal);
5149-
if (result) {
5150-
// recovered at least one temp file
5151-
events++;
5152-
continue;
5153-
}
5154-
51555181
if (q->process_pending_check) {
51565182
BEGIN_ACCUM_TIME(q, time_internal);
51575183
int pending = process_pending();
@@ -6106,9 +6132,7 @@ void vine_prune_file(struct vine_manager *m, struct vine_file *f)
61066132
}
61076133
}
61086134

6109-
/*
6110-
Pruned files do not need to be scheduled for replication anymore.
6111-
*/
6135+
/* Also remove from the replication table. */
61126136
hash_table_remove(m->temp_files_to_replicate, f->cached_name);
61136137
}
61146138

taskvine/src/worker/vine_worker_options.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ struct vine_worker_options *vine_worker_options_create()
5656
self->transfer_port_min = 0;
5757
self->transfer_port_max = 0;
5858

59-
self->max_transfer_procs = 5;
59+
self->max_transfer_procs = 10;
6060

6161
self->reported_transfer_host = 0;
6262

0 commit comments

Comments
 (0)