From 8db027332e509f20523b73f9fa76f325b9e118f0 Mon Sep 17 00:00:00 2001 From: Douglas Thain Date: Tue, 28 Jan 2025 13:48:08 -0500 Subject: [PATCH 1/9] Vine: Set File Mode (#4034) * Add hooks to set file mode for non-file sources. * Made override file type apply to all file types. * format * Require mode 0600 --- .../python3/ndcctools/taskvine/file.py | 9 +++ taskvine/src/manager/taskvine.h | 10 +++ taskvine/src/manager/vine_file.c | 8 +++ taskvine/src/manager/vine_file.h | 1 + taskvine/src/manager/vine_manager_put.c | 65 +++++++++++++------ taskvine/test/vine_python.py | 7 +- 6 files changed, 77 insertions(+), 23 deletions(-) diff --git a/taskvine/src/bindings/python3/ndcctools/taskvine/file.py b/taskvine/src/bindings/python3/ndcctools/taskvine/file.py index 69d006f74e..e187f8b067 100644 --- a/taskvine/src/bindings/python3/ndcctools/taskvine/file.py +++ b/taskvine/src/bindings/python3/ndcctools/taskvine/file.py @@ -45,6 +45,15 @@ def type(self): if self._file: return cvine.vine_file_type(self._file) + ## + # Set the Unix mode permission bits for the remote file. + # + # @param self A file object. + # @param mode Unix mode bits. + def set_mode(self, mode): + if self._file: + return cvine.vine_file_set_mode(self._file, mode) + ## # Return the contents of a file object as a string. # Typically used to return the contents of an output buffer. diff --git a/taskvine/src/manager/taskvine.h b/taskvine/src/manager/taskvine.h index d5d10a39d2..b76869de46 100644 --- a/taskvine/src/manager/taskvine.h +++ b/taskvine/src/manager/taskvine.h @@ -890,6 +890,16 @@ be delete at the manager's site after it is not needed by the workflow (@ref vin struct vine_file *vine_declare_starch( struct vine_manager *m, struct vine_file *f, vine_cache_level_t cache, vine_file_flags_t flags); +/** Set the Unix mode permission bits of a declared file. +Sets (or overrides) the Unix mode permissions of any file object, +as the application will see it. This applies to any file type, +but is particularly useful for controlling buffers, urls, and mini-tasks +that do not inherently contain mode bits. +@param f A file object of any kind. +@param mode The Unix mode bits to be applied to the file. +*/ +void vine_file_set_mode( struct vine_file *f, int mode ); + /** Fetch the contents of a file. The contents of the given file will be loaded from disk or pulled back from the cluster and loaded into manager memory. This is particularly useful for temporary files and mini-tasks diff --git a/taskvine/src/manager/vine_file.c b/taskvine/src/manager/vine_file.c index a646aab7d1..61fb0bd6b3 100644 --- a/taskvine/src/manager/vine_file.c +++ b/taskvine/src/manager/vine_file.c @@ -85,6 +85,7 @@ struct vine_file *vine_file_create(const char *source, const char *cached_name, f->source_worker = 0; f->type = type; f->size = size; + f->mode = 0; f->mini_task = mini_task; f->recovery_task = 0; f->state = VINE_FILE_STATE_CREATED; /* Assume state created until told otherwise */ @@ -371,4 +372,11 @@ const char *vine_file_source(struct vine_file *f) return f->source; } +void vine_file_set_mode(struct vine_file *f, int mode) +{ + /* The mode must contain, at a minimum, owner-rw (0600) (so that we can delete it) */ + /* And it should not contain anything beyond the standard 0777. */ + f->mode = (mode | 0600) & 0777; +} + /* vim: set noexpandtab tabstop=8: */ diff --git a/taskvine/src/manager/vine_file.h b/taskvine/src/manager/vine_file.h index a358a0b6e4..d1e75240d0 100644 --- a/taskvine/src/manager/vine_file.h +++ b/taskvine/src/manager/vine_file.h @@ -38,6 +38,7 @@ struct vine_file { char *cached_name; // Name of file in the worker's cache directory. size_t size; // Length of source data, if known. time_t mtime; // Modification time of source data, if known. + mode_t mode; // Manual override for Unix mode bits sent to worker. Zero if unset. char *data; // Raw data for an input or output buffer. struct vine_task *mini_task; // Mini task used to generate the desired output file. struct vine_task *recovery_task; // For temp files, a copy of the task that created it. diff --git a/taskvine/src/manager/vine_manager_put.c b/taskvine/src/manager/vine_manager_put.c index 2724fe945f..b415837f50 100644 --- a/taskvine/src/manager/vine_manager_put.c +++ b/taskvine/src/manager/vine_manager_put.c @@ -66,8 +66,7 @@ The transfer time is controlled by the size of the file. If the transfer takes too long, then cancel it. */ -static int vine_manager_put_file( - struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t, const char *localname, const char *remotename, struct stat info, int64_t *total_bytes) +static int vine_manager_put_file(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t, const char *localname, const char *remotename, struct stat info, int override_mode, int64_t *total_bytes) { time_t stoptime; timestamp_t effective_stoptime = 0; @@ -76,6 +75,10 @@ static int vine_manager_put_file( /* normalize the mode so as not to set up invalid permissions */ int mode = (info.st_mode | 0x600) & 0777; + /* If user provided override mode bits at the top level, use those instead. */ + if (override_mode) + mode = override_mode; + int64_t length = info.st_size; int fd = open(localname, O_RDONLY, 0); @@ -113,7 +116,7 @@ static int vine_manager_put_file( /* Need prototype here to address mutually recursive code. */ static vine_result_code_t vine_manager_put_file_or_dir( - struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t, const char *name, const char *remotename, int64_t *total_bytes, int follow_links); + struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t, const char *name, const char *remotename, int override_mode, int64_t *total_bytes, int follow_links); /* Send a directory and all of its contents using the new streaming protocol. @@ -121,8 +124,7 @@ Do this by sending a "dir" prefix, then all of the directory contents, and then an "end" marker. */ -static vine_result_code_t vine_manager_put_directory( - struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t, const char *localname, const char *remotename, int64_t *total_bytes) +static vine_result_code_t vine_manager_put_directory(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t, const char *localname, const char *remotename, int override_mode, int64_t *total_bytes) { struct stat info; if (stat(localname, &info) != 0) { @@ -130,6 +132,11 @@ static vine_result_code_t vine_manager_put_directory( return VINE_APP_FAILURE; } + /* If user provided override mode bits at the top level, use those instead. */ + int mode = info.st_mode; + if (override_mode) + mode = override_mode; + DIR *dir = opendir(localname); if (!dir) { debug(D_NOTICE, "Cannot open dir %s: %s", localname, strerror(errno)); @@ -141,7 +148,7 @@ static vine_result_code_t vine_manager_put_directory( char remotename_encoded[VINE_LINE_MAX]; url_encode(remotename, remotename_encoded, sizeof(remotename_encoded)); - vine_manager_send(q, w, "dir %s %0o %lld\n", remotename_encoded, info.st_mode, (long long)info.st_mtime); + vine_manager_send(q, w, "dir %s %0o %lld\n", remotename_encoded, mode, (long long)info.st_mtime); struct dirent *d; while ((d = readdir(dir))) { @@ -150,7 +157,7 @@ static vine_result_code_t vine_manager_put_directory( char *localpath = string_format("%s/%s", localname, d->d_name); - result = vine_manager_put_file_or_dir(q, w, t, localpath, d->d_name, total_bytes, 0); + result = vine_manager_put_file_or_dir(q, w, t, localpath, d->d_name, 0, total_bytes, 0); free(localpath); @@ -177,8 +184,7 @@ However, in recursive calls, follow_links is set to zero, and internal links are not followed, they are sent natively. */ -static vine_result_code_t vine_manager_put_file_or_dir( - struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t, const char *localpath, const char *remotepath, int64_t *total_bytes, int follow_links) +static vine_result_code_t vine_manager_put_file_or_dir(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t, const char *localpath, const char *remotepath, int override_mode, int64_t *total_bytes, int follow_links) { struct stat info; int result = VINE_SUCCESS; @@ -191,11 +197,11 @@ static vine_result_code_t vine_manager_put_file_or_dir( if (result >= 0) { if (S_ISDIR(info.st_mode)) { - result = vine_manager_put_directory(q, w, t, localpath, remotepath, total_bytes); + result = vine_manager_put_directory(q, w, t, localpath, remotepath, override_mode, total_bytes); } else if (S_ISLNK(info.st_mode)) { result = vine_manager_put_symlink(q, w, t, localpath, remotepath, total_bytes); } else if (S_ISREG(info.st_mode)) { - result = vine_manager_put_file(q, w, t, localpath, remotepath, info, total_bytes); + result = vine_manager_put_file(q, w, t, localpath, remotepath, info, override_mode, total_bytes); } else { debug(D_NOTICE, "skipping unusual file: %s", strerror(errno)); } @@ -222,9 +228,12 @@ vine_result_code_t vine_manager_put_url_now(struct vine_manager *q, struct vine_ return VINE_SUCCESS; } - /* XXX The API does not allow the user to choose the mode bits of the target file, so we make it permissive - * here.*/ - int mode = 0755; + /* A URL source does not naturally provide mode bits. */ + /* If the user provided them manually via vine_file_set_mode, use that. */ + /* Otherwise default to a permissive 0755. */ + int mode = f->mode; + if (mode == 0) + mode = 0755; char source_encoded[VINE_LINE_MAX]; char cached_name_encoded[VINE_LINE_MAX]; @@ -258,9 +267,12 @@ vine_result_code_t vine_manager_put_url(struct vine_manager *q, struct vine_work return VINE_SUCCESS; } - /* XXX The API does not allow the user to choose the mode bits of the target file, so we make it permissive - * here.*/ - int mode = 0755; + /* A URL source does not naturally provide mode bits. */ + /* If the user provided them manually via vine_file_set_mode, use that. */ + /* Otherwise default to a permissive 0755. */ + int mode = f->mode; + if (mode == 0) + mode = 0755; char source_encoded[VINE_LINE_MAX]; char cached_name_encoded[VINE_LINE_MAX]; @@ -283,8 +295,15 @@ vine_result_code_t vine_manager_put_url(struct vine_manager *q, struct vine_work vine_result_code_t vine_manager_put_buffer(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t, struct vine_file *f, int64_t *total_bytes) { + /* A buffer source does not naturally provide mode bits. */ + /* If the user provided them manually via vine_file_set_mode, use that. */ + /* Otherwise default to a permissive 0755. */ + int mode = f->mode; + if (mode == 0) + mode = 0755; + time_t stoptime = time(0) + vine_manager_transfer_time(q, w, f->size); - vine_manager_send(q, w, "file %s %lld 0755 0\n", f->cached_name, (long long)f->size); + vine_manager_send(q, w, "file %s %lld 0%o 0\n", f->cached_name, (long long)f->size, (int)mode); int64_t actual = link_putlstring(w->link, f->data, f->size, stoptime); if (actual >= 0 && (size_t)actual == f->size) { *total_bytes = actual; @@ -311,7 +330,7 @@ static vine_result_code_t vine_manager_put_input_file(struct vine_manager *q, st case VINE_FILE: debug(D_VINE, "%s (%s) needs file %s as %s", w->hostname, w->addrport, f->source, m->remote_name); vine_manager_send(q, w, "put %s %d %lld\n", f->cached_name, f->cache_level, (long long)f->size); - result = vine_manager_put_file_or_dir(q, w, t, f->source, f->cached_name, &total_bytes, 1); + result = vine_manager_put_file_or_dir(q, w, t, f->source, f->cached_name, f->mode, &total_bytes, 1); break; case VINE_BUFFER: @@ -488,8 +507,14 @@ vine_result_code_t vine_manager_put_task( return result; if (target) { - vine_manager_send(q, w, "mini_task %s %s %d %lld %o\n", target->source, target->cached_name, target->cache_level, (long long)target->size, 0777); + /* If the user provide mode bits manually, use them here. */ + int mode = target->mode; + if (mode == 0) + mode = 0755; + /* A mini-task is identified by the file it creates. */ + vine_manager_send(q, w, "mini_task %s %s %d %lld 0%o\n", target->source, target->cached_name, target->cache_level, (long long)target->size, mode); } else { + /* A regular task is simply identified by a task id. */ vine_manager_send(q, w, "task %lld\n", (long long)t->task_id); } diff --git a/taskvine/test/vine_python.py b/taskvine/test/vine_python.py index ddd40dd956..e1a41c6154 100755 --- a/taskvine/test/vine_python.py +++ b/taskvine/test/vine_python.py @@ -234,18 +234,19 @@ def next_output_name(): # Create an explicit minitask description to run curl minitask = vine.Task("curl https://www.nd.edu -o output") - intask = q.declare_minitask(minitask,"output") + infile = q.declare_minitask(minitask,"output") + infile.set_mode(0o600) # Now generate an input file from a shell command: t = vine.Task("wc -l infile") - t.add_input(intask, "infile") + t.add_input(infile, "infile") q.submit(t) t = q.wait(wait_time) report_task(t, "success", 0) # second time should have it cached (though we can't tell from here) t = vine.Task("wc -l infile") - t.add_input(intask, "infile") + t.add_input(infile, "infile") q.submit(t) t = q.wait(wait_time) report_task(t, "success", 0) From 0509ea8b46beadfb35120fb5361a1423f04e66d2 Mon Sep 17 00:00:00 2001 From: Kevin Xue Date: Thu, 30 Jan 2025 09:15:09 -0500 Subject: [PATCH 2/9] Implementation of taskvine allpairs/map/reduce (#4011) * Implementation of taskvine allpairs/map/reduce * lint * lint v2 * cleanup code * cleanup reduce * add test * remove debug print * cleanup map * format * allpairs in terms of map * format * do not create lib in map * error on lib name --------- Co-authored-by: Kevin Xue Co-authored-by: Benjamin Tovar --- .../python3/ndcctools/taskvine/futures.py | 157 +++++++++++++++--- taskvine/test/TR_vine_python_future_hof.sh | 84 ++++++++++ taskvine/test/vine_python_future_hof.py | 60 +++++++ 3 files changed, 279 insertions(+), 22 deletions(-) create mode 100755 taskvine/test/TR_vine_python_future_hof.sh create mode 100644 taskvine/test/vine_python_future_hof.py diff --git a/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py b/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py index 70026d7a63..05e89dd6b9 100644 --- a/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py +++ b/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py @@ -1,30 +1,33 @@ - from . import cvine import hashlib -from collections import deque -from concurrent.futures import Executor -from concurrent.futures import Future -from concurrent.futures import FIRST_COMPLETED -from concurrent.futures import FIRST_EXCEPTION -from concurrent.futures import ALL_COMPLETED -from concurrent.futures._base import PENDING -from concurrent.futures._base import CANCELLED -from concurrent.futures._base import FINISHED +from collections import deque, namedtuple +from concurrent.futures import ( + Executor, + Future, + FIRST_COMPLETED, + FIRST_EXCEPTION, + ALL_COMPLETED, +) +from concurrent.futures._base import PENDING, CANCELLED, FINISHED from concurrent.futures import TimeoutError -from collections import namedtuple + from .task import ( PythonTask, FunctionCall, FunctionCallNoResult, ) + from .manager import ( Factory, Manager, ) +import math import os import time import textwrap +from functools import partial +from collections.abc import Sequence RESULT_PENDING = 'result_pending' @@ -109,7 +112,7 @@ def as_completed(fs, timeout=None): f.module_manager.submit(f._task) start = time.perf_counter() - result_timeout = min(timeout, 5) if timeout is not None else 5 + result_timeout = max(1, min(timeout, 5)) if timeout else 5 def _iterator(): # iterate of queue of futures, yeilding completed futures and @@ -133,22 +136,39 @@ def _iterator(): assert result != RESULT_PENDING yield f - if ( - fs and timeout is not None - and time.perf_counter() - start > timeout - ): + if fs and timeout and time.perf_counter() - start > timeout: raise TimeoutError() return _iterator() +def run_iterable(fn, *args): + return list(map(fn, args)) + + +def reduction_tree(fn, *args, n=2): + # n is the arity of the reduction function fn + # if less than 2, we have an infinite loop + assert n > 1 + entries = [f.result() if isinstance(f, VineFuture) else f for f in args] + if len(entries) < 2: + return entries[0] + + len_multiple = int(math.ceil(len(entries) / n) * n) + new_args = map(fn, [entries[i:i + n] for i in range(0, len_multiple, n)]) + + return reduction_tree(fn, *new_args, n=n) + ## # \class FuturesExecutor # # TaskVine FuturesExecutor object # # This class acts as an interface for the creation of Futures + + class FuturesExecutor(Executor): + def __init__(self, port=9123, batch_type="local", manager=None, manager_host_port=None, manager_name=None, factory_binary=None, worker_binary=None, log_file=os.devnull, factory=True, opts={}): self.manager = Manager(port=port) self.port = self.manager.port @@ -173,6 +193,100 @@ def __init__(self, port=9123, batch_type="local", manager=None, manager_host_por else: self.factory = None + def map(self, fn, iterable, library_name=None, chunk_size=1): + assert chunk_size > 0 + assert isinstance(iterable, Sequence) + + def wait_for_map_resolution(*futures_batch): + result = [] + for f in futures_batch: + result.extend(f.result() if isinstance(f, VineFuture) else f) + return result + + tasks = [] + fn_wrapped = partial(run_iterable, fn) + while iterable: + heads, iterable = iterable[:chunk_size], iterable[chunk_size:] + + if library_name: + raise NotImplementedError("Using a library not currently supported.") + future_batch_task = self.submit(self.future_funcall(library_name, fn_wrapped, *heads)) + else: + future_batch_task = self.submit(self.future_task(fn_wrapped, *heads)) + + tasks.append(future_batch_task) + + return self.submit(self.future_task(wait_for_map_resolution, *tasks)) + + # Reduce performs a reduction tree on the iterable and currently returns a single value + # + # parameters: + # - Function + # - a function that receives fn_arity arguments + # - A sequence of parameters that function will take + # - a chunk_size to group elements in sequence to dispatch to a single task + # - arity of the function, elements of a chunk are reduce arity-wise. + # - an optional library_name for a library function call + def reduce(self, fn, iterable, library_name=None, chunk_size=2, fn_arity=2): + assert chunk_size > 1 + assert fn_arity > 1 + assert isinstance(iterable, Sequence) + chunk_size = max(fn_arity, chunk_size) + + new_iterable = [] + while iterable: + heads, iterable = iterable[:chunk_size], iterable[chunk_size:] + heads = [f.result() if isinstance(f, VineFuture) else f for f in heads] + if library_name: + raise NotImplementedError("Using a library not currently supported.") + future_batch_task = self.submit( + self.future_funcall( + library_name, reduction_tree, fn, *heads, n=fn_arity + ) + ) + else: + future_batch_task = self.submit(self.future_task(reduction_tree, fn, *heads, n=fn_arity)) + + new_iterable.append(future_batch_task) + + if len(new_iterable) > 1: + return self.reduce(fn, new_iterable, library_name, chunk_size, fn_arity) + else: + return new_iterable[0] + + def allpairs(self, fn, iterable_rows, iterable_cols, library_name=None, chunk_size=1): + assert chunk_size > 0 + assert isinstance(iterable_rows, Sequence) + assert isinstance(iterable_cols, Sequence) + + def wait_for_allpairs_resolution(row_size, col_size, mapped): + result = [] + for _ in range(row_size): + result.append([0] * col_size) + + mapped = mapped.result() if isinstance(mapped, VineFuture) else mapped + for p in mapped: + (i, j, r) = p.result() if isinstance(p, VineFuture) else p + result[i][j] = r + + return result + + def wrap_idx(args): + i, j, a, b = args + return (i, j, fn(a, b)) + + iterable = [(i, j, a, b) for (i, a) in enumerate(iterable_rows) for (j, b) in enumerate(iterable_cols)] + mapped = self.map(wrap_idx, iterable, library_name, chunk_size) + + return self.submit( + self.future_task( + wait_for_allpairs_resolution, + len(iterable_rows), + len(iterable_cols), + mapped, + ) + ) + def submit(self, fn, *args, **kwargs): if isinstance(fn, (FuturePythonTask, FutureFunctionCall)): self.manager.submit(fn) @@ -240,15 +354,15 @@ def cancelled(self): return False def running(self): - state = self._task.state - if state == "RUNNING": + state = self._task._module_manager.task_state(self._task.id) + if state == cvine.VINE_TASK_RUNNING: return True else: return False def done(self): - state = self._task.state - if state == "DONE" or state == "RETRIEVED": + state = self._task._module_manager.task_state(self._task.id) + if state == cvine.VINE_TASK_DONE: return True else: return False @@ -301,7 +415,6 @@ def __init__(self, manager, library_name, fn, *args, **kwargs): self.manager = manager self.library_name = library_name self._envs = [] - self._future = VineFuture(self) self._has_retrieved = False @@ -326,7 +439,6 @@ def output(self, timeout="wait_forever"): self._saved_output = output['Result'] else: self._saved_output = FunctionCallNoResult(output['Reason']) - except Exception as e: self._saved_output = e else: @@ -400,6 +512,7 @@ def output(self, timeout="wait_forever"): # task or the exception object of a failed task. self._output = cloudpickle.loads(self._output_file.contents()) except Exception as e: + print(self._output_file.contents()) # handle output file fetch/deserialization failures self._output = e self._output_loaded = True diff --git a/taskvine/test/TR_vine_python_future_hof.sh b/taskvine/test/TR_vine_python_future_hof.sh new file mode 100755 index 0000000000..56fe2b9066 --- /dev/null +++ b/taskvine/test/TR_vine_python_future_hof.sh @@ -0,0 +1,84 @@ +#!/bin/sh + +set -e + +. ../../dttools/test/test_runner_common.sh + +import_config_val CCTOOLS_PYTHON_TEST_EXEC +import_config_val CCTOOLS_PYTHON_TEST_DIR + +export PYTHONPATH=$(pwd)/../../test_support/python_modules/${CCTOOLS_PYTHON_TEST_DIR}:$PYTHONPATH + +STATUS_FILE=vine.status +PORT_FILE=vine.port + +check_needed() +{ + [ -n "${CCTOOLS_PYTHON_TEST_EXEC}" ] || return 1 + + # Poncho currently requires ast.unparse to serialize the function, + # which only became available in Python 3.9. Some older platforms + # (e.g. almalinux8) will not have this natively. + "${CCTOOLS_PYTHON_TEST_EXEC}" -c "from ast import unparse" || return 1 + + # In some limited build circumstances (e.g. macos build on github), + # poncho doesn't work due to lack of conda-pack or cloudpickle + "${CCTOOLS_PYTHON_TEST_EXEC}" -c "import conda_pack" || return 1 + "${CCTOOLS_PYTHON_TEST_EXEC}" -c "import cloudpickle" || return 1 + + return 0 +} + +prepare() +{ + rm -f $STATUS_FILE + rm -f $PORT_FILE + return 0 +} + +run() +{ + ( ${CCTOOLS_PYTHON_TEST_EXEC} vine_python_future_hof.py $PORT_FILE; echo $? > $STATUS_FILE ) & + + # wait at most 15 seconds for vine to find a port. + wait_for_file_creation $PORT_FILE 15 + + run_taskvine_worker $PORT_FILE worker.log --cores 2 --memory 2000 --disk 2000 + + # wait for vine to exit. + wait_for_file_creation $STATUS_FILE 15 + + # retrieve exit status + status=$(cat $STATUS_FILE) + if [ $status -ne 0 ] + then + # display log files in case of failure. + logfile=$(latest_vine_debug_log) + if [ -f ${logfile} ] + then + echo "master log:" + cat ${logfile} + fi + + if [ -f worker.log ] + then + echo "worker log:" + cat worker.log + fi + + exit 1 + fi + + exit 0 +} + +clean() +{ + rm -f $STATUS_FILE + rm -f $PORT_FILE + rm -rf vine-run-info + exit 0 +} + + +dispatch "$@" diff --git a/taskvine/test/vine_python_future_hof.py b/taskvine/test/vine_python_future_hof.py new file mode 100644 index 0000000000..e14b00f024 --- /dev/null +++ b/taskvine/test/vine_python_future_hof.py @@ -0,0 +1,60 @@ +#! /usr/bin/env python + +import sys +import ndcctools.taskvine as vine + +port_file = None +try: + port_file = sys.argv[1] +except IndexError: + sys.stderr.write("Usage: {} PORTFILE\n".format(sys.argv[0])) + raise + +def main(): + executor = vine.FuturesExecutor( + port=[9123, 9129], manager_name="vine_hof_test", factory=False + ) + + print("listening on port {}".format(executor.manager.port)) + with open(port_file, "w") as f: + f.write(str(executor.manager.port)) + + nums = list(range(101)) + + rows = 3 + mult_table = executor.allpairs(lambda x, y: x*y, range(rows), nums, chunk_size=11).result() + assert sum(mult_table[1]) == sum(nums) + assert sum(sum(r) for r in mult_table) == sum(sum(nums) * n for n in range(rows)) + + doubles = executor.map(lambda x: 2*x, nums, chunk_size=10).result() + assert sum(doubles) == sum(nums)*2 + + doubles = executor.map(lambda x: 2*x, nums, chunk_size=13).result() + assert sum(doubles) == sum(nums)*2 + + maximum = executor.reduce(max, nums, fn_arity=2).result() + assert maximum == 100 + + maximum = executor.reduce(max, nums, fn_arity=25).result() + assert maximum == 100 + + maximum = executor.reduce(max, nums, fn_arity=1000).result() + assert maximum == 100 + + maximum = executor.reduce(max, nums, fn_arity=2, chunk_size=50).result() + assert maximum == 100 + + minimum = executor.reduce(min, nums, fn_arity=2, chunk_size=50).result() + assert minimum == 0 + + total = executor.reduce(sum, nums, fn_arity=11, chunk_size=13).result() + assert total == sum(nums) + + + + +if __name__ == "__main__": + main() + + +# vim: set sts=4 sw=4 ts=4 expandtab ft=python: From 04724eecbf066d72ba7e8d6be577cb8c6b083c7f Mon Sep 17 00:00:00 2001 From: JinZhou5042 <142265839+JinZhou5042@users.noreply.github.com> Date: Thu, 30 Jan 2025 09:19:22 -0500 Subject: [PATCH 3/9] vine: reserve a factor of disk when allocating resources (#4035) * init * tune param * lint * lint * lint * condition cahnge: value > 1 || value <= 0 * reserve disk not only for proportional * lint * only modify disk factor when in range (0,1) --------- Co-authored-by: Benjamin Tovar --- taskvine/src/manager/vine_manager.c | 11 ++++++++++- taskvine/src/manager/vine_manager.h | 1 + taskvine/test/vine_allocations.py | 17 +++++++++++------ 3 files changed, 22 insertions(+), 7 deletions(-) diff --git a/taskvine/src/manager/vine_manager.c b/taskvine/src/manager/vine_manager.c index dd9c482526..c65484fb6c 100644 --- a/taskvine/src/manager/vine_manager.c +++ b/taskvine/src/manager/vine_manager.c @@ -2771,6 +2771,11 @@ struct rmsummary *vine_manager_choose_resources_for_task(struct vine_manager *q, limits->disk = available_disk; } + /* For disk, scale the estimated disk allocation by a [0, 1] factor (by default 0.75) to intentionally + * reserve some space for data movement between the sandbox and cache, and allow extra room for potential cache growth. + * This applies to tasks except function calls. */ + limits->disk *= q->disk_proportion_available_to_task; + /* never go below specified min resources. */ rmsummary_merge_max(limits, min); @@ -4006,6 +4011,7 @@ struct vine_manager *vine_ssl_create(int port, const char *key, const char *cert q->max_task_resources_requested = rmsummary_create(-1); q->sandbox_grow_factor = 2.0; + q->disk_proportion_available_to_task = 0.75; q->stats = calloc(1, sizeof(struct vine_stats)); q->stats_measure = calloc(1, sizeof(struct vine_stats)); @@ -5772,7 +5778,10 @@ int vine_tune(struct vine_manager *q, const char *name, double value) } else if (!strcmp(name, "max-library-retries")) { q->max_library_retries = MIN(1, value); - + } else if (!strcmp(name, "disk-proportion-available-to-task")) { + if (value < 1 && value > 0) { + q->disk_proportion_available_to_task = value; + } } else { debug(D_NOTICE | D_VINE, "Warning: tuning parameter \"%s\" not recognized\n", name); return -1; diff --git a/taskvine/src/manager/vine_manager.h b/taskvine/src/manager/vine_manager.h index 95304fedd7..8b57c8a7a1 100644 --- a/taskvine/src/manager/vine_manager.h +++ b/taskvine/src/manager/vine_manager.h @@ -229,6 +229,7 @@ struct vine_manager { int watch_library_logfiles; /* If true, watch the output files produced by each of the library processes running on the remote workers, take them back the current logging directory */ double sandbox_grow_factor; /* When task disk sandboxes are exhausted, increase the allocation using their measured valued times this factor */ + double disk_proportion_available_to_task; /* intentionally reduces disk allocation for tasks to reserve some space for cache growth. */ /*todo: confirm datatype. int or int64*/ int max_task_stdout_storage; /* Maximum size of standard output from task. (If larger, send to a separate file.) */ diff --git a/taskvine/test/vine_allocations.py b/taskvine/test/vine_allocations.py index b09e7a0cf3..2e990e995f 100755 --- a/taskvine/test/vine_allocations.py +++ b/taskvine/test/vine_allocations.py @@ -65,6 +65,10 @@ def check_task(category, category_mode, max, min, expected): with worker: q.tune("force-proportional-resources", 0) + # note that the disk is divided by a factor to reserve space for cache growth unless the users specify the disk + disk_proportion_available_to_task = 0.75 + q.tune("disk-proportion-available-to-task", disk_proportion_available_to_task) # the default factor is 0.75 + r = {"cores": 1, "memory": 2, "disk": 3, "gpus": 4} check_task("all_specified", "fixed", max=r, min={}, expected=r) @@ -72,20 +76,21 @@ def check_task(category, category_mode, max, min, expected): check_task("all_specified_no_cores", "fixed", max={"gpus": 4, "memory": 2, "disk": 3}, min={}, expected={"cores": 0, "memory": 2, "disk": 3, "gpus": 4}) - check_task("all_zero", "fixed", max={"cores": 0, "memory": 0, "disk": 0, "gpus": 0}, min={}, expected={"cores": worker_cores, "memory": worker_memory, "disk": worker_disk, "gpus": 0}) + check_task("all_zero", "fixed", max={"cores": 0, "memory": 0, "disk": 0, "gpus": 0}, min={}, expected={"cores": worker_cores, "memory": worker_memory, "disk": worker_disk * disk_proportion_available_to_task, "gpus": 0}) q.tune("force-proportional-resources", 1) - check_task("only_memory", "fixed", max={"memory": worker_memory / 2}, min={}, expected={"cores": worker_cores / 2, "memory": worker_memory / 2, "disk": worker_disk / 2, "gpus": 0}) - check_task("only_memory_w_minimum", "fixed", max={"memory": worker_memory / 2}, min={"cores": 3, "gpus": 2}, expected={"cores": 4, "memory": worker_memory, "disk": worker_disk, "gpus": 2}) + check_task("only_memory", "fixed", max={"memory": worker_memory / 2}, min={}, expected={"cores": worker_cores / 2, "memory": worker_memory / 2, "disk": worker_disk / 2 * disk_proportion_available_to_task, "gpus": 0}) + + check_task("only_memory_w_minimum", "fixed", max={"memory": worker_memory / 2}, min={"cores": 3, "gpus": 2}, expected={"cores": 4, "memory": worker_memory, "disk": worker_disk * disk_proportion_available_to_task, "gpus": 2}) - check_task("only_cores", "fixed", max={"cores": worker_cores}, min={}, expected={"cores": worker_cores, "memory": worker_memory, "disk": worker_disk, "gpus": 0}) + check_task("only_cores", "fixed", max={"cores": worker_cores}, min={}, expected={"cores": worker_cores, "memory": worker_memory, "disk": worker_disk * disk_proportion_available_to_task, "gpus": 0}) - check_task("auto_whole_worker", "min_waste", max={}, min={}, expected={"cores": worker_cores, "memory": worker_memory, "disk": worker_disk, "gpus": 0}) + check_task("auto_whole_worker", "min_waste", max={}, min={}, expected={"cores": worker_cores, "memory": worker_memory, "disk": worker_disk * disk_proportion_available_to_task, "gpus": 0}) p = 1 / worker_cores r = {"cores": 1} - e = {"cores": 1, "memory": math.floor(worker_memory * p), "disk": math.floor(worker_disk * p), "gpus": 0} + e = {"cores": 1, "memory": math.floor(worker_memory * p), "disk": math.floor(worker_disk * p) * disk_proportion_available_to_task, "gpus": 0} check_task("only_cores_proportional", "fixed", max=r, min={}, expected=e) p = 2 / worker_cores From 9e0e60a90e33d79dba1454e4629e1a996b7e5b22 Mon Sep 17 00:00:00 2001 From: JinZhou5042 <142265839+JinZhou5042@users.noreply.github.com> Date: Fri, 31 Jan 2025 06:55:09 -0500 Subject: [PATCH 4/9] init (#4049) --- taskvine/src/manager/vine_manager.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/taskvine/src/manager/vine_manager.c b/taskvine/src/manager/vine_manager.c index c65484fb6c..77a55a9e88 100644 --- a/taskvine/src/manager/vine_manager.c +++ b/taskvine/src/manager/vine_manager.c @@ -993,7 +993,7 @@ static int consider_tempfile_replications(struct vine_manager *q) continue; } - debug(D_VINE, "Found %d workers holding %s, %d replicas needed", nsources, f->cached_name, to_find); + // debug(D_VINE, "Found %d workers holding %s, %d replicas needed", nsources, f->cached_name, to_find); int round_replication_request_sent = vine_file_replica_table_replicate(q, f, sources, to_find); total_replication_request_sent += round_replication_request_sent; From abb440b1ce4643f6c0dd28ee449e0b59e0a7e73e Mon Sep 17 00:00:00 2001 From: Douglas Thain Date: Mon, 3 Feb 2025 08:35:53 -0500 Subject: [PATCH 5/9] Debug: Remove Excess System Calls (#4052) * - Modify debug() to avoid making excess system calls to getpid() and localtime() for every debug event. - Rename debug_getpid -> debug_child_getpid to clarify purpose for sole use in parrot. * format --- dttools/src/debug.c | 36 ++++++++++++++++++++++++++++-------- dttools/src/debug.h | 4 ++-- parrot/src/pfs_main.cc | 2 +- 3 files changed, 31 insertions(+), 11 deletions(-) diff --git a/dttools/src/debug.c b/dttools/src/debug.c index 7bff4aee7b..95cce6342d 100644 --- a/dttools/src/debug.c +++ b/dttools/src/debug.c @@ -34,9 +34,11 @@ extern int debug_file_reopen(void); extern int debug_file_close(void); static void (*debug_write)(int64_t flags, const char *str) = debug_stderr_write; -static pid_t (*debug_getpid)(void) = getpid; +static pid_t (*debug_child_getpid)(void) = 0; static char debug_program_name[PATH_MAX]; static int64_t debug_flags = D_NOTICE | D_ERROR | D_FATAL; +static pid_t debug_cached_pid = 0; +static int debug_time_zone_cached = 0; struct flag_info { const char *name; @@ -182,20 +184,37 @@ static void do_debug(int64_t flags, const char *fmt, va_list args) gettimeofday(&tv, 0); tm = localtime(&tv.tv_sec); + /* + If the TZ environment variable is not set, then every single call + to localtime() results in a stat("/etc/localtime") which impacts + the minimum latency of a debug event. + */ + + if (!debug_time_zone_cached) { + if (!getenv("TZ")) { + setenv("TZ", tm->tm_zone, 0); + } + debug_time_zone_cached = 1; + } + + /* Fetch the pid just once and use it multiple times. */ + pid_t pid = getpid(); + buffer_putfstring(&B, - "%04d/%02d/%02d %02d:%02d:%02d.%02ld ", + "%04d/%02d/%02d %02d:%02d:%02d.%02ld %s[%d]", tm->tm_year + 1900, tm->tm_mon + 1, tm->tm_mday, tm->tm_hour, tm->tm_min, tm->tm_sec, - (long)tv.tv_usec / 10000); - buffer_putfstring(&B, "%s[%d] ", debug_program_name, getpid()); + (long)tv.tv_usec / 10000, + debug_program_name, + pid); } /* Parrot prints debug messages for children: */ - if (getpid() != debug_getpid()) { - buffer_putfstring(&B, " ", (int)debug_getpid()); + if (debug_child_getpid) { + buffer_putfstring(&B, " ", (int)debug_child_getpid()); } buffer_putfstring(&B, "%s: ", debug_flags_to_name(flags)); @@ -309,6 +328,7 @@ void debug_config_file(const char *path) void debug_config(const char *name) { strncpy(debug_program_name, path_basename(name), sizeof(debug_program_name) - 1); + debug_cached_pid = getpid(); } void debug_config_file_size(off_t size) @@ -316,9 +336,9 @@ void debug_config_file_size(off_t size) debug_file_size(size); } -void debug_config_getpid(pid_t (*getpidf)(void)) +void debug_config_child_getpid(pid_t (*getpidf)(void)) { - debug_getpid = getpidf; + debug_child_getpid = getpidf; } int64_t debug_flags_clear() diff --git a/dttools/src/debug.h b/dttools/src/debug.h index de590c17ab..211e4f94d4 100644 --- a/dttools/src/debug.h +++ b/dttools/src/debug.h @@ -124,7 +124,7 @@ modify the linker namespace we are using. #define debug_config_file cctools_debug_config_file #define debug_config_file_size cctools_debug_config_file_size #define debug_config_fatal cctools_debug_config_fatal -#define debug_config_getpid cctools_debug_config_getpid +#define debug_config_child_getpid cctools_debug_config_child_getpid #define debug_flags_set cctools_debug_flags_set #define debug_flags_print cctools_debug_flags_print #define debug_flags_clear cctools_debug_flags_clear @@ -206,7 +206,7 @@ void debug_config_file_size(off_t size); void debug_config_fatal(void (*callback) (void)); -void debug_config_getpid (pid_t (*getpidf)(void)); +void debug_config_child_getpid (pid_t (*getpidf)(void)); /** Set debugging flags to enable output. Accepts a debug flag in ASCII form, and enables that subsystem. For example: debug_flags_set("chirp"); diff --git a/parrot/src/pfs_main.cc b/parrot/src/pfs_main.cc index a952146f36..549ed28ea3 100644 --- a/parrot/src/pfs_main.cc +++ b/parrot/src/pfs_main.cc @@ -612,7 +612,7 @@ int main( int argc, char *argv[] ) debug_config(argv[0]); debug_config_file_size(0); /* do not rotate debug file by default */ debug_config_fatal(pfs_process_killall); - debug_config_getpid(pfs_process_getpid); + debug_config_child_getpid(pfs_process_getpid); /* Special file descriptors (currently the channel and the Parrot * directory) are allocated from the top of our file descriptor pool. After From 345f58213774fc1dc4deb36ad03829801c8e3631 Mon Sep 17 00:00:00 2001 From: Douglas Thain Date: Tue, 4 Feb 2025 13:24:24 -0500 Subject: [PATCH 6/9] Fix bug: An async message larger than the total window size should be sent regardless, otherwise it would wait forever. (#4053) --- taskvine/src/worker/vine_worker.c | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/taskvine/src/worker/vine_worker.c b/taskvine/src/worker/vine_worker.c index 61a1033b1b..52213e0b77 100644 --- a/taskvine/src/worker/vine_worker.c +++ b/taskvine/src/worker/vine_worker.c @@ -228,6 +228,12 @@ we break once a message would overflow the window. void deliver_async_messages(struct link *l) { + /* If no pending messages, return right away. */ + int messages = list_size(pending_async_messages); + if (messages < 1) + return; + + /* Determine how much space is available for sending */ int recv_window; int send_window; link_window_get(l, &send_window, &recv_window); @@ -235,14 +241,19 @@ void deliver_async_messages(struct link *l) int bytes_in_buffer = link_get_buffer_bytes(l); int bytes_available = send_window - bytes_in_buffer; - int messages = list_size(pending_async_messages); int visited; - char *message; + /* Consider each message in the pending queue: */ for (visited = 0; visited < messages; visited++) { - message = list_peek_head(pending_async_messages); + char *message = list_peek_head(pending_async_messages); int message_size = strlen(message); - if (message_size < bytes_available) { + /* + If the message fits in the available space, send it. + OR: If it is larger than the whole window, send it anyway + because we will have to block one way or the other. + Otherwise, stop here and return later. + */ + if (message_size < bytes_available || message_size > send_window) { message = list_pop_head(pending_async_messages); bytes_available -= message_size; debug(D_VINE, "tx: %s", message); From 8c9088e0182de25cc6503e73928a430a36c17ec1 Mon Sep 17 00:00:00 2001 From: JinZhou5042 <142265839+JinZhou5042@users.noreply.github.com> Date: Wed, 5 Feb 2025 12:35:08 -0500 Subject: [PATCH 7/9] vine: track running libraries on the worker (#4047) * init * delete and clean current_libraries --- taskvine/src/manager/vine_manager.c | 9 +++++++++ taskvine/src/manager/vine_worker_info.c | 2 ++ taskvine/src/manager/vine_worker_info.h | 1 + 3 files changed, 12 insertions(+) diff --git a/taskvine/src/manager/vine_manager.c b/taskvine/src/manager/vine_manager.c index 77a55a9e88..ea99eb81eb 100644 --- a/taskvine/src/manager/vine_manager.c +++ b/taskvine/src/manager/vine_manager.c @@ -930,6 +930,7 @@ static void cleanup_worker(struct vine_manager *q, struct vine_worker_info *w) } itable_clear(w->current_tasks, 0); + itable_clear(w->current_libraries, 0); w->finished_tasks = 0; @@ -2972,6 +2973,10 @@ static vine_result_code_t commit_task_to_worker(struct vine_manager *q, struct v /* If start_one_task_fails, this will be decremented in handle_failure below. */ t->library_task->function_slots_inuse++; } + /* If this is a library task, bookkeep it on the worker's side */ + if (t->provides_library) { + itable_insert(w->current_libraries, t->task_id, t); + } t->hostname = xxstrdup(w->hostname); t->addrport = xxstrdup(w->addrport); @@ -3156,6 +3161,10 @@ static void reap_task_from_worker(struct vine_manager *q, struct vine_worker_inf itable_remove(w->current_tasks, t->task_id); + if (t->provides_library) { + itable_remove(w->current_libraries, t->task_id); + } + /* If this was a function call assigned to a library, then decrease the count of functions assigned, diff --git a/taskvine/src/manager/vine_worker_info.c b/taskvine/src/manager/vine_worker_info.c index ce9158959f..fef94e8b97 100644 --- a/taskvine/src/manager/vine_worker_info.c +++ b/taskvine/src/manager/vine_worker_info.c @@ -34,6 +34,7 @@ struct vine_worker_info *vine_worker_create(struct link *lnk) w->current_files = hash_table_create(0, 0); w->current_tasks = itable_create(0); + w->current_libraries = itable_create(0); w->start_time = timestamp_get(); w->end_time = -1; @@ -69,6 +70,7 @@ void vine_worker_delete(struct vine_worker_info *w) hash_table_clear(w->current_files, (void *)vine_file_replica_delete); hash_table_delete(w->current_files); itable_delete(w->current_tasks); + itable_delete(w->current_libraries); free(w); diff --git a/taskvine/src/manager/vine_worker_info.h b/taskvine/src/manager/vine_worker_info.h index c52fa91103..e7e67122ad 100644 --- a/taskvine/src/manager/vine_worker_info.h +++ b/taskvine/src/manager/vine_worker_info.h @@ -62,6 +62,7 @@ struct vine_worker_info { /* Current files and tasks that have been transfered to this worker */ struct hash_table *current_files; struct itable *current_tasks; + struct itable *current_libraries; /* The number of tasks running last reported by the worker */ int dynamic_tasks_running; From 23059fd82e5474d7308cd3edc4d7c0252212a205 Mon Sep 17 00:00:00 2001 From: JinZhou5042 <142265839+JinZhou5042@users.noreply.github.com> Date: Wed, 5 Feb 2025 12:36:43 -0500 Subject: [PATCH 8/9] dttools: priority queue data structure fix (#4042) * init * enqueue on priority update * lint --- dttools/src/priority_queue.c | 71 +++++++++++++++++++------------ dttools/src/priority_queue.h | 24 +++++------ dttools/src/priority_queue_test.c | 6 +-- 3 files changed, 58 insertions(+), 43 deletions(-) diff --git a/dttools/src/priority_queue.c b/dttools/src/priority_queue.c index 8d416eeaba..d7ee734386 100644 --- a/dttools/src/priority_queue.c +++ b/dttools/src/priority_queue.c @@ -13,8 +13,6 @@ See the file COPYING for details. #include #define DEFAULT_CAPACITY 127 -#define MAX_PRIORITY DBL_MAX -#define MIN_PRIORITY DBL_MIN struct element { void *data; @@ -27,8 +25,8 @@ struct priority_queue { struct element **elements; /* The following three cursors are used to iterate over the elements in the numerical order they are stored in the array, which is - different from the order of priorities. Each of them has different concerns when traverse the queue Though the tipical priority-based - traversal is done by the repeated invocation of priority_queue_peak_top and priority_queue_pop APIs, rather than using any cursors. */ + different from the order of priorities. Each of them has different concerns when traverse the queue Though the typical priority-based + traversal is done by the repeated invocation of priority_queue_peek_top and priority_queue_pop APIs, rather than using any cursors. */ int base_cursor; // Used in PRIORITY_QUEUE_BASE_ITERATE. It iterates from the first position and never be reset automatically. int static_cursor; // Used in PRIORITY_QUEUE_STATIC_ITERATE. It iterates from the last position and never be reset automatically. int rotate_cursor; // Used in PRIORITY_QUEUE_ROTATE_ITERATE. It iterates from the last position and can be reset when certain events happen. @@ -101,7 +99,7 @@ static int priority_queue_double_capacity(struct priority_queue *pq) /****** External Methods ******/ -struct priority_queue *priority_queue_create(double init_capacity) +struct priority_queue *priority_queue_create(int init_capacity) { struct priority_queue *pq = (struct priority_queue *)malloc(sizeof(struct priority_queue)); if (!pq) { @@ -142,17 +140,17 @@ int priority_queue_size(struct priority_queue *pq) int priority_queue_push(struct priority_queue *pq, void *data, double priority) { if (!pq) { - return 0; + return -1; } if (pq->size >= pq->capacity) { if (!priority_queue_double_capacity(pq)) { - return 0; + return -1; } } struct element *e = (struct element *)malloc(sizeof(struct element)); if (!e) { - return 0; + return -1; } e->data = data; e->priority = priority; @@ -185,7 +183,7 @@ void *priority_queue_pop(struct priority_queue *pq) return data; } -void *priority_queue_peak_top(struct priority_queue *pq) +void *priority_queue_peek_top(struct priority_queue *pq) { if (!pq || pq->size == 0) { return NULL; @@ -203,7 +201,7 @@ double priority_queue_get_priority(struct priority_queue *pq, int idx) return pq->elements[idx]->priority; } -void *priority_queue_peak_at(struct priority_queue *pq, int idx) +void *priority_queue_peek_at(struct priority_queue *pq, int idx) { if (!pq || pq->size < 1 || idx < 0 || idx > pq->size - 1) { return NULL; @@ -215,7 +213,7 @@ void *priority_queue_peak_at(struct priority_queue *pq, int idx) int priority_queue_update_priority(struct priority_queue *pq, void *data, double new_priority) { if (!pq) { - return 0; + return -1; } int idx = -1; @@ -226,8 +224,9 @@ int priority_queue_update_priority(struct priority_queue *pq, void *data, double } } + /* If the data isn’t already in the queue, enqueue it. */ if (idx == -1) { - return 0; + return priority_queue_push(pq, data, new_priority); } double old_priority = pq->elements[idx]->priority; @@ -247,7 +246,7 @@ int priority_queue_update_priority(struct priority_queue *pq, void *data, double int priority_queue_find_idx(struct priority_queue *pq, void *data) { if (!pq) { - return 0; + return -1; } for (int i = 0; i < pq->size; i++) { @@ -256,13 +255,13 @@ int priority_queue_find_idx(struct priority_queue *pq, void *data) } } - return 0; + return -1; } int priority_queue_static_next(struct priority_queue *pq) { if (!pq || pq->size == 0) { - return 0; + return -1; } int static_idx = pq->static_cursor; @@ -291,7 +290,7 @@ Advance the base cursor and return it, should be used only in PRIORITY_QUEUE_BAS int priority_queue_base_next(struct priority_queue *pq) { if (!pq || pq->size == 0) { - return 0; + return -1; } int base_idx = pq->base_cursor; @@ -316,7 +315,7 @@ void priority_queue_rotate_reset(struct priority_queue *pq) int priority_queue_rotate_next(struct priority_queue *pq) { if (!pq || pq->size == 0) { - return 0; + return -1; } int rotate_idx = pq->rotate_cursor; @@ -335,26 +334,40 @@ int priority_queue_remove(struct priority_queue *pq, int idx) return 0; } - struct element *e = pq->elements[idx]; - pq->size--; - pq->elements[idx] = pq->elements[pq->size]; - pq->elements[pq->size] = NULL; + struct element *to_delete = pq->elements[idx]; + struct element *last_elem = pq->elements[pq->size - 1]; + + double old_priority = to_delete->priority; + double new_priority = last_elem->priority; - sink(pq, idx); + free(to_delete); - if (pq->static_cursor == idx) { + pq->size--; + if (idx != pq->size) { + pq->elements[idx] = last_elem; + pq->elements[pq->size] = NULL; + + if (new_priority > old_priority) { + swim(pq, idx); + } else if (new_priority < old_priority) { + sink(pq, idx); + } + } else { + pq->elements[pq->size] = NULL; + } + + if (pq->static_cursor == idx && pq->static_cursor > 0) { pq->static_cursor--; } - if (pq->base_cursor == idx) { + if (pq->base_cursor == idx && pq->base_cursor > 0) { pq->base_cursor--; } - if (pq->rotate_cursor == idx) { + if (pq->rotate_cursor == idx && pq->rotate_cursor > 0) { pq->rotate_cursor--; } - free(e); + // reset the rotate cursor if the removed element is before/equal to it if (idx <= pq->rotate_cursor) { - // reset the rotate cursor if the removed element is before/equal to it priority_queue_rotate_reset(pq); } @@ -368,7 +381,9 @@ void priority_queue_delete(struct priority_queue *pq) } for (int i = 0; i < pq->size; i++) { - free(pq->elements[i]); + if (pq->elements[i]) { + free(pq->elements[i]); + } } free(pq->elements); free(pq); diff --git a/dttools/src/priority_queue.h b/dttools/src/priority_queue.h index 6b433fd5bb..d1ca7bf106 100644 --- a/dttools/src/priority_queue.h +++ b/dttools/src/priority_queue.h @@ -64,13 +64,13 @@ void *data = someDataPointer; priority_queue_push(pq, data, priority); data = priority_queue_pop(pq); -void *headData = priority_queue_peak_top(pq); +void *headData = priority_queue_peek_top(pq); To list all of the items in a priority queue, use a simple loop:
 for (int i = 0; i < priority_queue_size(pq); i++) {
-    void *data = priority_queue_peak_at(pq, i);
+    void *data = priority_queue_peek_at(pq, i);
     printf("Priority queue contains: %p\n", data);
 }
 
@@ -94,7 +94,7 @@ Element with a higher priority is at the top of the heap. @param init_capacity The initial number of elements in the queue. If zero, a default value will be used. @return A pointer to a new priority queue. */ -struct priority_queue *priority_queue_create(double init_capacity); +struct priority_queue *priority_queue_create(int init_capacity); /** Count the elements in a priority queue. @param pq A pointer to a priority queue. @@ -122,7 +122,7 @@ Similar to @ref priority_queue_pop, but the element is not removed. @param pq A pointer to a priority queue. @return The pointer to the top of the queue if any, failure otherwise */ -void *priority_queue_peak_top(struct priority_queue *pq); +void *priority_queue_peek_top(struct priority_queue *pq); /** Get an element from a priority queue by a specified index. The first accessible element is at index 0. @@ -130,7 +130,7 @@ The first accessible element is at index 0. @param index The index of the element to get. @return The pointer to the element if any, failure otherwise */ -void *priority_queue_peak_at(struct priority_queue *pq, int index); +void *priority_queue_peek_at(struct priority_queue *pq, int index); /** Get the priority of an element at a specified index. @param pq A pointer to a priority queue. @@ -150,7 +150,7 @@ int priority_queue_update_priority(struct priority_queue *pq, void *data, double /** Find the index of an element in a priority queue. @param pq A pointer to a priority queue. @param data The pointer to the element to find. -@return The index of the element if found, 0 on failure. +@return The index of the element if found, -1 on failure. */ int priority_queue_find_idx(struct priority_queue *pq, void *data); @@ -158,7 +158,7 @@ int priority_queue_find_idx(struct priority_queue *pq, void *data); The static_cursor is used to globally iterate over the elements by sequential index. The position of the static_cursor is automatically remembered and never reset. @param pq A pointer to a priority queue. -@return The index of the next element if any, 0 on failure. +@return The index of the next element if any, -1 on failure. */ int priority_queue_static_next(struct priority_queue *pq); @@ -170,7 +170,7 @@ void priority_queue_base_reset(struct priority_queue *pq); /** Advance the base_cursor to the next element and return the index. @param pq A pointer to a priority queue. -@return The index of the next element if any, 0 on failure. +@return The index of the next element if any, -1 on failure. */ int priority_queue_base_next(struct priority_queue *pq); @@ -185,7 +185,7 @@ void priority_queue_rotate_reset(struct priority_queue *pq); /** Advance the rotate_cursor to the next element and return the index. @param pq A pointer to a priority queue. -@return The index of the next element if any, 0 on failure. +@return The index of the next element if any, -1 on failure. */ int priority_queue_rotate_next(struct priority_queue *pq); @@ -235,16 +235,16 @@ PRIORITY_QUEUE_ROTATE_ITERATE( pq, idx, data, iter_count, iter_depth ) { #define PRIORITY_QUEUE_BASE_ITERATE( pq, idx, data, iter_count, iter_depth ) \ iter_count = 0; \ priority_queue_base_reset(pq); \ - while ((iter_count < iter_depth) && ((idx = priority_queue_base_next(pq)) >= 0) && (data = priority_queue_peak_at(pq, idx)) && (iter_count += 1)) + while ((iter_count < iter_depth) && ((idx = priority_queue_base_next(pq)) >= 0) && (data = priority_queue_peek_at(pq, idx)) && (iter_count += 1)) /* Iterate from last position, never reset. */ #define PRIORITY_QUEUE_STATIC_ITERATE( pq, idx, data, iter_count, iter_depth ) \ iter_count = 0; \ - while ((iter_count < iter_depth) && ((idx = priority_queue_static_next(pq)) >= 0) && (data = priority_queue_peak_at(pq, idx)) && (iter_count += 1)) + while ((iter_count < iter_depth) && ((idx = priority_queue_static_next(pq)) >= 0) && (data = priority_queue_peek_at(pq, idx)) && (iter_count += 1)) /* Iterate from last position, reset to the begining when needed. */ #define PRIORITY_QUEUE_ROTATE_ITERATE( pq, idx, data, iter_count, iter_depth ) \ iter_count = 0; \ - while ((iter_count < iter_depth) && ((idx = priority_queue_rotate_next(pq)) >= 0) && (data = priority_queue_peak_at(pq, idx)) && (iter_count += 1)) + while ((iter_count < iter_depth) && ((idx = priority_queue_rotate_next(pq)) >= 0) && (data = priority_queue_peek_at(pq, idx)) && (iter_count += 1)) #endif diff --git a/dttools/src/priority_queue_test.c b/dttools/src/priority_queue_test.c index 01eca7a7cf..2632417f4d 100644 --- a/dttools/src/priority_queue_test.c +++ b/dttools/src/priority_queue_test.c @@ -47,7 +47,7 @@ int main() } // Get the head of the priority queue - char *head = (char *)priority_queue_peak_top(pq); + char *head = (char *)priority_queue_peek_top(pq); if (head) { printf("\nElement at the head of the queue: %s\n", head); } else { @@ -56,7 +56,7 @@ int main() // Access an element by index idx = 4; - char *element = (char *)priority_queue_peak_at(pq, idx); + char *element = (char *)priority_queue_peek_at(pq, idx); if (element) { printf("\nElement at index %d: %s\n", idx, element); } else { @@ -164,7 +164,7 @@ int main() // Pop elements from the priority queue using priority_queue_pop printf("\nPopping elements from the priority queue:\n"); - while ((item = (char *)priority_queue_peak_top(pq)) != NULL) { + while ((item = (char *)priority_queue_peek_top(pq)) != NULL) { printf("Popped element: %s Priority: %d\n", item, (int)priority_queue_get_priority(pq, 0)); priority_queue_pop(pq); } From 1875bd26104b6544039ff8c5c63df95a0482a1a9 Mon Sep 17 00:00:00 2001 From: JinZhou5042 <142265839+JinZhou5042@users.noreply.github.com> Date: Wed, 5 Feb 2025 13:58:28 -0500 Subject: [PATCH 9/9] init (#4050) --- taskvine/src/worker/vine_transfer_server.h | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/taskvine/src/worker/vine_transfer_server.h b/taskvine/src/worker/vine_transfer_server.h index 45a998f0b7..54120b23b7 100644 --- a/taskvine/src/worker/vine_transfer_server.h +++ b/taskvine/src/worker/vine_transfer_server.h @@ -10,7 +10,11 @@ See the file COPYING for details. #include "vine_cache.h" #include "link.h" -#define VINE_TRANSFER_PROC_MAX_CHILD 8 +/* This number defines the maximum allowable concurrent forking processes for file transfers. However, it is the manager's + * responsibility to allocate transfer tasks efficiently among workers, to ensure that no worker excessively forks processes + * to complete the job. In this case, this value serves more as a theoretical safety threshold and should never be reached under + * normal conditions. If a worker reaches this limit, it indicates a bug on the manager's side. */ +#define VINE_TRANSFER_PROC_MAX_CHILD 128 void vine_transfer_server_start( struct vine_cache *cache, int port_min, int port_max ); void vine_transfer_server_stop();