Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Vine: Function Calls Should Fail on Missing Library #4022

Merged
4 changes: 3 additions & 1 deletion taskvine/src/manager/taskvine.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ typedef enum {
location input file requirements. */
VINE_RESULT_CANCELLED = 11 << 3, /**< The task was cancelled by the caller. */
VINE_RESULT_LIBRARY_EXIT = 12 << 3, /**< Task is a library that has terminated. **/
VINE_RESULT_SANDBOX_EXHAUSTION = 13 << 3 /**< The task used more disk than the allowed sandbox. **/
VINE_RESULT_SANDBOX_EXHAUSTION = 13 << 3, /**< The task used more disk than the allowed sandbox. **/
VINE_RESULT_MISSING_LIBRARY = 14 << 3 /**< The task is a function requiring a library that does not exist. */
} vine_result_t;

/** Select how to allocate resources for similar tasks with @ref vine_set_category_mode */
Expand Down Expand Up @@ -1435,6 +1436,7 @@ a times series, if this feature is enabled. See @ref vine_enable_monitoring.
- "large_task_check_interval" How frequently to check for tasks that do not fit any worker. (default=180000000)
- "option_blocklist_slow_workers_timeout" Timeout for slow workers to come back to the pool. (default=900)
- "watch-library-logfiles" If 1, watch the output files produced by each of the library processes running on the remote workers, take them back the current logging directory. (default=0)
- "max-library-retries" The number of times a library task can fail and be retried before it is permanently removed.
@param value The value to set the parameter to.
@return 0 on succes, -1 on failure.
*/
Expand Down
2 changes: 2 additions & 0 deletions taskvine/src/manager/vine_file_replica.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ struct vine_file_replica *vine_file_replica_create(vine_file_type_t type, vine_c

void vine_file_replica_delete(struct vine_file_replica *r)
{
if (!r)
return;
free(r);
vine_counters.replica.deleted++;
}
94 changes: 76 additions & 18 deletions taskvine/src/manager/vine_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ static void vine_manager_consider_recovery_task(struct vine_manager *q, struct v
static void delete_uncacheable_files(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t);
static int delete_worker_file(struct vine_manager *q, struct vine_worker_info *w, const char *filename, vine_cache_level_t cache_level, vine_cache_level_t delete_upto_level);

struct vine_task *send_library_to_worker(struct vine_manager *q, struct vine_worker_info *w, const char *name, vine_result_code_t *result);
struct vine_task *send_library_to_worker(struct vine_manager *q, struct vine_worker_info *w, const char *name);

/* Return the number of workers matching a given type: WORKER, STATUS, etc */

Expand Down Expand Up @@ -1138,7 +1138,9 @@ static int delete_worker_file(struct vine_manager *q, struct vine_worker_info *w
vine_manager_send(q, w, "unlink %s\n", filename);
struct vine_file_replica *replica;
replica = vine_file_replica_table_remove(q, w, filename);
vine_file_replica_delete(replica);
if (replica) {
vine_file_replica_delete(replica);
}
return 1;
}

Expand Down Expand Up @@ -1387,8 +1389,10 @@ static int fetch_outputs_from_worker(struct vine_manager *q, struct vine_worker_

/*
Consider the set of tasks that are waiting but not running.
Cancel those that have exceeded their expressed end time,
exceeded the maximum number of retries, or other policy issues.
Cancel those that cannot run for unfixable policy reasons,
such as exceeded the absolute end time, no library task available, etc.
This is done in a separate iteration outside of scheduling
to avoid the cost of these checks in the critical path.
*/

static int expire_waiting_tasks(struct vine_manager *q)
Expand All @@ -1397,27 +1401,46 @@ static int expire_waiting_tasks(struct vine_manager *q)
int t_idx;
int expired = 0;

/* Measure the current time once for the whole iteration. */
double current_time = timestamp_get() / ONE_SECOND;

/* Only work through the queue up to iter_depth. */
int iter_count = 0;
int iter_depth = q->attempt_schedule_depth;
int iter_depth = MIN(priority_queue_size(q->ready_tasks), q->attempt_schedule_depth);

PRIORITY_QUEUE_STATIC_ITERATE(q->ready_tasks, t_idx, t, iter_count, iter_depth)
{
/* In this loop, use VINE_RESULT_SUCCESS as an indication of "still ok to run". */
vine_result_t result = VINE_RESULT_SUCCESS;

/* Consider each of the possible task expiration reasons. */

if (t->resources_requested->end > 0 && t->resources_requested->end <= current_time) {
vine_task_set_result(t, VINE_RESULT_MAX_END_TIME);
debug(D_VINE, "task %d has exceeded its end time", t->task_id);
result = VINE_RESULT_MAX_END_TIME;
} else if (t->needs_library && !hash_table_lookup(q->library_templates, t->needs_library)) {
debug(D_VINE, "task %d does not match any submitted library named \"%s\"", t->task_id, t->needs_library);
result = VINE_RESULT_MISSING_LIBRARY;
}

/* If any of the reasons fired, then expire the task and put in the retrieved queue. */
if (result != VINE_RESULT_SUCCESS) {
vine_task_set_result(t, result);
priority_queue_remove(q->ready_tasks, t_idx);
change_task_state(q, t, VINE_TASK_RETRIEVED);
expired++;
}
}

/* Return the number of tasks expired. */
return expired;
}

/*
Consider the set of tasks that are waiting with strict inputs
Terminate those to which no such worker exists.
*/

static int enforce_waiting_fixed_locations(struct vine_manager *q)
{
int t_idx;
Expand Down Expand Up @@ -2901,6 +2924,10 @@ static void kill_empty_libraries_on_worker(struct vine_manager *q, struct vine_w
Commit a given task to a worker by sending the task details,
then updating all auxiliary data structures to note the
assignment and the new task state.

If the commit should fail for any reason, then this function
is responsible for invoke handle_failure in order to put the
task/worker back into the proper state.
*/

static vine_result_code_t commit_task_to_worker(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t)
Expand All @@ -2918,12 +2945,21 @@ static vine_result_code_t commit_task_to_worker(struct vine_manager *q, struct v
if (!t->library_task) {
/* Otherwise send the library to the worker. */
/* Note that this call will re-enter commit_task_to_worker. */
t->library_task = send_library_to_worker(q, w, t->needs_library, &result);
t->library_task = send_library_to_worker(q, w, t->needs_library);

/*
Careful, this is an ugly special case.
The library dispatch could have failed for any number of reasons,
including a problem with the worker, or a problem or limitation
with the library task. The function call task doesn't necessarily
have a problem, but we haven't committed it to anything either.
So, we fail with code VINE_MGR_FAILURE, which tells the caller
to put it back in the queue without doing anything.
*/

/* Careful: if the above failed, then w may no longer be valid */
/* In that case return immediately without making further changes. */
if (!t->library_task)
return result;
if (!t->library_task) {
return VINE_MGR_FAILURE;
}
}
/* If start_one_task_fails, this will be decremented in handle_failure below. */
t->library_task->function_slots_inuse++;
Expand Down Expand Up @@ -3406,8 +3442,24 @@ static int send_one_task(struct vine_manager *q)
w = consider_task(q, t);
if (w) {
priority_queue_remove(q->ready_tasks, t_idx);
commit_task_to_worker(q, w, t);
return 1;
vine_result_code_t result = commit_task_to_worker(q, w, t);
switch (result) {
case VINE_SUCCESS:
/* return on successful commit. */
return 1;
break;
case VINE_APP_FAILURE:
case VINE_WORKER_FAILURE:
/* failed to dispatch, commit put the task back in the right place. */
break;
case VINE_MGR_FAILURE:
/* special case, commit had a chained failure. */
priority_queue_push(q->ready_tasks, t, t->priority);
break;
case VINE_END_OF_LIST:
/* shouldn't happen, keep going */
break;
}
}
}

Expand Down Expand Up @@ -4566,6 +4618,9 @@ const char *vine_result_string(vine_result_t result)
case VINE_RESULT_SANDBOX_EXHAUSTION:
str = "SANDBOX_EXHAUSTION";
break;
case VINE_RESULT_MISSING_LIBRARY:
str = "MISSING_LIBRARY";
break;
}

return str;
Expand Down Expand Up @@ -4639,10 +4694,9 @@ int vine_submit(struct vine_manager *q, struct vine_task *t)
* @param q The manager structure.
* @param w The worker info structure.
* @param name The name of the library to be sent.
* @param result A pointer to a result reflecting the reason for any failure.
* @return pointer to the library task if the operation succeeds, 0 otherwise.
*/
struct vine_task *send_library_to_worker(struct vine_manager *q, struct vine_worker_info *w, const char *name, vine_result_code_t *result)
struct vine_task *send_library_to_worker(struct vine_manager *q, struct vine_worker_info *w, const char *name)
{
/* Find the original prototype library task by name, if it exists. */
struct vine_task *original = hash_table_lookup(q->library_templates, name);
Expand Down Expand Up @@ -4706,10 +4760,10 @@ struct vine_task *send_library_to_worker(struct vine_manager *q, struct vine_wor

/* Send the task to the worker in the usual way. */
/* Careful: If this failed, then the worker object or task object may no longer be valid! */
*result = commit_task_to_worker(q, w, t);
vine_result_code_t result = commit_task_to_worker(q, w, t);

/* Careful again: If commit_task_to_worker failed the worker object or task object may no longer be valid! */
if (*result == VINE_SUCCESS) {
if (result == VINE_SUCCESS) {
vine_txn_log_write_library_update(q, w, t->task_id, VINE_LIBRARY_SENT);
return t;
} else {
Expand Down Expand Up @@ -5097,7 +5151,7 @@ static struct vine_task *vine_wait_internal(struct vine_manager *q, int timeout,
} while (q->max_retrievals < 0 || retrieved_this_cycle < q->max_retrievals || !priority_queue_size(q->ready_tasks));
END_ACCUM_TIME(q, time_receive);

// expired tasks
// check for tasks that cannot run at all
BEGIN_ACCUM_TIME(q, time_internal);
result = expire_waiting_tasks(q);
END_ACCUM_TIME(q, time_internal);
Expand Down Expand Up @@ -5649,6 +5703,9 @@ int vine_tune(struct vine_manager *q, const char *name, double value)
} else if (!strcmp(name, "sandbox-grow-factor")) {
q->sandbox_grow_factor = MAX(1.1, value);

} else if (!strcmp(name, "max-library-retries")) {
q->max_library_retries = MIN(1, value);

} else {
debug(D_NOTICE | D_VINE, "Warning: tuning parameter \"%s\" not recognized\n", name);
return -1;
Expand Down Expand Up @@ -5961,6 +6018,7 @@ void vine_accumulate_task(struct vine_manager *q, struct vine_task *t)
case VINE_RESULT_FORSAKEN:
case VINE_RESULT_MAX_RETRIES:
case VINE_RESULT_LIBRARY_EXIT:
case VINE_RESULT_MISSING_LIBRARY:
break;
}
}
Expand Down
12 changes: 9 additions & 3 deletions taskvine/src/worker/vine_process.c
Original file line number Diff line number Diff line change
Expand Up @@ -499,10 +499,16 @@ int vine_process_library_get_result(struct vine_process *p, uint64_t *done_task_

/* null terminate the buffer before treating it as a string. */
buffer_data[ok] = 0;
sscanf(buffer_data, "%" SCNu64 " %d", done_task_id, done_exit_code);
debug(D_VINE, "Received result for function %" PRIu64 ", exit code %d", *done_task_id, *done_exit_code);

return ok;
/* is the received message properly formatted as two integers? */
ok = sscanf(buffer_data, "%" SCNu64 " %d", done_task_id, done_exit_code);
if (ok != 2) {
debug(D_VINE, "Invalid message received from library: %s", buffer_data);
return 0;
}

debug(D_VINE, "Received result for function %" PRIu64 ", exit code %d", *done_task_id, *done_exit_code);
return 1;
}

/*
Expand Down
12 changes: 9 additions & 3 deletions taskvine/src/worker/vine_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -742,7 +742,9 @@ static void handle_failed_library_process(struct vine_process *p, struct link *m
ITABLE_ITERATE(procs_running, task_id, p_running)
{
if (p_running->library_process == p) {
debug(D_VINE, "killing function task %d running on library task %d", (int)task_id, p->task->task_id);
finish_running_task(p_running, VINE_RESULT_FORSAKEN);
reap_process(p_running, manager);
}
}
}
Expand Down Expand Up @@ -1426,20 +1428,25 @@ Return true if this process can run eventually, supposing that other processes w

static int process_can_run_eventually(struct vine_process *p, struct vine_cache *cache, struct link *manager)
{
if (!task_resources_fit_eventually(p->task))
if (!task_resources_fit_eventually(p->task)) {
debug(D_VINE, "task %d does not fit the total resources", p->task->task_id);
return 0;
}

if (p->task->needs_library) {
/* Note that we check for *some* library but do not bind to it. */
struct vine_process *p_future = find_future_library_for_function(p->task->needs_library);
if (!p || p_future->result == VINE_RESULT_LIBRARY_EXIT)
if (!p || p_future->result == VINE_RESULT_LIBRARY_EXIT) {
debug(D_VINE, "task %d does not match any library \"%s\"", p->task->task_id, p->task->needs_library);
return 0;
}
}

vine_cache_status_t status = vine_sandbox_ensure(p, cache, manager);
switch (status) {
case VINE_CACHE_STATUS_FAILED:
case VINE_CACHE_STATUS_UNKNOWN:
debug(D_VINE, "task %d requires failed input cache transfers", p->task->task_id);
return 0;
default:
break;
Expand Down Expand Up @@ -1738,7 +1745,6 @@ static void vine_worker_serve_manager(struct link *manager)
} else if (process_can_run_eventually(p, cache_manager, manager)) {
list_push_tail(procs_waiting, p);
} else {
debug(D_VINE, "task does not have necessary resources to run %d", p->task->task_id);
forsake_waiting_process(manager, p);
task_event++;
}
Expand Down
87 changes: 87 additions & 0 deletions taskvine/test/TR_vine_python_serverless_failure.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
#!/bin/sh

set -e

. ../../dttools/test/test_runner_common.sh

import_config_val CCTOOLS_PYTHON_TEST_EXEC
import_config_val CCTOOLS_PYTHON_TEST_DIR

export PYTHONPATH=$(pwd)/../../test_support/python_modules/${CCTOOLS_PYTHON_TEST_DIR}:$PYTHONPATH

STATUS_FILE=vine.status
PORT_FILE=vine.port
PYTHON_SCRIPT=vine_python_serverless_failure.py

check_needed()
{
[ -n "${CCTOOLS_PYTHON_TEST_EXEC}" ] || return 1

# Poncho currently requires ast.unparse to serialize the function,
# which only became available in Python 3.9. Some older platforms
# (e.g. almalinux8) will not have this natively.
"${CCTOOLS_PYTHON_TEST_EXEC}" -c "from ast import unparse" || return 1

# In some limited build circumstances (e.g. macos build on github),
# poncho doesn't work due to lack of conda-pack or cloudpickle
"${CCTOOLS_PYTHON_TEST_EXEC}" -c "import conda_pack" || return 1
"${CCTOOLS_PYTHON_TEST_EXEC}" -c "import cloudpickle" || return 1

return 0
}

prepare()
{
rm -f $STATUS_FILE
rm -f $PORT_FILE
return 0
}

run()
{
( ${CCTOOLS_PYTHON_TEST_EXEC} ${PYTHON_SCRIPT} $PORT_FILE; echo $? > $STATUS_FILE ) &

# wait at most 15 seconds for vine to find a port.
wait_for_file_creation $PORT_FILE 15

# run the worker in the foreground
run_taskvine_worker $PORT_FILE worker.log

# wait for vine to exit.
wait_for_file_creation $STATUS_FILE 30

# retrieve exit status
status=$(cat $STATUS_FILE)
if [ $status -ne 0 ]
then
# display log files in case of failure.
logfile=$(latest_vine_debug_log)
if [ -f ${logfile} ]
then
echo "master log:"
cat ${logfile}
fi

if [ -f worker.log ]
then
echo "worker log:"
cat worker.log
fi

exit 1
fi

exit 0
}

clean()
{
rm -f $STATUS_FILE
rm -f $PORT_FILE
rm -rf vine-run-info
rm worker.log
exit 0
}


dispatch "$@"
Loading
Loading