Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TaskVine: Fix library deployment method #3560

Merged
merged 9 commits into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 78 additions & 73 deletions taskvine/src/manager/vine_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -145,14 +145,13 @@ static void aggregate_workers_resources(struct vine_manager *q, struct vine_reso
static struct vine_task *vine_wait_internal(struct vine_manager *q, int timeout, const char *tag, int task_id);
static void release_all_workers(struct vine_manager *q);

static void vine_manager_send_library_to_workers(struct vine_manager *q, const char *name, time_t stoptime);
static void vine_manager_send_libraries_to_workers(struct vine_manager *q, time_t stoptime);

static int vine_manager_check_inputs_available(struct vine_manager *q, struct vine_task *t);

static void delete_worker_file(
struct vine_manager *q, struct vine_worker_info *w, const char *filename, int flags, int except_flags);

static struct vine_task *send_library_to_worker(struct vine_manager *q, struct vine_worker_info *w, const char *name);

/* Return the number of workers matching a given type: WORKER, STATUS, etc */

static int count_workers(struct vine_manager *q, vine_worker_type_t type)
Expand Down Expand Up @@ -2716,6 +2715,24 @@ static void find_max_worker(struct vine_manager *q)
}
}

/* Tell worker to kill all empty libraries except the case where
* the task is a function call and the library can run it.
* This code corresponds to the assumption of
* @vine.schedule.c:check_worker_have_enough_resources() - empty libraries
* are not counted towards the resources in use and will be killed if needed. */
static void kill_empty_libraries_on_worker(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t)
{
uint64_t task_id;
struct vine_task *task;
ITABLE_ITERATE(w->current_tasks, task_id, task)
{
if (task->provides_library && task->function_slots_inuse == 0 &&
(!t->needs_library || strcmp(t->needs_library, task->provides_library))) {
reset_task_to_state(q, task, VINE_TASK_RETRIEVED);
}
}
}

/*
Commit a given task to a worker by sending the task details,
then updating all auxiliary data structures to note the
Expand All @@ -2724,6 +2741,9 @@ assignment and the new task state.

static void commit_task_to_worker(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t)
{
/* Kill empty libraries to reclaim resources. Match the assumption of
* @vine.schedule.c:check_worker_have_enough_resources() */
kill_empty_libraries_on_worker(q, w, t);
t->hostname = xxstrdup(w->hostname);
t->addrport = xxstrdup(w->addrport);

Expand All @@ -2733,23 +2753,17 @@ static void commit_task_to_worker(struct vine_manager *q, struct vine_worker_inf

itable_insert(w->current_tasks, t->task_id, t);

/* Increment the function count if this is a function task.
* If the manager fails to send this function task to the worker however,
* then the count will be decremented properly in @handle_failure() below. */
if (t->needs_library) {
t->library_task->function_slots_inuse++;
}

t->worker = w;

change_task_state(q, t, VINE_TASK_RUNNING);

/*
If this is a function call assigned to a library,
then increase the count of functions assigned.
t->library_task was assigned in the scheduler.
*/

if (t->library_task) {
/* Add a reference to the library, mirror in reap_task_from_worker */
/* Needed in case the library fails or is removed before this task. */
vine_task_clone(t->library_task);
t->library_task->function_slots_inuse++;
}

t->try_count += 1;
q->stats->tasks_dispatched += 1;

Expand Down Expand Up @@ -2848,11 +2862,8 @@ static void reap_task_from_worker(
and disassociate the task from the library.
*/

if (t->library_task) {
t->library_task->function_slots_inuse--;
/* Remove a reference to the library, mirror in reap_task_from_worker */
vine_task_delete(t->library_task);
t->library_task = 0;
if (t->needs_library) {
t->library_task->function_slots_inuse = MAX(0, t->library_task->function_slots_inuse - 1);
}

t->worker = 0;
Expand Down Expand Up @@ -3076,6 +3087,43 @@ static int vine_manager_check_inputs_available(struct vine_manager *q, struct vi
return 1;
}

/* Find a library task running on a specific worker that has an available slot.
* @return pointer to the library task if there's one, 0 otherwise. */

static struct vine_task *find_library_on_worker_for_task(struct vine_worker_info *w, const char *library_name)
{
uint64_t task_id;
struct vine_task *task;

ITABLE_ITERATE(w->current_tasks, task_id, task)
{
if (task->provides_library && !strcmp(task->provides_library, library_name) &&
task->function_slots_inuse < task->function_slots) {
return task;
}
}

return 0;
}

/* Check if this worker can run the function task.
* @return 1 if it can, 0 otherwise.
*/
static int check_worker_can_run_function_task(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t)
{
struct vine_task *library = find_library_on_worker_for_task(w, t->needs_library);
if (!library) {
library = send_library_to_worker(q, w, t->needs_library);
}
if (!library) {
return 0;
}

// Mark that this function task will be run on this library.
t->library_task = library;
return 1;
}

/*
Advance the state of the system by selecting one task available
to run, finding the best worker for that task, and then committing
Expand Down Expand Up @@ -3124,6 +3172,12 @@ static int send_one_task(struct vine_manager *q)
continue;
}

// If this is a function task, check if the worker can run it.
// May require the manager to send a library to the worker first.
if (t->needs_library && !check_worker_can_run_function_task(q, w, t)) {
continue;
}

// Otherwise, remove it from the ready list and start it:
list_pop_tail(q->ready_list);
commit_task_to_worker(q, w, t);
Expand Down Expand Up @@ -4322,10 +4376,10 @@ int vine_submit(struct vine_manager *q, struct vine_task *t)
* @param q The manager structure.
* @param w The worker info structure.
* @param name The name of the library to be sent.
* @return 1 if the operation succeeds, 0 otherwise.
* @return pointer to the library task if the operation succeeds, 0 otherwise.
*/

static int vine_manager_send_library_to_worker(struct vine_manager *q, struct vine_worker_info *w, const char *name)
static struct vine_task *send_library_to_worker(struct vine_manager *q, struct vine_worker_info *w, const char *name)
{
/* Find the original prototype library task by name, if it exists. */
struct vine_task *original = hash_table_lookup(q->libraries, name);
Expand Down Expand Up @@ -4353,7 +4407,7 @@ static int vine_manager_send_library_to_worker(struct vine_manager *q, struct vi
/* Make the special log recordings for the library. */
vine_txn_log_write_library_update(q, w, t->task_id, VINE_LIBRARY_SENT);

return 1;
return t;
}

struct vine_task *vine_manager_find_library_on_worker(
Expand All @@ -4372,50 +4426,6 @@ struct vine_task *vine_manager_find_library_on_worker(
return 0;
}

/* Send the library task to all known workers.
* @param q The manager structure.
* @param name The name of the library task.
* @param stoptime When to stop sending libraries to workers. */
static void vine_manager_send_library_to_workers(struct vine_manager *q, const char *name, time_t stoptime)
{
char *worker_key;
struct vine_worker_info *w;

HASH_TABLE_ITERATE(q->worker_table, worker_key, w)
{
if (stoptime < time(0)) {
return;
}

/* If the worker id is not 0, then it is ready to receive work from the manager.
* See @report_worker_ready in ../worker/vine_worker.c */
if (!w->workerid) {
continue;
}

/* Send the library task to the worker if possible. */
if (!vine_manager_find_library_on_worker(q, w, name)) {
if (vine_manager_send_library_to_worker(q, w, name)) {
debug(D_VINE, "Sending library %s to worker %s\n", name, w->workerid);
} else {
/* No error here, library might not match the worker. */
}
}
}
}

static void vine_manager_send_libraries_to_workers(struct vine_manager *q, time_t stoptime)
{
char *library;
struct vine_task *t;
HASH_TABLE_ITERATE(q->libraries, library, t)
{
if (stoptime < time(0))
return;
vine_manager_send_library_to_workers(q, library, stoptime);
}
}

void vine_manager_install_library(struct vine_manager *q, struct vine_task *t, const char *name)
{
t->type = VINE_TASK_TYPE_LIBRARY;
Expand Down Expand Up @@ -4828,11 +4838,6 @@ static struct vine_task *vine_wait_internal(struct vine_manager *q, int timeout,
}
}

// attempt to send libraries to connected workers
BEGIN_ACCUM_TIME(q, time_send);
vine_manager_send_libraries_to_workers(q, stoptime);
END_ACCUM_TIME(q, time_send);

// return if manager is empty and something interesting already happened
// in this wait.
if (events > 0) {
Expand Down
12 changes: 12 additions & 0 deletions taskvine/src/manager/vine_resources.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,18 @@ struct vine_resources *vine_resources_create()

void vine_resources_delete(struct vine_resources *r) { free(r); }

struct vine_resources *vine_resources_copy(struct vine_resources *r)
{
struct vine_resources *t = vine_resources_create();
t->tag = r->tag;
t->workers = r->workers;
t->disk = r->disk;
t->cores = r->cores;
t->memory = r->memory;
t->gpus = r->gpus;
return t;
}

void vine_resources_measure_locally(struct vine_resources *r, const char *disk_path)
{
static int gpu_check = 0;
Expand Down
1 change: 1 addition & 0 deletions taskvine/src/manager/vine_resources.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ struct vine_resources {

struct vine_resources * vine_resources_create();
void vine_resources_delete( struct vine_resources *r );
struct vine_resources* vine_resources_copy( struct vine_resources* r);
void vine_resources_debug( struct vine_resources *r );
void vine_resources_measure_locally( struct vine_resources *r, const char *workspace );
void vine_resources_send( struct link *manager, struct vine_resources *r, time_t stoptime );
Expand Down
Loading