From 658c21205b4ed3d7a3e38bc4c1b48ca133587359 Mon Sep 17 00:00:00 2001 From: Kevin Xue Date: Wed, 18 Dec 2024 17:35:16 -0500 Subject: [PATCH 01/13] Implementation of taskvine allpairs/map/reduce --- .../python3/ndcctools/taskvine/futures.py | 144 ++++++++++++++++-- 1 file changed, 133 insertions(+), 11 deletions(-) diff --git a/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py b/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py index b74cc1a3c0..4ef535c91f 100644 --- a/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py +++ b/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py @@ -1,4 +1,3 @@ - from . import cvine import hashlib from collections import deque @@ -11,7 +10,7 @@ from concurrent.futures._base import CANCELLED from concurrent.futures._base import FINISHED from concurrent.futures import TimeoutError -from collections import namedtuple +from collections import namedtuple, deque from .task import ( PythonTask, FunctionCall, @@ -25,6 +24,8 @@ import os import time import textwrap +import inspect +from functools import partial RESULT_PENDING = 'result_pending' @@ -141,13 +142,48 @@ def _iterator(): return _iterator() - +def run_iterable(fn, iterable, dimensions=1): + if not ((hasattr(iterable, '__iter__') or hasattr(iterable, '__getitem__')) and not isinstance(iterable, str)): + return fn(element) + if dimensions < 1: + return None + result = [] + if dimensions == 1: + for element in iterable: + if (hasattr(element, '__iter__') or hasattr(element, '__getitem__')) and not isinstance(element, str): + result.append(fn(*element)) + else: + result.append(fn(element)) + else: + for inner_iterable in iterable: + result.append(run_iterable(fn, inner_iterable, dimensions-1)) + return result + +def reduction_tree(fn, *args): + minimum_parameters = len(inspect.signature(fn).parameters) + curr_size = len(args) + entries = deque([f.result() if isinstance(f, VineFuture) else f for f in args]) + return_val = entries + while curr_size >= minimum_parameters: + parameters = [] + for _ in range(minimum_parameters): + parameters.append(entries.popleft()) + new_result = fn(*parameters) + if (hasattr(new_result, '__getitem__') or hasattr(new_result, '__iter__')) and not isinstance(new_result, str): + for result in new_result: + entries.appendleft(result) + else: + entries.appendleft(new_result) + curr_size = len(entries) + return_val = entries if len(entries) > 1 else entries[0] + return entries[0] ## # \class FuturesExecutor # # TaskVine FuturesExecutor object # # This class acts as an interface for the creation of Futures + class FuturesExecutor(Executor): def __init__(self, port=9123, batch_type="local", manager=None, manager_host_port=None, manager_name=None, factory_binary=None, worker_binary=None, log_file=os.devnull, factory=True, opts={}): self.manager = Manager(port=port) @@ -171,8 +207,94 @@ def __init__(self, port=9123, batch_type="local", manager=None, manager_host_por self.set(opt, opts[opt]) self.factory.start() else: - self.factory = None + self.factory = None + + def map(self, fn, iterable, library_name="Some_Library", method=None, chunk_size=1): + def wait_for_map_resolution(*futures_batch): + result = [] + for computed_result in futures_batch: + result.extend(computed_result) + return result + if (hasattr(iterable, '__iter__') or hasattr(iterable, '__getitem__')) and not isinstance(iterable, str): + tasks = [] + if method == "FutureFunctionCall": # this currently does not work (error described in PR) + partial_obj = partial(run_iterable, fn) + def partial_func(*args, **kwargs): + return partial_obj(*args, **kwargs) + libtask = self.create_library_from_functions(library_name, partial_func) + self.install_library(libtask) + for i in range(0, len(iterable), chunk_size): + future_batch_task = self.submit(self.future_funcall(library_name, partial_func, iterable[i:i+chunk_size])) + tasks.append(future_batch_task) + else: + for i in range(0, len(iterable), chunk_size): + future_batch_task = self.submit(run_iterable, fn, iterable[i:i+chunk_size]) + tasks.append(future_batch_task) + future = self.submit(wait_for_map_resolution, *tasks) + else: + raise Exception('Error: Map function takes in an iterable') + return future + # Reduce performs a reduction tree on the iterable and currently returns a single value + # + # parameters: + # - Function + # - Iterable of parameters that function will take + # - a library_name for a library function call + # - a method + # - a chunk_size, which is the number of iterations to complete in one task. if c is chunk_size, a single task will reduce c(n-1) + 1 nodes to 1 node + + def reduce(self, fn, iterable, library_name=None, method=None, chunk_size=1): + # This line is just the identity - since when a future is pickled, it actually becomes some file, which means it is evaluated immediately/sent to queue + if (hasattr(iterable, '__iter__') or hasattr(iterable, '__getitem__')) and not isinstance(iterable, str): + sub_futures = [iterable] + num_parameters = len(inspect.signature(fn).parameters) + reduction_size = chunk_size*(num_parameters-1) + while len(sub_futures[-1]) > 1 or len(sub_futures) == 1: + layer = [] + for i in range(0, len(sub_futures[-1]), reduction_size): + if method == "FutureFunctionCall": + future_batch_task = self.submit(self.future_funcall(library_name, reduction_tree, fn, *[self.submit(fetch_future_result, f) if isinstance(f, VineFuture) else f for f in sub_futures[-1][i:i+reduction_size]])) + else: # Method is FuturePythonTask + future_batch_task = self.submit(reduction_tree, fn, *[f if isinstance(f, VineFuture) else f for f in sub_futures[-1][i:i+reduction_size]]) + layer.append(future_batch_task) + sub_futures.append(layer) + future = sub_futures[-1][0] + # if this is a single value + else: + if method == "FutureFunctionCall": + future = self.submit(self.future_funcall(library_name, reduction_tree, fn, iterable)) + else: + future = self.submit(reduction_tree, fn, iterable) + return future + + def allpairs(self, fn, iterable_a, iterable_b, library_name=None, method=None, chunk_size=1): + def wait_for_allpairs_resolution(row_size, *futures_batch): + result = [] + for computed_result in futures_batch: + result.extend(computed_result) + processed_result = [] + for i in range(len(result)//row_size): + row = result[i*row_size:i*row_size+row_size] + processed_result.append(row) + return processed_result + iterable = [(a, b) for b in iterable_b for a in iterable_a] + if (hasattr(iterable, '__iter__') or hasattr(iterable, '__getitem__')) and not isinstance(iterable, str): + tasks = [] + for i in range(0, len(iterable), chunk_size): + if method == "FutureFunctionCall": + future_batch_task = self.submit(self.future_funcall(library_name, run_iterable, fn, iterable[i:i+chunk_size])) + else: # Method is FuturePythonTask + future_batch_task = self.submit(run_iterable, fn, iterable[i:i+chunk_size]) + tasks.append(future_batch_task) + future = self.submit(wait_for_allpairs_resolution, len(iterable_b), *tasks) + else: + if method == "FutureFunctionCall": + future = self.submit(self.future_funcall(library_name, run_iterable, fn, iterable)) + else: # Method is FuturePythonTask + future = self.submit(run_iterable, fn, iterable) + return future + def submit(self, fn, *args, **kwargs): if isinstance(fn, FuturePythonTask): self.manager.submit(fn) @@ -246,15 +368,15 @@ def cancelled(self): return False def running(self): - state = self._task.state - if state == "RUNNING": + state = self._task._module_manager.task_state(self._task.id) + if state == cvine.VINE_TASK_RUNNING: return True else: return False def done(self): - state = self._task.state - if state == "DONE" or state == "RETRIEVED": + state = self._task._module_manager.task_state(self._task.id) + if state == cvine.VINE_TASK_DONE: return True else: return False @@ -307,7 +429,6 @@ def __init__(self, manager, library_name, fn, *args, **kwargs): self.manager = manager self.library_name = library_name self._envs = [] - self._future = VineFuture(self) self._has_retrieved = False @@ -315,7 +436,7 @@ def __init__(self, manager, library_name, fn, *args, **kwargs): # we must first fetch the file before retruning the result. # to bring that output back to the manager. def output(self, timeout="wait_forever"): - + if not self._has_retrieved: result = self.manager.wait_for_task_id(self.id, timeout=timeout) if result: @@ -332,7 +453,6 @@ def output(self, timeout="wait_forever"): self._saved_output = output['Result'] else: self._saved_output = FunctionCallNoResult(output['Reason']) - except Exception as e: self._saved_output = e else: @@ -406,6 +526,7 @@ def output(self, timeout="wait_forever"): # task or the exception object of a failed task. self._output = cloudpickle.loads(self._output_file.contents()) except Exception as e: + print(self._output_file.contents()) # handle output file fetch/deserialization failures self._output = e self._output_loaded = True @@ -479,3 +600,4 @@ def vineLoadArg(arg): return manager._function_buffers[base] # vim: set sts=4 sw=4 ts=4 expandtab ft=python: + From 378aa5265b564c2223996ef8e2dd71ffee87e42c Mon Sep 17 00:00:00 2001 From: Kevin Xue Date: Wed, 18 Dec 2024 17:41:27 -0500 Subject: [PATCH 02/13] lint --- .../python3/ndcctools/taskvine/futures.py | 61 ++++++++++--------- 1 file changed, 32 insertions(+), 29 deletions(-) diff --git a/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py b/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py index 4ef535c91f..25890d5daa 100644 --- a/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py +++ b/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py @@ -1,6 +1,6 @@ from . import cvine import hashlib -from collections import deque +from collections import deque, namedtuple from concurrent.futures import Executor from concurrent.futures import Future from concurrent.futures import FIRST_COMPLETED @@ -10,7 +10,6 @@ from concurrent.futures._base import CANCELLED from concurrent.futures._base import FINISHED from concurrent.futures import TimeoutError -from collections import namedtuple, deque from .task import ( PythonTask, FunctionCall, @@ -142,9 +141,11 @@ def _iterator(): return _iterator() + def run_iterable(fn, iterable, dimensions=1): + if not ((hasattr(iterable, '__iter__') or hasattr(iterable, '__getitem__')) and not isinstance(iterable, str)): - return fn(element) + return fn(iterable) if dimensions < 1: return None result = [] @@ -156,14 +157,15 @@ def run_iterable(fn, iterable, dimensions=1): result.append(fn(element)) else: for inner_iterable in iterable: - result.append(run_iterable(fn, inner_iterable, dimensions-1)) + result.append(run_iterable(fn, inner_iterable, dimensions - 1)) return result + def reduction_tree(fn, *args): + minimum_parameters = len(inspect.signature(fn).parameters) curr_size = len(args) entries = deque([f.result() if isinstance(f, VineFuture) else f for f in args]) - return_val = entries while curr_size >= minimum_parameters: parameters = [] for _ in range(minimum_parameters): @@ -175,7 +177,6 @@ def reduction_tree(fn, *args): else: entries.appendleft(new_result) curr_size = len(entries) - return_val = entries if len(entries) > 1 else entries[0] return entries[0] ## # \class FuturesExecutor @@ -184,7 +185,9 @@ def reduction_tree(fn, *args): # # This class acts as an interface for the creation of Futures + class FuturesExecutor(Executor): + def __init__(self, port=9123, batch_type="local", manager=None, manager_host_port=None, manager_name=None, factory_binary=None, worker_binary=None, log_file=os.devnull, factory=True, opts={}): self.manager = Manager(port=port) self.port = self.manager.port @@ -207,28 +210,29 @@ def __init__(self, port=9123, batch_type="local", manager=None, manager_host_por self.set(opt, opts[opt]) self.factory.start() else: - self.factory = None + self.factory = None def map(self, fn, iterable, library_name="Some_Library", method=None, chunk_size=1): def wait_for_map_resolution(*futures_batch): result = [] for computed_result in futures_batch: - result.extend(computed_result) + result.extend(computed_result) return result - if (hasattr(iterable, '__iter__') or hasattr(iterable, '__getitem__')) and not isinstance(iterable, str): + if (hasattr(iterable, '__iter__') or hasattr(iterable, '__getitem__')) and not isinstance(iterable, str): tasks = [] - if method == "FutureFunctionCall": # this currently does not work (error described in PR) + if method == "FutureFunctionCall": # this currently does not work (error described in PR) partial_obj = partial(run_iterable, fn) + def partial_func(*args, **kwargs): return partial_obj(*args, **kwargs) libtask = self.create_library_from_functions(library_name, partial_func) self.install_library(libtask) for i in range(0, len(iterable), chunk_size): - future_batch_task = self.submit(self.future_funcall(library_name, partial_func, iterable[i:i+chunk_size])) + future_batch_task = self.submit(self.future_funcall(library_name, partial_func, iterable[i:i + chunk_size])) tasks.append(future_batch_task) else: for i in range(0, len(iterable), chunk_size): - future_batch_task = self.submit(run_iterable, fn, iterable[i:i+chunk_size]) + future_batch_task = self.submit(run_iterable, fn, iterable[i:i + chunk_size]) tasks.append(future_batch_task) future = self.submit(wait_for_map_resolution, *tasks) else: @@ -246,17 +250,17 @@ def partial_func(*args, **kwargs): def reduce(self, fn, iterable, library_name=None, method=None, chunk_size=1): # This line is just the identity - since when a future is pickled, it actually becomes some file, which means it is evaluated immediately/sent to queue - if (hasattr(iterable, '__iter__') or hasattr(iterable, '__getitem__')) and not isinstance(iterable, str): + if (hasattr(iterable, '__iter__') or hasattr(iterable, '__getitem__')) and not isinstance(iterable, str): sub_futures = [iterable] num_parameters = len(inspect.signature(fn).parameters) - reduction_size = chunk_size*(num_parameters-1) + reduction_size = chunk_size * (num_parameters - 1) while len(sub_futures[-1]) > 1 or len(sub_futures) == 1: layer = [] for i in range(0, len(sub_futures[-1]), reduction_size): if method == "FutureFunctionCall": - future_batch_task = self.submit(self.future_funcall(library_name, reduction_tree, fn, *[self.submit(fetch_future_result, f) if isinstance(f, VineFuture) else f for f in sub_futures[-1][i:i+reduction_size]])) - else: # Method is FuturePythonTask - future_batch_task = self.submit(reduction_tree, fn, *[f if isinstance(f, VineFuture) else f for f in sub_futures[-1][i:i+reduction_size]]) + future_batch_task = self.submit(self.future_funcall(library_name, reduction_tree, fn, *[f if isinstance(f, VineFuture) else f for f in sub_futures[-1][i:i + reduction_size]])) + else: # Method is FuturePythonTask + future_batch_task = self.submit(reduction_tree, fn, *[f if isinstance(f, VineFuture) else f for f in sub_futures[-1][i:i + reduction_size]]) layer.append(future_batch_task) sub_futures.append(layer) future = sub_futures[-1][0] @@ -272,29 +276,29 @@ def allpairs(self, fn, iterable_a, iterable_b, library_name=None, method=None, c def wait_for_allpairs_resolution(row_size, *futures_batch): result = [] for computed_result in futures_batch: - result.extend(computed_result) + result.extend(computed_result) processed_result = [] - for i in range(len(result)//row_size): - row = result[i*row_size:i*row_size+row_size] + for i in range(len(result) // row_size): + row = result[i * row_size:i * row_size + row_size] processed_result.append(row) return processed_result iterable = [(a, b) for b in iterable_b for a in iterable_a] - if (hasattr(iterable, '__iter__') or hasattr(iterable, '__getitem__')) and not isinstance(iterable, str): + if (hasattr(iterable, '__iter__') or hasattr(iterable, '__getitem__')) and not isinstance(iterable, str): tasks = [] for i in range(0, len(iterable), chunk_size): if method == "FutureFunctionCall": - future_batch_task = self.submit(self.future_funcall(library_name, run_iterable, fn, iterable[i:i+chunk_size])) - else: # Method is FuturePythonTask - future_batch_task = self.submit(run_iterable, fn, iterable[i:i+chunk_size]) + future_batch_task = self.submit(self.future_funcall(library_name, run_iterable, fn, iterable[i:i + chunk_size])) + else: # Method is FuturePythonTask + future_batch_task = self.submit(run_iterable, fn, iterable[i:i + chunk_size]) tasks.append(future_batch_task) future = self.submit(wait_for_allpairs_resolution, len(iterable_b), *tasks) else: if method == "FutureFunctionCall": future = self.submit(self.future_funcall(library_name, run_iterable, fn, iterable)) - else: # Method is FuturePythonTask + else: # Method is FuturePythonTask future = self.submit(run_iterable, fn, iterable) return future - + def submit(self, fn, *args, **kwargs): if isinstance(fn, FuturePythonTask): self.manager.submit(fn) @@ -436,7 +440,7 @@ def __init__(self, manager, library_name, fn, *args, **kwargs): # we must first fetch the file before retruning the result. # to bring that output back to the manager. def output(self, timeout="wait_forever"): - + if not self._has_retrieved: result = self.manager.wait_for_task_id(self.id, timeout=timeout) if result: @@ -599,5 +603,4 @@ def vineLoadArg(arg): manager._function_buffers[base] = manager.declare_file(name, cache=True) return manager._function_buffers[base] -# vim: set sts=4 sw=4 ts=4 expandtab ft=python: - +# vim: set sts=4 sw=4 ts=4 expandtab ft=python: \ No newline at end of file From 586d7f93df8e759136bd09b4f3b8f08cc799e5d9 Mon Sep 17 00:00:00 2001 From: Kevin Xue Date: Wed, 18 Dec 2024 17:54:08 -0500 Subject: [PATCH 03/13] lint v2 --- taskvine/src/bindings/python3/ndcctools/taskvine/futures.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py b/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py index 25890d5daa..9f338bd76f 100644 --- a/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py +++ b/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py @@ -603,4 +603,4 @@ def vineLoadArg(arg): manager._function_buffers[base] = manager.declare_file(name, cache=True) return manager._function_buffers[base] -# vim: set sts=4 sw=4 ts=4 expandtab ft=python: \ No newline at end of file +# vim: set sts=4 sw=4 ts=4 expandtab ft=python: From 57883210901997793935d9c5c0182e9ea5d41645 Mon Sep 17 00:00:00 2001 From: Benjamin Tovar Date: Thu, 23 Jan 2025 09:47:27 -0500 Subject: [PATCH 04/13] cleanup code --- .../python3/ndcctools/taskvine/futures.py | 95 ++++++++++--------- 1 file changed, 49 insertions(+), 46 deletions(-) diff --git a/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py b/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py index 9f338bd76f..a954d44681 100644 --- a/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py +++ b/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py @@ -1,20 +1,22 @@ from . import cvine import hashlib from collections import deque, namedtuple -from concurrent.futures import Executor -from concurrent.futures import Future -from concurrent.futures import FIRST_COMPLETED -from concurrent.futures import FIRST_EXCEPTION -from concurrent.futures import ALL_COMPLETED -from concurrent.futures._base import PENDING -from concurrent.futures._base import CANCELLED -from concurrent.futures._base import FINISHED +from concurrent.futures import ( + Executor, + Future, + FIRST_COMPLETED, + FIRST_EXCEPTION, + ALL_COMPLETED, +) +from concurrent.futures._base import PENDING, CANCELLED, FINISHED from concurrent.futures import TimeoutError + from .task import ( PythonTask, FunctionCall, FunctionCallNoResult, ) + from .manager import ( Factory, Manager, @@ -25,6 +27,7 @@ import textwrap import inspect from functools import partial +from collections.abc import Iterator RESULT_PENDING = 'result_pending' @@ -142,27 +145,13 @@ def _iterator(): return _iterator() -def run_iterable(fn, iterable, dimensions=1): - - if not ((hasattr(iterable, '__iter__') or hasattr(iterable, '__getitem__')) and not isinstance(iterable, str)): +def run_iterable(fn, iterable): + if not isinstance(iterable, Iterator): return fn(iterable) - if dimensions < 1: - return None - result = [] - if dimensions == 1: - for element in iterable: - if (hasattr(element, '__iter__') or hasattr(element, '__getitem__')) and not isinstance(element, str): - result.append(fn(*element)) - else: - result.append(fn(element)) - else: - for inner_iterable in iterable: - result.append(run_iterable(fn, inner_iterable, dimensions - 1)) - return result + return list(map(fn, iterable)) def reduction_tree(fn, *args): - minimum_parameters = len(inspect.signature(fn).parameters) curr_size = len(args) entries = deque([f.result() if isinstance(f, VineFuture) else f for f in args]) @@ -171,13 +160,14 @@ def reduction_tree(fn, *args): for _ in range(minimum_parameters): parameters.append(entries.popleft()) new_result = fn(*parameters) - if (hasattr(new_result, '__getitem__') or hasattr(new_result, '__iter__')) and not isinstance(new_result, str): + if isinstance(new_result): for result in new_result: entries.appendleft(result) else: entries.appendleft(new_result) curr_size = len(entries) return entries[0] + ## # \class FuturesExecutor # @@ -218,17 +208,15 @@ def wait_for_map_resolution(*futures_batch): for computed_result in futures_batch: result.extend(computed_result) return result - if (hasattr(iterable, '__iter__') or hasattr(iterable, '__getitem__')) and not isinstance(iterable, str): + if isinstance(iterable, Iterator): tasks = [] if method == "FutureFunctionCall": # this currently does not work (error described in PR) partial_obj = partial(run_iterable, fn) + libtask = self.create_library_from_functions(library_name, partial_obj) - def partial_func(*args, **kwargs): - return partial_obj(*args, **kwargs) - libtask = self.create_library_from_functions(library_name, partial_func) self.install_library(libtask) for i in range(0, len(iterable), chunk_size): - future_batch_task = self.submit(self.future_funcall(library_name, partial_func, iterable[i:i + chunk_size])) + future_batch_task = self.submit(self.future_funcall(library_name, partial_obj, iterable[i:i + chunk_size])) tasks.append(future_batch_task) else: for i in range(0, len(iterable), chunk_size): @@ -250,7 +238,7 @@ def partial_func(*args, **kwargs): def reduce(self, fn, iterable, library_name=None, method=None, chunk_size=1): # This line is just the identity - since when a future is pickled, it actually becomes some file, which means it is evaluated immediately/sent to queue - if (hasattr(iterable, '__iter__') or hasattr(iterable, '__getitem__')) and not isinstance(iterable, str): + if isinstance(iterable, Iterator): sub_futures = [iterable] num_parameters = len(inspect.signature(fn).parameters) reduction_size = chunk_size * (num_parameters - 1) @@ -258,9 +246,26 @@ def reduce(self, fn, iterable, library_name=None, method=None, chunk_size=1): layer = [] for i in range(0, len(sub_futures[-1]), reduction_size): if method == "FutureFunctionCall": - future_batch_task = self.submit(self.future_funcall(library_name, reduction_tree, fn, *[f if isinstance(f, VineFuture) else f for f in sub_futures[-1][i:i + reduction_size]])) + future_batch_task = self.submit( + self.future_funcall( + library_name, + reduction_tree, + fn, + *[ + f if isinstance(f, VineFuture) else f + for f in sub_futures[-1][i:i + reduction_size] + ] + ) + ) else: # Method is FuturePythonTask - future_batch_task = self.submit(reduction_tree, fn, *[f if isinstance(f, VineFuture) else f for f in sub_futures[-1][i:i + reduction_size]]) + future_batch_task = self.submit( + reduction_tree, + fn, + *[ + f if isinstance(f, VineFuture) else f + for f in sub_futures[-1][i:i + reduction_size] + ] + ) layer.append(future_batch_task) sub_futures.append(layer) future = sub_futures[-1][0] @@ -283,20 +288,18 @@ def wait_for_allpairs_resolution(row_size, *futures_batch): processed_result.append(row) return processed_result iterable = [(a, b) for b in iterable_b for a in iterable_a] - if (hasattr(iterable, '__iter__') or hasattr(iterable, '__getitem__')) and not isinstance(iterable, str): - tasks = [] - for i in range(0, len(iterable), chunk_size): - if method == "FutureFunctionCall": - future_batch_task = self.submit(self.future_funcall(library_name, run_iterable, fn, iterable[i:i + chunk_size])) - else: # Method is FuturePythonTask - future_batch_task = self.submit(run_iterable, fn, iterable[i:i + chunk_size]) - tasks.append(future_batch_task) - future = self.submit(wait_for_allpairs_resolution, len(iterable_b), *tasks) - else: + tasks = [] + for i in range(0, len(iterable), chunk_size): if method == "FutureFunctionCall": - future = self.submit(self.future_funcall(library_name, run_iterable, fn, iterable)) + future_batch_task = self.submit( + self.future_funcall( + library_name, run_iterable, fn, iterable[i:i + chunk_size] + ) + ) else: # Method is FuturePythonTask - future = self.submit(run_iterable, fn, iterable) + future_batch_task = self.submit(run_iterable, fn, iterable[i:i + chunk_size]) + tasks.append(future_batch_task) + future = self.submit(wait_for_allpairs_resolution, len(iterable_b), *tasks) return future def submit(self, fn, *args, **kwargs): From 3b9d8a427bdfc5e707fb97402ebddc54d6960eca Mon Sep 17 00:00:00 2001 From: Benjamin Tovar Date: Thu, 23 Jan 2025 13:06:27 -0500 Subject: [PATCH 05/13] cleanup reduce --- .../python3/ndcctools/taskvine/futures.py | 111 +++++++----------- 1 file changed, 45 insertions(+), 66 deletions(-) diff --git a/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py b/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py index a954d44681..02df79c10a 100644 --- a/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py +++ b/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py @@ -22,12 +22,12 @@ Manager, ) +import math import os import time import textwrap -import inspect from functools import partial -from collections.abc import Iterator +from collections.abc import Iterator, Sequence RESULT_PENDING = 'result_pending' @@ -112,7 +112,7 @@ def as_completed(fs, timeout=None): f.module_manager.submit(f._task) start = time.perf_counter() - result_timeout = min(timeout, 5) if timeout is not None else 5 + result_timeout = max(1, min(timeout, 5)) if timeout else 5 def _iterator(): # iterate of queue of futures, yeilding completed futures and @@ -136,10 +136,7 @@ def _iterator(): assert result != RESULT_PENDING yield f - if ( - fs and timeout is not None - and time.perf_counter() - start > timeout - ): + if fs and timeout and time.perf_counter() - start > timeout: raise TimeoutError() return _iterator() @@ -151,22 +148,19 @@ def run_iterable(fn, iterable): return list(map(fn, iterable)) -def reduction_tree(fn, *args): - minimum_parameters = len(inspect.signature(fn).parameters) - curr_size = len(args) - entries = deque([f.result() if isinstance(f, VineFuture) else f for f in args]) - while curr_size >= minimum_parameters: - parameters = [] - for _ in range(minimum_parameters): - parameters.append(entries.popleft()) - new_result = fn(*parameters) - if isinstance(new_result): - for result in new_result: - entries.appendleft(result) - else: - entries.appendleft(new_result) - curr_size = len(entries) - return entries[0] +def reduction_tree(fn, *args, n=2): + # n is the arity of the reduction function fn + # if less than 2, we have an infinite loop + print(fn, args, n) + assert n > 1 + entries = [f.result() if isinstance(f, VineFuture) else f for f in args] + if len(entries) < 2: + return entries[0] + + len_multiple = int(math.ceil(len(entries)/n) * n) + new_args = map(fn, [entries[i:i+n] for i in range(0, len_multiple, n)]) + + return reduction_tree(fn, *new_args, n=n) ## # \class FuturesExecutor @@ -231,51 +225,36 @@ def wait_for_map_resolution(*futures_batch): # # parameters: # - Function - # - Iterable of parameters that function will take - # - a library_name for a library function call - # - a method - # - a chunk_size, which is the number of iterations to complete in one task. if c is chunk_size, a single task will reduce c(n-1) + 1 nodes to 1 node + # - a function that receives fn_arity arguments + # - A sequence of parameters that function will take + # - a chunk_size to group elements in sequence to dispatch to a single task + # - arity of the function, elements of a chunk are reduce arity-wise. + # - an optional library_name for a library function call + def reduce(self, fn, iterable, library_name=None, chunk_size=2, fn_arity=2): + assert chunk_size > 1 + assert fn_arity > 1 + assert isinstance(iterable, Sequence) + chunk_size = max(fn_arity, chunk_size) + + new_iterable = [] + while iterable: + heads, iterable = iterable[:chunk_size], iterable[chunk_size:] + heads = [f.result() if isinstance(f, VineFuture) else f for f in heads] + if library_name: + future_batch_task = self.submit( + self.future_funcall( + library_name, reduction_tree, fn, *heads, n=fn_arity + ) + ) + else: + future_batch_task = self.submit(self.future_task(reduction_tree, fn, *heads, n=fn_arity)) - def reduce(self, fn, iterable, library_name=None, method=None, chunk_size=1): - # This line is just the identity - since when a future is pickled, it actually becomes some file, which means it is evaluated immediately/sent to queue - if isinstance(iterable, Iterator): - sub_futures = [iterable] - num_parameters = len(inspect.signature(fn).parameters) - reduction_size = chunk_size * (num_parameters - 1) - while len(sub_futures[-1]) > 1 or len(sub_futures) == 1: - layer = [] - for i in range(0, len(sub_futures[-1]), reduction_size): - if method == "FutureFunctionCall": - future_batch_task = self.submit( - self.future_funcall( - library_name, - reduction_tree, - fn, - *[ - f if isinstance(f, VineFuture) else f - for f in sub_futures[-1][i:i + reduction_size] - ] - ) - ) - else: # Method is FuturePythonTask - future_batch_task = self.submit( - reduction_tree, - fn, - *[ - f if isinstance(f, VineFuture) else f - for f in sub_futures[-1][i:i + reduction_size] - ] - ) - layer.append(future_batch_task) - sub_futures.append(layer) - future = sub_futures[-1][0] - # if this is a single value + new_iterable.append(future_batch_task) + + if len(new_iterable) > 1: + return self.reduce(fn, new_iterable, library_name, chunk_size, fn_arity) else: - if method == "FutureFunctionCall": - future = self.submit(self.future_funcall(library_name, reduction_tree, fn, iterable)) - else: - future = self.submit(reduction_tree, fn, iterable) - return future + return new_iterable[0] def allpairs(self, fn, iterable_a, iterable_b, library_name=None, method=None, chunk_size=1): def wait_for_allpairs_resolution(row_size, *futures_batch): From fe44574a89497025c4b91c25fd26054798bccdae Mon Sep 17 00:00:00 2001 From: Benjamin Tovar Date: Thu, 23 Jan 2025 13:12:38 -0500 Subject: [PATCH 06/13] add test --- taskvine/test/TR_vine_python_future_hof.sh | 84 ++++++++++++++++++++++ taskvine/test/vine_python_future_hof.py | 49 +++++++++++++ 2 files changed, 133 insertions(+) create mode 100755 taskvine/test/TR_vine_python_future_hof.sh create mode 100644 taskvine/test/vine_python_future_hof.py diff --git a/taskvine/test/TR_vine_python_future_hof.sh b/taskvine/test/TR_vine_python_future_hof.sh new file mode 100755 index 0000000000..56fe2b9066 --- /dev/null +++ b/taskvine/test/TR_vine_python_future_hof.sh @@ -0,0 +1,84 @@ +#!/bin/sh + +set -e + +. ../../dttools/test/test_runner_common.sh + +import_config_val CCTOOLS_PYTHON_TEST_EXEC +import_config_val CCTOOLS_PYTHON_TEST_DIR + +export PYTHONPATH=$(pwd)/../../test_support/python_modules/${CCTOOLS_PYTHON_TEST_DIR}:$PYTHONPATH + +STATUS_FILE=vine.status +PORT_FILE=vine.port + +check_needed() +{ + [ -n "${CCTOOLS_PYTHON_TEST_EXEC}" ] || return 1 + + # Poncho currently requires ast.unparse to serialize the function, + # which only became available in Python 3.9. Some older platforms + # (e.g. almalinux8) will not have this natively. + "${CCTOOLS_PYTHON_TEST_EXEC}" -c "from ast import unparse" || return 1 + + # In some limited build circumstances (e.g. macos build on github), + # poncho doesn't work due to lack of conda-pack or cloudpickle + "${CCTOOLS_PYTHON_TEST_EXEC}" -c "import conda_pack" || return 1 + "${CCTOOLS_PYTHON_TEST_EXEC}" -c "import cloudpickle" || return 1 + + return 0 +} + +prepare() +{ + rm -f $STATUS_FILE + rm -f $PORT_FILE + return 0 +} + +run() +{ + ( ${CCTOOLS_PYTHON_TEST_EXEC} vine_python_future_hof.py $PORT_FILE; echo $? > $STATUS_FILE ) & + + # wait at most 15 seconds for vine to find a port. + wait_for_file_creation $PORT_FILE 15 + + run_taskvine_worker $PORT_FILE worker.log --cores 2 --memory 2000 --disk 2000 + + # wait for vine to exit. + wait_for_file_creation $STATUS_FILE 15 + + # retrieve exit status + status=$(cat $STATUS_FILE) + if [ $status -ne 0 ] + then + # display log files in case of failure. + logfile=$(latest_vine_debug_log) + if [ -f ${logfile} ] + then + echo "master log:" + cat ${logfile} + fi + + if [ -f worker.log ] + then + echo "worker log:" + cat worker.log + fi + + exit 1 + fi + + exit 0 +} + +clean() +{ + rm -f $STATUS_FILE + rm -f $PORT_FILE + rm -rf vine-run-info + exit 0 +} + + +dispatch "$@" diff --git a/taskvine/test/vine_python_future_hof.py b/taskvine/test/vine_python_future_hof.py new file mode 100644 index 0000000000..85599f9603 --- /dev/null +++ b/taskvine/test/vine_python_future_hof.py @@ -0,0 +1,49 @@ +#! /usr/bin/env python + +import sys +import ndcctools.taskvine as vine + +port_file = None +try: + port_file = sys.argv[1] +except IndexError: + sys.stderr.write("Usage: {} PORTFILE\n".format(sys.argv[0])) + raise + +def main(): + executor = vine.FuturesExecutor( + port=[9123, 9129], manager_name="vine_hof_test", factory=False + ) + + print("listening on port {}".format(executor.manager.port)) + with open(port_file, "w") as f: + f.write(str(executor.manager.port)) + + nums = list(range(101)) + + maximum = executor.reduce(max, nums, fn_arity=2) + assert maximum.result() == 100 + + maximum = executor.reduce(max, nums, fn_arity=25) + assert maximum.result() == 100 + + maximum = executor.reduce(max, nums, fn_arity=1000) + assert maximum.result() == 100 + + maximum = executor.reduce(max, nums, fn_arity=2, chunk_size=50) + assert maximum.result() == 100 + + minimum = executor.reduce(min, nums, fn_arity=2, chunk_size=50) + assert minimum.result() == 0 + + total = executor.reduce(sum, nums, fn_arity=11, chunk_size=13) + assert total.result() == sum(nums) + + + + +if __name__ == "__main__": + main() + + +# vim: set sts=4 sw=4 ts=4 expandtab ft=python: From fb342bdeb6dcdeb4f7e2fe90cf8461581a7936ec Mon Sep 17 00:00:00 2001 From: Benjamin Tovar Date: Thu, 23 Jan 2025 13:22:21 -0500 Subject: [PATCH 07/13] remove debug print --- taskvine/src/bindings/python3/ndcctools/taskvine/futures.py | 1 - 1 file changed, 1 deletion(-) diff --git a/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py b/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py index 02df79c10a..959df947a6 100644 --- a/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py +++ b/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py @@ -151,7 +151,6 @@ def run_iterable(fn, iterable): def reduction_tree(fn, *args, n=2): # n is the arity of the reduction function fn # if less than 2, we have an infinite loop - print(fn, args, n) assert n > 1 entries = [f.result() if isinstance(f, VineFuture) else f for f in args] if len(entries) < 2: From 7048609450725466feb7ad8e1e028973341f5308 Mon Sep 17 00:00:00 2001 From: Benjamin Tovar Date: Thu, 23 Jan 2025 14:01:17 -0500 Subject: [PATCH 08/13] cleanup map --- .../python3/ndcctools/taskvine/futures.py | 51 ++++++++++--------- taskvine/test/vine_python_future_hof.py | 6 +++ 2 files changed, 32 insertions(+), 25 deletions(-) diff --git a/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py b/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py index 959df947a6..3db764b167 100644 --- a/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py +++ b/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py @@ -27,7 +27,7 @@ import time import textwrap from functools import partial -from collections.abc import Iterator, Sequence +from collections.abc import Iterable, Sequence RESULT_PENDING = 'result_pending' @@ -142,10 +142,8 @@ def _iterator(): return _iterator() -def run_iterable(fn, iterable): - if not isinstance(iterable, Iterator): - return fn(iterable) - return list(map(fn, iterable)) +def run_iterable(fn, *args): + return list(map(fn, args)) def reduction_tree(fn, *args, n=2): @@ -195,30 +193,33 @@ def __init__(self, port=9123, batch_type="local", manager=None, manager_host_por else: self.factory = None - def map(self, fn, iterable, library_name="Some_Library", method=None, chunk_size=1): + def map(self, fn, iterable, library_name=None, chunk_size=1): + assert chunk_size > 0 + assert isinstance(iterable, Sequence) + def wait_for_map_resolution(*futures_batch): result = [] - for computed_result in futures_batch: - result.extend(computed_result) + for f in futures_batch: + result.extend(f.result() if isinstance(f, VineFuture) else f) return result - if isinstance(iterable, Iterator): - tasks = [] - if method == "FutureFunctionCall": # this currently does not work (error described in PR) - partial_obj = partial(run_iterable, fn) - libtask = self.create_library_from_functions(library_name, partial_obj) - - self.install_library(libtask) - for i in range(0, len(iterable), chunk_size): - future_batch_task = self.submit(self.future_funcall(library_name, partial_obj, iterable[i:i + chunk_size])) - tasks.append(future_batch_task) + + tasks = [] + fn_wrapped = partial(run_iterable, fn) + if library_name: + libtask = self.create_library_from_functions(library_name, fn_wrapped) + self.install_library(libtask) + + while iterable: + heads, iterable = iterable[:chunk_size], iterable[chunk_size:] + + if library_name: + future_batch_task = self.submit(self.future_funcall(library_name, fn_wrapped, *heads)) else: - for i in range(0, len(iterable), chunk_size): - future_batch_task = self.submit(run_iterable, fn, iterable[i:i + chunk_size]) - tasks.append(future_batch_task) - future = self.submit(wait_for_map_resolution, *tasks) - else: - raise Exception('Error: Map function takes in an iterable') - return future + future_batch_task = self.submit(self.future_task(fn_wrapped, *heads)) + + tasks.append(future_batch_task) + + return self.submit(self.future_task(wait_for_map_resolution, *tasks)) # Reduce performs a reduction tree on the iterable and currently returns a single value # diff --git a/taskvine/test/vine_python_future_hof.py b/taskvine/test/vine_python_future_hof.py index 85599f9603..f9b226e7b2 100644 --- a/taskvine/test/vine_python_future_hof.py +++ b/taskvine/test/vine_python_future_hof.py @@ -21,6 +21,12 @@ def main(): nums = list(range(101)) + doubles = executor.map(lambda x: 2*x, nums, chunk_size=10) + assert sum(doubles.result()) == sum(nums)*2 + + doubles = executor.map(lambda x: 2*x, nums, chunk_size=13) + assert sum(doubles.result()) == sum(nums)*2 + maximum = executor.reduce(max, nums, fn_arity=2) assert maximum.result() == 100 From 81a13ec68185246d597dd8d7deeda307051b653e Mon Sep 17 00:00:00 2001 From: Benjamin Tovar Date: Thu, 23 Jan 2025 14:03:55 -0500 Subject: [PATCH 09/13] format --- taskvine/src/bindings/python3/ndcctools/taskvine/futures.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py b/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py index 3db764b167..1fb29b2033 100644 --- a/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py +++ b/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py @@ -27,7 +27,7 @@ import time import textwrap from functools import partial -from collections.abc import Iterable, Sequence +from collections.abc import Sequence RESULT_PENDING = 'result_pending' @@ -154,8 +154,8 @@ def reduction_tree(fn, *args, n=2): if len(entries) < 2: return entries[0] - len_multiple = int(math.ceil(len(entries)/n) * n) - new_args = map(fn, [entries[i:i+n] for i in range(0, len_multiple, n)]) + len_multiple = int(math.ceil(len(entries) / n) * n) + new_args = map(fn, [entries[i:i + n] for i in range(0, len_multiple, n)]) return reduction_tree(fn, *new_args, n=n) From 06dd5f89c9d0ec8ca24cfa46e066c67a867ab925 Mon Sep 17 00:00:00 2001 From: Benjamin Tovar Date: Fri, 24 Jan 2025 08:14:12 -0500 Subject: [PATCH 10/13] allpairs in terms of map --- .../python3/ndcctools/taskvine/futures.py | 55 +++++++++++-------- taskvine/test/vine_python_future_hof.py | 37 +++++++------ 2 files changed, 53 insertions(+), 39 deletions(-) diff --git a/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py b/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py index 1fb29b2033..21aab4fa67 100644 --- a/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py +++ b/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py @@ -256,30 +256,39 @@ def reduce(self, fn, iterable, library_name=None, chunk_size=2, fn_arity=2): else: return new_iterable[0] - def allpairs(self, fn, iterable_a, iterable_b, library_name=None, method=None, chunk_size=1): - def wait_for_allpairs_resolution(row_size, *futures_batch): + def allpairs(self, fn, iterable_rows, iterable_cols, library_name=None, chunk_size=1): + assert chunk_size > 0 + assert isinstance(iterable_rows, Sequence) + assert isinstance(iterable_cols, Sequence) + + def wait_for_allpairs_resolution(row_size, col_size, mapped): result = [] - for computed_result in futures_batch: - result.extend(computed_result) - processed_result = [] - for i in range(len(result) // row_size): - row = result[i * row_size:i * row_size + row_size] - processed_result.append(row) - return processed_result - iterable = [(a, b) for b in iterable_b for a in iterable_a] - tasks = [] - for i in range(0, len(iterable), chunk_size): - if method == "FutureFunctionCall": - future_batch_task = self.submit( - self.future_funcall( - library_name, run_iterable, fn, iterable[i:i + chunk_size] - ) - ) - else: # Method is FuturePythonTask - future_batch_task = self.submit(run_iterable, fn, iterable[i:i + chunk_size]) - tasks.append(future_batch_task) - future = self.submit(wait_for_allpairs_resolution, len(iterable_b), *tasks) - return future + for _ in range(row_size): + result.append([0] * col_size) + + mapped = mapped.result() if isinstance(mapped, VineFuture) else mapped + for p in mapped: + (i, j, r) = p.result() if isinstance(p, VineFuture) else p + result[i][j] = r + + return result + + def wrap_idx(args): + i, j, a, b = args + return (i, j, fn(a, b)) + + iterable = [(i, j, a, b) for (i, a) in enumerate(iterable_rows) for (j, b) in enumerate(iterable_cols)] + mapped = self.map(wrap_idx, iterable, library_name, chunk_size) + + return self.submit( + self.future_task( + wait_for_allpairs_resolution, + len(iterable_rows), + len(iterable_cols), + mapped, + ) + ) + def submit(self, fn, *args, **kwargs): if isinstance(fn, FuturePythonTask): diff --git a/taskvine/test/vine_python_future_hof.py b/taskvine/test/vine_python_future_hof.py index f9b226e7b2..e14b00f024 100644 --- a/taskvine/test/vine_python_future_hof.py +++ b/taskvine/test/vine_python_future_hof.py @@ -21,29 +21,34 @@ def main(): nums = list(range(101)) - doubles = executor.map(lambda x: 2*x, nums, chunk_size=10) - assert sum(doubles.result()) == sum(nums)*2 + rows = 3 + mult_table = executor.allpairs(lambda x, y: x*y, range(rows), nums, chunk_size=11).result() + assert sum(mult_table[1]) == sum(nums) + assert sum(sum(r) for r in mult_table) == sum(sum(nums) * n for n in range(rows)) - doubles = executor.map(lambda x: 2*x, nums, chunk_size=13) - assert sum(doubles.result()) == sum(nums)*2 + doubles = executor.map(lambda x: 2*x, nums, chunk_size=10).result() + assert sum(doubles) == sum(nums)*2 - maximum = executor.reduce(max, nums, fn_arity=2) - assert maximum.result() == 100 + doubles = executor.map(lambda x: 2*x, nums, chunk_size=13).result() + assert sum(doubles) == sum(nums)*2 - maximum = executor.reduce(max, nums, fn_arity=25) - assert maximum.result() == 100 + maximum = executor.reduce(max, nums, fn_arity=2).result() + assert maximum == 100 - maximum = executor.reduce(max, nums, fn_arity=1000) - assert maximum.result() == 100 + maximum = executor.reduce(max, nums, fn_arity=25).result() + assert maximum == 100 - maximum = executor.reduce(max, nums, fn_arity=2, chunk_size=50) - assert maximum.result() == 100 + maximum = executor.reduce(max, nums, fn_arity=1000).result() + assert maximum == 100 - minimum = executor.reduce(min, nums, fn_arity=2, chunk_size=50) - assert minimum.result() == 0 + maximum = executor.reduce(max, nums, fn_arity=2, chunk_size=50).result() + assert maximum == 100 - total = executor.reduce(sum, nums, fn_arity=11, chunk_size=13) - assert total.result() == sum(nums) + minimum = executor.reduce(min, nums, fn_arity=2, chunk_size=50).result() + assert minimum == 0 + + total = executor.reduce(sum, nums, fn_arity=11, chunk_size=13).result() + assert total == sum(nums) From ef3f742b6add2913572266f586ec17beea404267 Mon Sep 17 00:00:00 2001 From: Benjamin Tovar Date: Fri, 24 Jan 2025 08:17:20 -0500 Subject: [PATCH 11/13] format --- taskvine/src/bindings/python3/ndcctools/taskvine/futures.py | 1 - 1 file changed, 1 deletion(-) diff --git a/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py b/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py index 21aab4fa67..9b69d76dea 100644 --- a/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py +++ b/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py @@ -289,7 +289,6 @@ def wrap_idx(args): ) ) - def submit(self, fn, *args, **kwargs): if isinstance(fn, FuturePythonTask): self.manager.submit(fn) From 190a6faab56d5e680e712cfdb36c384e5fa27a89 Mon Sep 17 00:00:00 2001 From: Benjamin Tovar Date: Mon, 27 Jan 2025 08:55:23 -0500 Subject: [PATCH 12/13] do not create lib in map --- taskvine/src/bindings/python3/ndcctools/taskvine/futures.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py b/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py index 9b69d76dea..9d0dbba7a2 100644 --- a/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py +++ b/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py @@ -205,10 +205,6 @@ def wait_for_map_resolution(*futures_batch): tasks = [] fn_wrapped = partial(run_iterable, fn) - if library_name: - libtask = self.create_library_from_functions(library_name, fn_wrapped) - self.install_library(libtask) - while iterable: heads, iterable = iterable[:chunk_size], iterable[chunk_size:] From e5fe9f711023a5f9f51cbbb898209a07031ee590 Mon Sep 17 00:00:00 2001 From: Benjamin Tovar Date: Thu, 30 Jan 2025 08:56:53 -0500 Subject: [PATCH 13/13] error on lib name --- taskvine/src/bindings/python3/ndcctools/taskvine/futures.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py b/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py index 9d0dbba7a2..6b290e2494 100644 --- a/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py +++ b/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py @@ -209,6 +209,7 @@ def wait_for_map_resolution(*futures_batch): heads, iterable = iterable[:chunk_size], iterable[chunk_size:] if library_name: + raise NotImplementedError("Using a library not currently supported.") future_batch_task = self.submit(self.future_funcall(library_name, fn_wrapped, *heads)) else: future_batch_task = self.submit(self.future_task(fn_wrapped, *heads)) @@ -237,6 +238,7 @@ def reduce(self, fn, iterable, library_name=None, chunk_size=2, fn_arity=2): heads, iterable = iterable[:chunk_size], iterable[chunk_size:] heads = [f.result() if isinstance(f, VineFuture) else f for f in heads] if library_name: + raise NotImplementedError("Using a library not currently supported.") future_batch_task = self.submit( self.future_funcall( library_name, reduction_tree, fn, *heads, n=fn_arity