1-
21from . import cvine
32import hashlib
4- from collections import deque
5- from concurrent .futures import Executor
6- from concurrent . futures import Future
7- from concurrent . futures import FIRST_COMPLETED
8- from concurrent . futures import FIRST_EXCEPTION
9- from concurrent . futures import ALL_COMPLETED
10- from concurrent . futures . _base import PENDING
11- from concurrent . futures . _base import CANCELLED
12- from concurrent .futures ._base import FINISHED
3+ from collections import deque , namedtuple
4+ from concurrent .futures import (
5+ Executor ,
6+ Future ,
7+ FIRST_COMPLETED ,
8+ FIRST_EXCEPTION ,
9+ ALL_COMPLETED ,
10+ )
11+ from concurrent .futures ._base import PENDING , CANCELLED , FINISHED
1312from concurrent .futures import TimeoutError
14- from collections import namedtuple
13+
1514from .task import (
1615 PythonTask ,
1716 FunctionCall ,
1817 FunctionCallNoResult ,
1918)
19+
2020from .manager import (
2121 Factory ,
2222 Manager ,
2323)
2424
25+ import math
2526import os
2627import time
2728import textwrap
29+ from functools import partial
30+ from collections .abc import Sequence
2831
2932RESULT_PENDING = 'result_pending'
3033
@@ -109,7 +112,7 @@ def as_completed(fs, timeout=None):
109112 f .module_manager .submit (f ._task )
110113
111114 start = time .perf_counter ()
112- result_timeout = min (timeout , 5 ) if timeout is not None else 5
115+ result_timeout = max ( 1 , min (timeout , 5 )) if timeout else 5
113116
114117 def _iterator ():
115118 # iterate of queue of futures, yeilding completed futures and
@@ -133,22 +136,39 @@ def _iterator():
133136 assert result != RESULT_PENDING
134137 yield f
135138
136- if (
137- fs and timeout is not None
138- and time .perf_counter () - start > timeout
139- ):
139+ if fs and timeout and time .perf_counter () - start > timeout :
140140 raise TimeoutError ()
141141
142142 return _iterator ()
143143
144144
145+ def run_iterable (fn , * args ):
146+ return list (map (fn , args ))
147+
148+
149+ def reduction_tree (fn , * args , n = 2 ):
150+ # n is the arity of the reduction function fn
151+ # if less than 2, we have an infinite loop
152+ assert n > 1
153+ entries = [f .result () if isinstance (f , VineFuture ) else f for f in args ]
154+ if len (entries ) < 2 :
155+ return entries [0 ]
156+
157+ len_multiple = int (math .ceil (len (entries ) / n ) * n )
158+ new_args = map (fn , [entries [i :i + n ] for i in range (0 , len_multiple , n )])
159+
160+ return reduction_tree (fn , * new_args , n = n )
161+
145162##
146163# \class FuturesExecutor
147164#
148165# TaskVine FuturesExecutor object
149166#
150167# This class acts as an interface for the creation of Futures
168+
169+
151170class FuturesExecutor (Executor ):
171+
152172 def __init__ (self , port = 9123 , batch_type = "local" , manager = None , manager_host_port = None , manager_name = None , factory_binary = None , worker_binary = None , log_file = os .devnull , factory = True , opts = {}):
153173 self .manager = Manager (port = port )
154174 self .port = self .manager .port
@@ -173,6 +193,100 @@ def __init__(self, port=9123, batch_type="local", manager=None, manager_host_por
173193 else :
174194 self .factory = None
175195
196+ def map (self , fn , iterable , library_name = None , chunk_size = 1 ):
197+ assert chunk_size > 0
198+ assert isinstance (iterable , Sequence )
199+
200+ def wait_for_map_resolution (* futures_batch ):
201+ result = []
202+ for f in futures_batch :
203+ result .extend (f .result () if isinstance (f , VineFuture ) else f )
204+ return result
205+
206+ tasks = []
207+ fn_wrapped = partial (run_iterable , fn )
208+ while iterable :
209+ heads , iterable = iterable [:chunk_size ], iterable [chunk_size :]
210+
211+ if library_name :
212+ raise NotImplementedError ("Using a library not currently supported." )
213+ future_batch_task = self .submit (self .future_funcall (library_name , fn_wrapped , * heads ))
214+ else :
215+ future_batch_task = self .submit (self .future_task (fn_wrapped , * heads ))
216+
217+ tasks .append (future_batch_task )
218+
219+ return self .submit (self .future_task (wait_for_map_resolution , * tasks ))
220+
221+ # Reduce performs a reduction tree on the iterable and currently returns a single value
222+ #
223+ # parameters:
224+ # - Function
225+ # - a function that receives fn_arity arguments
226+ # - A sequence of parameters that function will take
227+ # - a chunk_size to group elements in sequence to dispatch to a single task
228+ # - arity of the function, elements of a chunk are reduce arity-wise.
229+ # - an optional library_name for a library function call
230+ def reduce (self , fn , iterable , library_name = None , chunk_size = 2 , fn_arity = 2 ):
231+ assert chunk_size > 1
232+ assert fn_arity > 1
233+ assert isinstance (iterable , Sequence )
234+ chunk_size = max (fn_arity , chunk_size )
235+
236+ new_iterable = []
237+ while iterable :
238+ heads , iterable = iterable [:chunk_size ], iterable [chunk_size :]
239+ heads = [f .result () if isinstance (f , VineFuture ) else f for f in heads ]
240+ if library_name :
241+ raise NotImplementedError ("Using a library not currently supported." )
242+ future_batch_task = self .submit (
243+ self .future_funcall (
244+ library_name , reduction_tree , fn , * heads , n = fn_arity
245+ )
246+ )
247+ else :
248+ future_batch_task = self .submit (self .future_task (reduction_tree , fn , * heads , n = fn_arity ))
249+
250+ new_iterable .append (future_batch_task )
251+
252+ if len (new_iterable ) > 1 :
253+ return self .reduce (fn , new_iterable , library_name , chunk_size , fn_arity )
254+ else :
255+ return new_iterable [0 ]
256+
257+ def allpairs (self , fn , iterable_rows , iterable_cols , library_name = None , chunk_size = 1 ):
258+ assert chunk_size > 0
259+ assert isinstance (iterable_rows , Sequence )
260+ assert isinstance (iterable_cols , Sequence )
261+
262+ def wait_for_allpairs_resolution (row_size , col_size , mapped ):
263+ result = []
264+ for _ in range (row_size ):
265+ result .append ([0 ] * col_size )
266+
267+ mapped = mapped .result () if isinstance (mapped , VineFuture ) else mapped
268+ for p in mapped :
269+ (i , j , r ) = p .result () if isinstance (p , VineFuture ) else p
270+ result [i ][j ] = r
271+
272+ return result
273+
274+ def wrap_idx (args ):
275+ i , j , a , b = args
276+ return (i , j , fn (a , b ))
277+
278+ iterable = [(i , j , a , b ) for (i , a ) in enumerate (iterable_rows ) for (j , b ) in enumerate (iterable_cols )]
279+ mapped = self .map (wrap_idx , iterable , library_name , chunk_size )
280+
281+ return self .submit (
282+ self .future_task (
283+ wait_for_allpairs_resolution ,
284+ len (iterable_rows ),
285+ len (iterable_cols ),
286+ mapped ,
287+ )
288+ )
289+
176290 def submit (self , fn , * args , ** kwargs ):
177291 if isinstance (fn , (FuturePythonTask , FutureFunctionCall )):
178292 self .manager .submit (fn )
@@ -240,15 +354,15 @@ def cancelled(self):
240354 return False
241355
242356 def running (self ):
243- state = self ._task .state
244- if state == "RUNNING" :
357+ state = self ._task ._module_manager . task_state ( self . _task . id )
358+ if state == cvine . VINE_TASK_RUNNING :
245359 return True
246360 else :
247361 return False
248362
249363 def done (self ):
250- state = self ._task .state
251- if state == "DONE" or state == "RETRIEVED" :
364+ state = self ._task ._module_manager . task_state ( self . _task . id )
365+ if state == cvine . VINE_TASK_DONE :
252366 return True
253367 else :
254368 return False
@@ -301,7 +415,6 @@ def __init__(self, manager, library_name, fn, *args, **kwargs):
301415 self .manager = manager
302416 self .library_name = library_name
303417 self ._envs = []
304-
305418 self ._future = VineFuture (self )
306419 self ._has_retrieved = False
307420
@@ -326,7 +439,6 @@ def output(self, timeout="wait_forever"):
326439 self ._saved_output = output ['Result' ]
327440 else :
328441 self ._saved_output = FunctionCallNoResult (output ['Reason' ])
329-
330442 except Exception as e :
331443 self ._saved_output = e
332444 else :
@@ -400,6 +512,7 @@ def output(self, timeout="wait_forever"):
400512 # task or the exception object of a failed task.
401513 self ._output = cloudpickle .loads (self ._output_file .contents ())
402514 except Exception as e :
515+ print (self ._output_file .contents ())
403516 # handle output file fetch/deserialization failures
404517 self ._output = e
405518 self ._output_loaded = True
0 commit comments