diff --git a/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py b/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py
index b74cc1a3c0..6b290e2494 100644
--- a/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py
+++ b/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py
@@ -1,30 +1,33 @@
-
 from . import cvine
 import hashlib

-from collections import deque
-from concurrent.futures import Executor
-from concurrent.futures import Future
-from concurrent.futures import FIRST_COMPLETED
-from concurrent.futures import FIRST_EXCEPTION
-from concurrent.futures import ALL_COMPLETED
-from concurrent.futures._base import PENDING
-from concurrent.futures._base import CANCELLED
-from concurrent.futures._base import FINISHED
+from collections import deque, namedtuple
+from concurrent.futures import (
+    Executor,
+    Future,
+    FIRST_COMPLETED,
+    FIRST_EXCEPTION,
+    ALL_COMPLETED,
+)
+from concurrent.futures._base import PENDING, CANCELLED, FINISHED
 from concurrent.futures import TimeoutError
-from collections import namedtuple
+
 from .task import (
     PythonTask,
     FunctionCall,
     FunctionCallNoResult,
 )
+
 from .manager import (
     Factory,
     Manager,
 )
+import math
 import os
 import time
 import textwrap
+from functools import partial
+from collections.abc import Sequence


 RESULT_PENDING = 'result_pending'
@@ -109,7 +112,7 @@ def as_completed(fs, timeout=None):
             f.module_manager.submit(f._task)

     start = time.perf_counter()
-    result_timeout = min(timeout, 5) if timeout is not None else 5
+    result_timeout = max(1, min(timeout, 5)) if timeout else 5

     def _iterator():
         # iterate of queue of futures, yeilding completed futures and
@@ -133,22 +136,39 @@ def _iterator():
                 assert result != RESULT_PENDING
                 yield f

-            if (
-                fs and timeout is not None
-                and time.perf_counter() - start > timeout
-            ):
+            if fs and timeout and time.perf_counter() - start > timeout:
                 raise TimeoutError()

     return _iterator()


+def run_iterable(fn, *args):
+    return list(map(fn, args))
+
+
+def reduction_tree(fn, *args, n=2):
+    # n is the arity of the reduction function fn
+    # if less than 2, we have an infinite loop
+    assert n > 1
+    entries = [f.result() if isinstance(f, VineFuture) else f for f in args]
+    if len(entries) < 2:
+        return entries[0]
+
+    len_multiple = int(math.ceil(len(entries) / n) * n)
+    new_args = map(fn, [entries[i:i + n] for i in range(0, len_multiple, n)])
+
+    return reduction_tree(fn, *new_args, n=n)
+
 ##
 # \class FuturesExecutor
 #
 # TaskVine FuturesExecutor object
 #
 # This class acts as an interface for the creation of Futures
+
+
 class FuturesExecutor(Executor):
+
     def __init__(self, port=9123, batch_type="local", manager=None, manager_host_port=None, manager_name=None, factory_binary=None, worker_binary=None, log_file=os.devnull, factory=True, opts={}):
         self.manager = Manager(port=port)
         self.port = self.manager.port
@@ -173,6 +193,100 @@ def __init__(self, port=9123, batch_type="local", manager=None, manager_host_por
         else:
             self.factory = None

+    def map(self, fn, iterable, library_name=None, chunk_size=1):
+        assert chunk_size > 0
+        assert isinstance(iterable, Sequence)
+
+        def wait_for_map_resolution(*futures_batch):
+            result = []
+            for f in futures_batch:
+                result.extend(f.result() if isinstance(f, VineFuture) else f)
+            return result
+
+        tasks = []
+        fn_wrapped = partial(run_iterable, fn)
+        while iterable:
+            heads, iterable = iterable[:chunk_size], iterable[chunk_size:]
+
+            if library_name:
+                raise NotImplementedError("Using a library is not currently supported.")
+                future_batch_task = self.submit(self.future_funcall(library_name, fn_wrapped, *heads))
+            else:
+                future_batch_task = self.submit(self.future_task(fn_wrapped, *heads))
+
+            tasks.append(future_batch_task)
+
+        return self.submit(self.future_task(wait_for_map_resolution, *tasks))
+
+    # Reduce performs a reduction tree on the iterable and currently returns a single value
+    #
+    # parameters:
+    # - Function
+    #     - a function that receives fn_arity arguments
+    # - A sequence of parameters that the function will take
+    # - a chunk_size to group elements in sequence to dispatch to a single task
+    # - arity of the function; elements of a chunk are reduced arity-wise.
+    # - an optional library_name for a library function call
+    def reduce(self, fn, iterable, library_name=None, chunk_size=2, fn_arity=2):
+        assert chunk_size > 1
+        assert fn_arity > 1
+        assert isinstance(iterable, Sequence)
+        chunk_size = max(fn_arity, chunk_size)
+
+        new_iterable = []
+        while iterable:
+            heads, iterable = iterable[:chunk_size], iterable[chunk_size:]
+            heads = [f.result() if isinstance(f, VineFuture) else f for f in heads]
+            if library_name:
+                raise NotImplementedError("Using a library is not currently supported.")
+                future_batch_task = self.submit(
+                    self.future_funcall(
+                        library_name, reduction_tree, fn, *heads, n=fn_arity
+                    )
+                )
+            else:
+                future_batch_task = self.submit(self.future_task(reduction_tree, fn, *heads, n=fn_arity))
+
+            new_iterable.append(future_batch_task)
+
+        if len(new_iterable) > 1:
+            return self.reduce(fn, new_iterable, library_name, chunk_size, fn_arity)
+        else:
+            return new_iterable[0]
+
+    def allpairs(self, fn, iterable_rows, iterable_cols, library_name=None, chunk_size=1):
+        assert chunk_size > 0
+        assert isinstance(iterable_rows, Sequence)
+        assert isinstance(iterable_cols, Sequence)
+
+        def wait_for_allpairs_resolution(row_size, col_size, mapped):
+            result = []
+            for _ in range(row_size):
+                result.append([0] * col_size)
+
+            mapped = mapped.result() if isinstance(mapped, VineFuture) else mapped
+            for p in mapped:
+                (i, j, r) = p.result() if isinstance(p, VineFuture) else p
+                result[i][j] = r
+
+            return result
+
+        def wrap_idx(args):
+            i, j, a, b = args
+            return (i, j, fn(a, b))
+
+        iterable = [(i, j, a, b) for (i, a) in enumerate(iterable_rows) for (j, b) in enumerate(iterable_cols)]
+        mapped = self.map(wrap_idx, iterable, library_name, chunk_size)
+
+        return self.submit(
+            self.future_task(
+                wait_for_allpairs_resolution,
+                len(iterable_rows),
+                len(iterable_cols),
+                mapped,
+            )
+        )
+
     def submit(self, fn, *args, **kwargs):
         if isinstance(fn, FuturePythonTask):
             self.manager.submit(fn)
@@ -246,15 +360,15 @@ def cancelled(self):
             return False

     def running(self):
-        state = self._task.state
-        if state == "RUNNING":
+        state = self._task._module_manager.task_state(self._task.id)
+        if state == cvine.VINE_TASK_RUNNING:
             return True
         else:
             return False

     def done(self):
-        state = self._task.state
-        if state == "DONE" or state == "RETRIEVED":
+        state = self._task._module_manager.task_state(self._task.id)
+        if state == cvine.VINE_TASK_DONE:
             return True
         else:
             return False
@@ -307,7 +421,6 @@ def __init__(self, manager, library_name, fn, *args, **kwargs):
         self.manager = manager
         self.library_name = library_name
         self._envs = []
-        self._future = VineFuture(self)

         self._has_retrieved = False

@@ -332,7 +445,6 @@ def output(self, timeout="wait_forever"):
                         self._saved_output = output['Result']
                     else:
                         self._saved_output = FunctionCallNoResult(output['Reason'])
-
                 except Exception as e:
                     self._saved_output = e
             else:
@@ -406,6 +518,7 @@ def output(self, timeout="wait_forever"):
                 # task or the exception object of a failed task.
                 self._output = cloudpickle.loads(self._output_file.contents())
             except Exception as e:
+                print(self._output_file.contents())
                 # handle output file fetch/deserialization failures
                 self._output = e
             self._output_loaded = True
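# --- Illustrative sketch, not part of the patch above: how reduction_tree groups
# its arguments. Each step passes fn a *list* of up to n entries (one chunk),
# not n separate arguments, which is why iterable-friendly reducers such as
# max, min, and sum work unchanged in the tests further below. reduce_locally is
# a hypothetical local stand-in that repeats the same grouping with the
# VineFuture handling dropped.
import math


def reduce_locally(fn, *args, n=2):
    entries = list(args)
    if len(entries) < 2:
        return entries[0]
    # round the slicing range up to a multiple of n so the final short chunk is kept
    len_multiple = int(math.ceil(len(entries) / n) * n)
    chunks = [entries[i:i + n] for i in range(0, len_multiple, n)]
    # fn collapses each chunk; recurse until a single value remains
    return reduce_locally(fn, *map(fn, chunks), n=n)


assert reduce_locally(max, *range(101), n=25) == 100
assert reduce_locally(sum, *range(10), n=3) == sum(range(10))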
diff --git a/taskvine/test/TR_vine_python_future_hof.sh b/taskvine/test/TR_vine_python_future_hof.sh
new file mode 100755
index 0000000000..56fe2b9066
--- /dev/null
+++ b/taskvine/test/TR_vine_python_future_hof.sh
@@ -0,0 +1,84 @@
+#!/bin/sh
+
+set -e
+
+. ../../dttools/test/test_runner_common.sh
+
+import_config_val CCTOOLS_PYTHON_TEST_EXEC
+import_config_val CCTOOLS_PYTHON_TEST_DIR
+
+export PYTHONPATH=$(pwd)/../../test_support/python_modules/${CCTOOLS_PYTHON_TEST_DIR}:$PYTHONPATH
+
+STATUS_FILE=vine.status
+PORT_FILE=vine.port
+
+check_needed()
+{
+    [ -n "${CCTOOLS_PYTHON_TEST_EXEC}" ] || return 1
+
+    # Poncho currently requires ast.unparse to serialize the function,
+    # which only became available in Python 3.9. Some older platforms
+    # (e.g. almalinux8) will not have this natively.
+    "${CCTOOLS_PYTHON_TEST_EXEC}" -c "from ast import unparse" || return 1
+
+    # In some limited build circumstances (e.g. macos build on github),
+    # poncho doesn't work due to lack of conda-pack or cloudpickle
+    "${CCTOOLS_PYTHON_TEST_EXEC}" -c "import conda_pack" || return 1
+    "${CCTOOLS_PYTHON_TEST_EXEC}" -c "import cloudpickle" || return 1
+
+    return 0
+}
+
+prepare()
+{
+    rm -f $STATUS_FILE
+    rm -f $PORT_FILE
+    return 0
+}
+
+run()
+{
+    ( ${CCTOOLS_PYTHON_TEST_EXEC} vine_python_future_hof.py $PORT_FILE; echo $? > $STATUS_FILE ) &
+
+    # wait at most 15 seconds for vine to find a port.
+    wait_for_file_creation $PORT_FILE 15
+
+    run_taskvine_worker $PORT_FILE worker.log --cores 2 --memory 2000 --disk 2000
+
+    # wait for vine to exit.
+    wait_for_file_creation $STATUS_FILE 15
+
+    # retrieve exit status
+    status=$(cat $STATUS_FILE)
+    if [ $status -ne 0 ]
+    then
+        # display log files in case of failure.
+        logfile=$(latest_vine_debug_log)
+        if [ -f ${logfile} ]
+        then
+            echo "manager log:"
+            cat ${logfile}
+        fi
+
+        if [ -f worker.log ]
+        then
+            echo "worker log:"
+            cat worker.log
+        fi
+
+        exit 1
+    fi
+
+    exit 0
+}
+
+clean()
+{
+    rm -f $STATUS_FILE
+    rm -f $PORT_FILE
+    rm -rf vine-run-info
+    exit 0
+}
+
+
+dispatch "$@"
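# --- Illustrative sketch, not part of the patch: the chunking performed by
# FuturesExecutor.map, with the task submission replaced by a direct call. The
# test program below relies on this behaviour with chunk sizes (10, 13) that do
# not divide the input evenly. run_iterable is copied from futures.py above;
# map_locally is a hypothetical stand-in for the executor. Each chunk becomes
# one task running run_iterable(fn, *chunk), and wait_for_map_resolution then
# flattens the per-chunk lists back into a single result list.
from functools import partial


def run_iterable(fn, *args):
    return list(map(fn, args))


def map_locally(fn, iterable, chunk_size=1):
    fn_wrapped = partial(run_iterable, fn)
    batches = []
    while iterable:
        heads, iterable = iterable[:chunk_size], iterable[chunk_size:]
        batches.append(fn_wrapped(*heads))  # one TaskVine task per chunk in the real executor
    result = []
    for batch in batches:
        result.extend(batch)  # mirrors wait_for_map_resolution
    return result


nums = list(range(101))
assert map_locally(lambda x: 2 * x, nums, chunk_size=13) == [2 * x for x in nums]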
diff --git a/taskvine/test/vine_python_future_hof.py b/taskvine/test/vine_python_future_hof.py
new file mode 100644
index 0000000000..e14b00f024
--- /dev/null
+++ b/taskvine/test/vine_python_future_hof.py
@@ -0,0 +1,60 @@
+#! /usr/bin/env python
+
+import sys
+import ndcctools.taskvine as vine
+
+port_file = None
+try:
+    port_file = sys.argv[1]
+except IndexError:
+    sys.stderr.write("Usage: {} PORTFILE\n".format(sys.argv[0]))
+    raise
+
+
+def main():
+    executor = vine.FuturesExecutor(
+        port=[9123, 9129], manager_name="vine_hof_test", factory=False
+    )
+
+    print("listening on port {}".format(executor.manager.port))
+    with open(port_file, "w") as f:
+        f.write(str(executor.manager.port))
+
+    nums = list(range(101))
+
+    rows = 3
+    mult_table = executor.allpairs(lambda x, y: x*y, range(rows), nums, chunk_size=11).result()
+    assert sum(mult_table[1]) == sum(nums)
+    assert sum(sum(r) for r in mult_table) == sum(sum(nums) * n for n in range(rows))
+
+    doubles = executor.map(lambda x: 2*x, nums, chunk_size=10).result()
+    assert sum(doubles) == sum(nums)*2
+
+    doubles = executor.map(lambda x: 2*x, nums, chunk_size=13).result()
+    assert sum(doubles) == sum(nums)*2
+
+    maximum = executor.reduce(max, nums, fn_arity=2).result()
+    assert maximum == 100
+
+    maximum = executor.reduce(max, nums, fn_arity=25).result()
+    assert maximum == 100
+
+    maximum = executor.reduce(max, nums, fn_arity=1000).result()
+    assert maximum == 100
+
+    maximum = executor.reduce(max, nums, fn_arity=2, chunk_size=50).result()
+    assert maximum == 100
+
+    minimum = executor.reduce(min, nums, fn_arity=2, chunk_size=50).result()
+    assert minimum == 0
+
+    total = executor.reduce(sum, nums, fn_arity=11, chunk_size=13).result()
+    assert total == sum(nums)
+
+
+if __name__ == "__main__":
+    main()
+
+
+# vim: set sts=4 sw=4 ts=4 expandtab ft=python:
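# --- Illustrative sketch, not part of the patch: the index tagging behind
# FuturesExecutor.allpairs, with executor.map replaced by a list comprehension.
# Every (row, col) pair is tagged with its indices before mapping so that
# wait_for_allpairs_resolution can scatter the per-pair results back into a
# row-major len(rows) x len(cols) matrix, which is what the mult_table
# assertions above expect. allpairs_locally is a hypothetical local stand-in.
def allpairs_locally(fn, rows, cols):
    tagged = [(i, j, a, b) for (i, a) in enumerate(rows) for (j, b) in enumerate(cols)]
    mapped = [(i, j, fn(a, b)) for (i, j, a, b) in tagged]  # executor.map(wrap_idx, ...) in the real code
    result = [[0] * len(cols) for _ in rows]
    for (i, j, r) in mapped:
        result[i][j] = r
    return result


table = allpairs_locally(lambda x, y: x * y, range(3), range(5))
assert table == [[x * y for y in range(5)] for x in range(3)]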