Implementation of taskvine allpairs/map/reduce#4011
Implementation of taskvine allpairs/map/reduce#4011btovar merged 13 commits intocooperative-computing-lab:masterfrom
Conversation
|
Ready for review! |
|
@BarrySlyDelgado and @tphung3 if you have any concerns that can be addressed by a quick fix, please speak up now. Also keep in mind that this is a first version that can be evolved... |
|
Just a note that FunctionCalls in the latest master branch should run with dynamically executed functions. See TR_vine_python_serverless.sh and Otherwise no big issues on my end. |
| total = executor.reduce(sum, nums, fn_arity=11, chunk_size=13).result() | ||
| assert total == sum(nums) | ||
|
|
||
|
|
There was a problem hiding this comment.
Can we add tests for libraries/function calls here as well?
There was a problem hiding this comment.
It seems that library calls don't work with futures right now?
There was a problem hiding this comment.
Mmm, no, the test for funcall future calls works.
There was a problem hiding this comment.
Ok, the issue is that the auxiliary functions to run chunks will not be in the library constructed by the user. I guess we can always add them to all libraries that are created?
There was a problem hiding this comment.
@tphung3 libraries cannot be used right now. The issue is that we cannot compose functions in a library and these high order functions would need to construct the event structure to pass the correct arguments for remote execution.
We could make it work for this case, but I think we would be better if we come up with something more general.
There was a problem hiding this comment.
I'm good with that as long as we have a safeguard of not using functions atm. I see that the code raises error if library is used so that should be safe enough for users.
5151e2a to
190a6fa
Compare
* Implementation of taskvine allpairs/map/reduce * lint * lint v2 * cleanup code * cleanup reduce * add test * remove debug print * cleanup map * format * allpairs in terms of map * format * do not create lib in map * error on lib name --------- Co-authored-by: Kevin Xue <[email protected]> Co-authored-by: Benjamin Tovar <[email protected]>
Proposed Changes
This PR implements a basic implementation of map, reduce, and allpairs for the TaskVine Futures Executor that also supports batching. It currently works with PythonTasks and for small workloads, with additional errors for other cases mentioned below. There is a basic test below. Please note a couple of errors encountered during development. 1. It seems that Futures does not interface well with Condor, so please test locally. This issue was brought up in #3989. 2. Futures executor seems to fail with consecutive sequential workloads, which seems like it may be a downstream issue as a result of the implementation of futures itself. See #3988. 3. Upon talking to @dthain, there is also well-known issue that @tphung3 is tangentially working on which is that dynamically declared functions which are passed into taskvine functions such as executor.map may fail due to the functions not being serializable. This is problematic for this executor in attempting to implement FunctionCalls due to the need for a function to run our function on a list of iterables, which is necessary in an executor batching format. This prevents a FunctionCall implementation from being possible in the current version. The code that runs when method="FutureFunctionCall" is not operable but in the process of implementation. Regardless, this implements a version that works nicely with the default, FuturePythonTasks.
An overview of the functions
map:
executor.map(fn, iterable, library_name="Some_Library", method=None, chunk_size=1)
fn - a function to be executed
iterable - an iterable where each element contains the parameters of fn
library_name - a library name passed in by the user for FunctionCalls (not working)
method - "FutureFunctionCall" for FunctionCalls, otherwise PythonTask
chunk_size - A chunk size to execute the functions on
Returns: a future which returns the iterable when calling future.result()
reduce
executor.reduce(fn, iterable, library_name="Some_Library", method=None, chunk_size=1)
fn - a function to be executed
iterable - an iterable where each element contains the parameters of fn
library_name - a library name passed in by the user for FunctionCalls (not working)
method - "FutureFunctionCall" for FunctionCalls, otherwise PythonTask
chunk_size - A chunk size to execute the functions on
Returns: a future which returns the reduced value when calling future.result()
allpairs:
executor.allpairs(fn, iterable_a, iterable_b, library_name=None, method=None, chunk_size=1):
fn - a function to be executed
iterable_a - an iterable where each element contains the first parameters
iterable_b - an iterable where each element contains the second parameters
library_name - a library name passed in by the user for FunctionCalls (not working)
method - "FutureFunctionCall" for FunctionCalls, otherwise PythonTask
chunk_size - A chunk size to execute the functions on
Returns: a list of lists which returns essentially the cartesian product (combination) when fn is applied
Run a basic test with the following code
Merge Checklist
The following items must be completed before PRs can be merged.
Check these off to verify you have completed all steps.
make testRun local tests prior to pushing.make formatFormat source code to comply with lint policies. Note that some lint errors can only be resolved manually (e.g., Python)make lintRun lint on source code prior to pushing.