diff --git a/doc/manuals/taskvine/index.md b/doc/manuals/taskvine/index.md index 9620ff4ab6..cf8f0e2bfd 100644 --- a/doc/manuals/taskvine/index.md +++ b/doc/manuals/taskvine/index.md @@ -1507,8 +1507,47 @@ function definitions into a library task `libtask` libtask = m.create_library_from_functions("my_library", my_sum, my_mul) ``` -You can optionally specify the number of functions the library can -run concurrently by setting the number of function slots (default to 1): +We strongly recommend specifying the modules the function needs inside the function itself. This ensures that the correct modules and their aliases will be available when the functions are executed in isolation at the worker. + +You can certainly embed `import` statements within the function and install any necessary packages: + +=== "Python" + ```python + def divide(dividend, divisor): + import math + return dividend / math.sqrt(divisor) + + libtask = m.create_library_from_functions("my_library", divide) + ``` + +If the overhead of importing modules per function is noticeable, modules can be optionally imported as a common preamble to the function executions. Common modules can be specified with the `import_modules` argument to `create_library_from_functions`. This reduces the overhead by eliminating redundant imports: + + +=== "Python" + ```python + import numpy + import math + + import_modules = [numpy, math] + ``` + +`import_modules` only accepts modules as arguments (e.g. it can't be used to import functions, or to select particular names with `from ... import ...` statements). Such statements should be made inside functions after specifying the modules with `import_modules`. 
+ +=== "Python" + ```python + def cube(x): + # whenever using FromImport statements, put them inside of functions + from random import uniform + from time import sleep as time_sleep + + random_delay = uniform(0.00001, 0.0001) + time_sleep(random_delay) + + return math.pow(x, 3) + ``` + + +After installing the packages and functions, you can optionally specify the number of functions the library can run concurrently by setting the number of function slots (defaults to 1): === "Python" ```python diff --git a/dttools/test/test_runner_common.sh b/dttools/test/test_runner_common.sh index c2f2f4686a..7816be7642 100755 --- a/dttools/test/test_runner_common.sh +++ b/dttools/test/test_runner_common.sh @@ -138,7 +138,7 @@ run_taskvine_worker() exit 1 fi echo "Running worker." - if ! "$TASKVINE_WORKER" --single-shot --timeout=10s --cores ${cores:-1} --memory ${memory:-250} --disk ${disk:-250} --gpus ${gpus:-0} --debug=all --debug-file="$log" $* localhost $(cat "$port_file"); then + if ! "$TASKVINE_WORKER" --single-shot --timeout=10 --cores ${cores:-1} --memory ${memory:-250} --disk ${disk:-250} --gpus ${gpus:-0} --debug=all --debug-file="$log" $* localhost $(cat "$port_file"); then echo "ERROR: could not start worker" exit 1 fi diff --git a/poncho/src/poncho/package_serverize.py b/poncho/src/poncho/package_serverize.py index 71c5d3b73d..51cd13c30e 100755 --- a/poncho/src/poncho/package_serverize.py +++ b/poncho/src/poncho/package_serverize.py @@ -10,11 +10,11 @@ from ndcctools.poncho.wq_network_code import wq_network_code from ndcctools.poncho.library_network_code import library_network_code -import argparse import json import os import stat import ast +import types import tarfile import hashlib import inspect @@ -23,69 +23,93 @@ default_name_func = \ '''def name(): - return "my_coprocess" + return "my_coprocess" ''' init_function = \ '''if __name__ == "__main__": - main() + main() ''' + +# Generates a list of import statements based on the given argument. 
+# @param import_modules A list of modules imported at the preamble of library +def generate_import_statements(import_modules): + if not import_modules: + return + + if not isinstance(import_modules, list): + raise ValueError("Expected 'import_modules' to be a list.") + + import_statements = [] + for module in import_modules: + if not isinstance(module, types.ModuleType): + raise ValueError("Expected ModuleType in 'import_modules'.") + + import_statements.append(f"import {module.__name__}") + + return import_statements + + # Create the library driver code that will be run as a normal task # on workers and execute function invocations upon workers' instructions. -# @param path Path to the temporary Python script containing functions. -# @param funcs A list of relevant function names. -# @param dest Path to the final library script. -# @param version Whether this is for workqueue or taskvine serverless code. -def create_library_code(path, funcs, dest, version): - import_modules = [] - function_source_code = [] - name_source_code = "" - absolute_path = os.path.abspath(path) - # open the source file, parse the code into an ast, and then unparse the ast import statements and functions back into python code - with open(absolute_path, 'r') as source: - code = ast.parse(source.read(), filename=absolute_path) - for stmt in ast.walk(code): - if isinstance(stmt, ast.Import) or isinstance(stmt, ast.ImportFrom): - import_modules.append(ast.unparse(stmt)) - if isinstance(stmt, ast.FunctionDef): - if stmt.name == "name": - name_source_code = ast.unparse(stmt) - elif stmt.name in funcs: - function_source_code.append(ast.unparse(stmt)) - funcs.remove(stmt.name) - if name_source_code == "": - print("No name function found, defaulting to my_coprocess") - name_source_code = default_name_func - for func in funcs: - print(f"No function found named {func}, skipping") - # create output file - output_file = open(dest, "w") - # write shebang to file - output_file.write(shebang) - # write 
imports to file - for import_module in import_modules: - output_file.write(f"{import_module}\n") - # write network code into it - if version == "work_queue": - raw_source_fnc = wq_network_code - elif version == "taskvine": - raw_source_fnc = library_network_code - raw_source_code = inspect.getsource(raw_source_fnc) - network_code = "\n".join([line[4:] for line in raw_source_code.split("\n")[1:]]) - output_file.write(network_code) - # write name function code into it - output_file.write(f"{name_source_code}\n") - # iterate over every function the user requested and attempt to put it into the library code - for function_code in function_source_code: - output_file.write("@remote_execute\n") - output_file.write(function_code) - output_file.write("\n") - output_file.write(init_function) - output_file.close() - st = os.stat(dest) - os.chmod(dest, st.st_mode | stat.S_IEXEC) +# @param path Path to the temporary Python script containing functions. +# @param funcs A list of relevant function names. +# @param dest Path to the final library script. +# @param version Whether this is for workqueue or taskvine serverless code. 
+# @param import_modules A list of modules to be imported at the preamble of library +def create_library_code(path, funcs, dest, version, import_modules=None): + + # create output file + with open(dest, "w") as output_file: + # write shebang to file + output_file.write(shebang) + # write imports to file + import_statements = generate_import_statements(import_modules) + if import_statements: + for import_statement in import_statements: + output_file.write(f"{import_statement}\n") + + function_source_code = [] + name_source_code = "" + absolute_path = os.path.abspath(path) + # open the source file, parse the code into an ast, and then unparse functions back into python code + with open(absolute_path, 'r') as source: + code = ast.parse(source.read(), filename=absolute_path) + for stmt in ast.walk(code): + if isinstance(stmt, ast.FunctionDef): + if stmt.name == "name": + name_source_code = ast.unparse(stmt) + elif stmt.name in funcs: + function_source_code.append(ast.unparse(stmt)) + funcs.remove(stmt.name) + if name_source_code == "": + print("No name function found, defaulting to my_coprocess") + name_source_code = default_name_func + for func in funcs: + print(f"No function found named {func}, skipping") + + # write network code into it + if version == "work_queue": + raw_source_fnc = wq_network_code + elif version == "taskvine": + raw_source_fnc = library_network_code + raw_source_code = inspect.getsource(raw_source_fnc) + network_code = "\n".join([line[4:] for line in raw_source_code.split("\n")[1:]]) + output_file.write(network_code) + + # write name function code into it + output_file.write(f"{name_source_code}\n") + # iterate over every function the user requested and attempt to put it into the library code + for function_code in function_source_code: + output_file.write("@remote_execute\n") + output_file.write(function_code) + output_file.write("\n") + output_file.write(init_function) + + st = os.stat(dest) + os.chmod(dest, st.st_mode | stat.S_IEXEC) def 
sort_spec(spec): sorted_spec = json.load(spec) @@ -150,9 +174,10 @@ def generate_functions_hash(functions: list) -> str: # Create a library file and a poncho environment tarball from a list of functions as needed. # The functions in the list must have source code for this code to work. -# @param path path to directory to create the library python file and the environment tarball. -# @param functions list of functions to include in the -def serverize_library_from_code(path, functions, name, need_pack=True): +# @param path path to directory to create the library python file and the environment tarball. +# @param functions list of functions to include in the +# @param import_modules a list of modules to be imported at the preamble of library +def serverize_library_from_code(path, functions, name, need_pack=True, import_modules=None): tmp_library_path = f"{path}/tmp_library.py" # Write out functions into a temporary python file. @@ -162,7 +187,7 @@ def serverize_library_from_code(path, functions, name, need_pack=True): temp_source_file.write(f"def name():\n\treturn '{name}'") # create the final library code from that temporary file - create_library_code(tmp_library_path, [fnc.__name__ for fnc in functions], path + "/library_code.py", "taskvine") + create_library_code(tmp_library_path, [fnc.__name__ for fnc in functions], path + "/library_code.py", "taskvine", import_modules=import_modules) # remove the temp library file os.remove(tmp_library_path) diff --git a/taskvine/src/bindings/python3/ndcctools/taskvine/manager.py b/taskvine/src/bindings/python3/ndcctools/taskvine/manager.py index 967302a9b2..3c1645d09c 100644 --- a/taskvine/src/bindings/python3/ndcctools/taskvine/manager.py +++ b/taskvine/src/bindings/python3/ndcctools/taskvine/manager.py @@ -861,7 +861,8 @@ def remove_library(self, name): # @param init_command A string describing a shell command to execute before the library task is run # @param add_env Whether to automatically create and/or add environment to 
the library # @returns A task to be used with @ref ndcctools.taskvine.manager.Manager.install_library. - def create_library_from_functions(self, name, *function_list, poncho_env=None, init_command=None, add_env=True): + # @param import_modules A list of modules to be imported at the preamble of library + def create_library_from_functions(self, name, *function_list, poncho_env=None, init_command=None, add_env=True, import_modules=None): # Delay loading of poncho until here, to avoid bringing in poncho dependencies unless needed. # Ensure poncho python library is available. try: @@ -872,7 +873,7 @@ def create_library_from_functions(self, name, *function_list, poncho_env=None, i # Positional arguments are the list of functions to include in the library. # Create a unique hash of a combination of function names and bodies. functions_hash = package_serverize.generate_functions_hash(function_list) - + # Create path for caching library code and environment based on function hash. library_cache_path = f"{self.cache_directory}/vine-library-cache/{functions_hash}" library_code_path = f"{library_cache_path}/library_code.py" @@ -896,7 +897,7 @@ def create_library_from_functions(self, name, *function_list, poncho_env=None, i need_pack=False # create library code and environment, if appropriate - package_serverize.serverize_library_from_code(library_cache_path, function_list, name, need_pack=need_pack) + package_serverize.serverize_library_from_code(library_cache_path, function_list, name, need_pack=need_pack, import_modules=import_modules) # enable correct permissions for library code os.chmod(library_code_path, 0o775) diff --git a/taskvine/src/examples/vine_example_function_call.py b/taskvine/src/examples/vine_example_function_call.py index 899e7eb502..1cde70bdf2 100755 --- a/taskvine/src/examples/vine_example_function_call.py +++ b/taskvine/src/examples/vine_example_function_call.py @@ -5,17 +5,31 @@ # using FunctionCall tasks. 
import ndcctools.taskvine as vine -import json import argparse +import math +import json + +# The library will consist of the following three functions: + +def cube(x): + # whenever using FromImport statements, put them inside of functions + from random import uniform + from time import sleep as time_sleep -# The library will consist of the following two functions: + random_delay = uniform(0.00001, 0.0001) + time_sleep(random_delay) + + return math.pow(x, 3) def divide(dividend, divisor): - import math - return dividend/math.sqrt(divisor) + # straightforward usage of preamble import statements + return dividend / math.sqrt(divisor) def double(x): - return x*2 + import math as m + # use alias inside of functions + return m.prod([x, 2]) + def main(): parser = argparse.ArgumentParser( @@ -30,7 +44,7 @@ def main(): default=False, ) - q = vine.Manager(9123) + q = vine.Manager(port=9123) print(f"TaskVine manager listening on port {q.port}") @@ -41,35 +55,41 @@ def main(): else: q.enable_peer_transfers() - print("Creating library from functions...") + print("Creating library from packages and functions...") + + # This format shows how to create package import statements for the library + import_modules = [math] + libtask = q.create_library_from_functions('test-library', divide, double, cube, import_modules=import_modules, add_env=False) - libtask = q.create_library_from_functions('test-library', divide, double) q.install_library(libtask) print("Submitting function call tasks...") tasks = 100 - for i in range(0,tasks): + for _ in range(0, tasks): s_task = vine.FunctionCall('test-library', 'divide', 2, 2**2) q.submit(s_task) - + s_task = vine.FunctionCall('test-library', 'double', 3) q.submit(s_task) + s_task = vine.FunctionCall('test-library', 'cube', 4) + q.submit(s_task) + print("Waiting for results...") total_sum = 0 - x = 0 + while not q.empty(): t = q.wait(5) if t: - x = t.output + x = t.output total_sum += x print(f"task {t.id} completed with result {x}") # Check 
that we got the right result. - expected = tasks * ( divide(2, 2**2) + double(3) ) + expected = tasks * (divide(2, 2**2) + double(3) + cube(4)) print(f"Total: {total_sum}") print(f"Expected: {expected}") @@ -78,4 +98,6 @@ def main(): if __name__ == '__main__': main() + + # vim: set sts=4 sw=4 ts=4 expandtab ft=python: diff --git a/taskvine/src/examples/vine_example_future_matrix.py b/taskvine/src/examples/vine_example_future_matrix.py index ae114abb36..d764d52412 100644 --- a/taskvine/src/examples/vine_example_future_matrix.py +++ b/taskvine/src/examples/vine_example_future_matrix.py @@ -44,7 +44,7 @@ def load_matrices(levels): def write_matrices(levels, n): for x in range(2**(levels+1)): - matrix = generateRandomMatrix(n) + matrix = generate_random_matrix(n) with open('matrices/matrix-{}'.format(x), 'wb') as f: cloudpickle.dump(matrix, f) diff --git a/taskvine/test/vine_python_serverless.py b/taskvine/test/vine_python_serverless.py index 356fb3c6c9..f0cf6d7f57 100755 --- a/taskvine/test/vine_python_serverless.py +++ b/taskvine/test/vine_python_serverless.py @@ -1,15 +1,35 @@ #!/usr/bin/env python3 +# This example shows how to install a library of functions once +# as a LibraryTask, and then invoke that library remotely by +# using FunctionCall tasks. 
+ +import ndcctools.taskvine as vine -import json import argparse +import math +import json + +# The library will consist of the following three functions: + +def cube(x): + # whenever using FromImport statements, put them inside of functions + from random import uniform + from time import sleep as time_sleep + + random_delay = uniform(0.00001, 0.0001) + time_sleep(random_delay) + + return math.pow(x, 3) def divide(dividend, divisor): - import math - return dividend/math.sqrt(divisor) + # straightforward usage of preamble import statements + return dividend / math.sqrt(divisor) def double(x): - return x*2 + import math as m + # use alias inside of functions + return m.prod([x, 2]) + def main(): parser = argparse.ArgumentParser("Test for taskvine python bindings.") @@ -25,9 +45,12 @@ def main(): print("Writing port {port} to file {file}".format(port=q.port, file=args.port_file)) f.write(str(q.port)) - print("Creating library from functions...") + print("Creating library from packages and functions...") - libtask = q.create_library_from_functions('test-library', divide, double, add_env=False) + # This format shows how to create package import statements for the library + import_modules = [math] + libtask = q.create_library_from_functions('test-library', divide, double, cube, import_modules=import_modules, add_env=False) + libtask.set_cores(1) libtask.set_memory(1000) libtask.set_disk(1000) @@ -38,27 +61,30 @@ def main(): tasks = 100 - for i in range(0,tasks): + for _ in range(0, tasks): s_task = vine.FunctionCall('test-library', 'divide', 2, 2**2) q.submit(s_task) s_task = vine.FunctionCall('test-library', 'double', 3) q.submit(s_task) + s_task = vine.FunctionCall('test-library', 'cube', 4) + q.submit(s_task) + print("Waiting for results...") total_sum = 0 - x = 0 + while not q.empty(): t = q.wait(5) if t: - x = t.output + x = t.output total_sum += x print(f"task {t.id} completed with result {x}") # Check that we got the right result. 
- expected = tasks * ( divide(2, 2**2) + double(3) ) - + expected = tasks * (divide(2, 2**2) + double(3) + cube(4)) + print(f"Total: {total_sum}") print(f"Expected: {expected}") @@ -67,4 +93,5 @@ def main(): if __name__ == '__main__': main() + # vim: set sts=4 sw=4 ts=4 expandtab ft=python: