Skip to content

Commit

Permalink
TaskVine: Fix library deployment method (#3560)
Browse files Browse the repository at this point in the history
* better library dispatch

* lint

* delete table

* send lib when needed

* remove lib as needed

* lint

* fix valgrind

* fix
  • Loading branch information
tphung3 authored Dec 5, 2023
1 parent 9847ea6 commit f0248ca
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 134 deletions.
151 changes: 78 additions & 73 deletions taskvine/src/manager/vine_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -145,14 +145,13 @@ static void aggregate_workers_resources(struct vine_manager *q, struct vine_reso
static struct vine_task *vine_wait_internal(struct vine_manager *q, int timeout, const char *tag, int task_id);
static void release_all_workers(struct vine_manager *q);

static void vine_manager_send_library_to_workers(struct vine_manager *q, const char *name, time_t stoptime);
static void vine_manager_send_libraries_to_workers(struct vine_manager *q, time_t stoptime);

static int vine_manager_check_inputs_available(struct vine_manager *q, struct vine_task *t);

static void delete_worker_file(
struct vine_manager *q, struct vine_worker_info *w, const char *filename, int flags, int except_flags);

static struct vine_task *send_library_to_worker(struct vine_manager *q, struct vine_worker_info *w, const char *name);

/* Return the number of workers matching a given type: WORKER, STATUS, etc */

static int count_workers(struct vine_manager *q, vine_worker_type_t type)
Expand Down Expand Up @@ -2716,6 +2715,24 @@ static void find_max_worker(struct vine_manager *q)
}
}

/* Tell worker to kill all empty libraries except the case where
* the task is a function call and the library can run it.
* This code corresponds to the assumption of
* @vine.schedule.c:check_worker_have_enough_resources() - empty libraries
* are not counted towards the resources in use and will be killed if needed. */
static void kill_empty_libraries_on_worker(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t)
{
uint64_t task_id;
struct vine_task *task;
ITABLE_ITERATE(w->current_tasks, task_id, task)
{
if (task->provides_library && task->function_slots_inuse == 0 &&
(!t->needs_library || strcmp(t->needs_library, task->provides_library))) {
reset_task_to_state(q, task, VINE_TASK_RETRIEVED);
}
}
}

/*
Commit a given task to a worker by sending the task details,
then updating all auxiliary data structures to note the
Expand All @@ -2724,6 +2741,9 @@ assignment and the new task state.

static void commit_task_to_worker(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t)
{
/* Kill empty libraries to reclaim resources. Match the assumption of
* @vine.schedule.c:check_worker_have_enough_resources() */
kill_empty_libraries_on_worker(q, w, t);
t->hostname = xxstrdup(w->hostname);
t->addrport = xxstrdup(w->addrport);

Expand All @@ -2733,23 +2753,17 @@ static void commit_task_to_worker(struct vine_manager *q, struct vine_worker_inf

itable_insert(w->current_tasks, t->task_id, t);

/* Increment the function count if this is a function task.
* If the manager fails to send this function task to the worker however,
* then the count will be decremented properly in @handle_failure() below. */
if (t->needs_library) {
t->library_task->function_slots_inuse++;
}

t->worker = w;

change_task_state(q, t, VINE_TASK_RUNNING);

/*
If this is a function call assigned to a library,
then increase the count of functions assigned.
t->library_task was assigned in the scheduler.
*/

if (t->library_task) {
/* Add a reference to the library, mirror in reap_task_from_worker */
/* Needed in case the library fails or is removed before this task. */
vine_task_clone(t->library_task);
t->library_task->function_slots_inuse++;
}

t->try_count += 1;
q->stats->tasks_dispatched += 1;

Expand Down Expand Up @@ -2848,11 +2862,8 @@ static void reap_task_from_worker(
and disassociate the task from the library.
*/

if (t->library_task) {
t->library_task->function_slots_inuse--;
/* Remove a reference to the library, mirror in reap_task_from_worker */
vine_task_delete(t->library_task);
t->library_task = 0;
if (t->needs_library) {
t->library_task->function_slots_inuse = MAX(0, t->library_task->function_slots_inuse - 1);
}

t->worker = 0;
Expand Down Expand Up @@ -3076,6 +3087,43 @@ static int vine_manager_check_inputs_available(struct vine_manager *q, struct vi
return 1;
}

/* Find a library task running on a specific worker that has an available slot.
* @return pointer to the library task if there's one, 0 otherwise. */

static struct vine_task *find_library_on_worker_for_task(struct vine_worker_info *w, const char *library_name)
{
uint64_t task_id;
struct vine_task *task;

ITABLE_ITERATE(w->current_tasks, task_id, task)
{
if (task->provides_library && !strcmp(task->provides_library, library_name) &&
task->function_slots_inuse < task->function_slots) {
return task;
}
}

return 0;
}

/* Check if this worker can run the function task.
* @return 1 if it can, 0 otherwise.
*/
static int check_worker_can_run_function_task(struct vine_manager *q, struct vine_worker_info *w, struct vine_task *t)
{
struct vine_task *library = find_library_on_worker_for_task(w, t->needs_library);
if (!library) {
library = send_library_to_worker(q, w, t->needs_library);
}
if (!library) {
return 0;
}

// Mark that this function task will be run on this library.
t->library_task = library;
return 1;
}

/*
Advance the state of the system by selecting one task available
to run, finding the best worker for that task, and then committing
Expand Down Expand Up @@ -3124,6 +3172,12 @@ static int send_one_task(struct vine_manager *q)
continue;
}

// If this is a function task, check if the worker can run it.
// May require the manager to send a library to the worker first.
if (t->needs_library && !check_worker_can_run_function_task(q, w, t)) {
continue;
}

// Otherwise, remove it from the ready list and start it:
list_pop_tail(q->ready_list);
commit_task_to_worker(q, w, t);
Expand Down Expand Up @@ -4322,10 +4376,10 @@ int vine_submit(struct vine_manager *q, struct vine_task *t)
* @param q The manager structure.
* @param w The worker info structure.
* @param name The name of the library to be sent.
* @return 1 if the operation succeeds, 0 otherwise.
* @return pointer to the library task if the operation succeeds, 0 otherwise.
*/

static int vine_manager_send_library_to_worker(struct vine_manager *q, struct vine_worker_info *w, const char *name)
static struct vine_task *send_library_to_worker(struct vine_manager *q, struct vine_worker_info *w, const char *name)
{
/* Find the original prototype library task by name, if it exists. */
struct vine_task *original = hash_table_lookup(q->libraries, name);
Expand Down Expand Up @@ -4353,7 +4407,7 @@ static int vine_manager_send_library_to_worker(struct vine_manager *q, struct vi
/* Make the special log recordings for the library. */
vine_txn_log_write_library_update(q, w, t->task_id, VINE_LIBRARY_SENT);

return 1;
return t;
}

struct vine_task *vine_manager_find_library_on_worker(
Expand All @@ -4372,50 +4426,6 @@ struct vine_task *vine_manager_find_library_on_worker(
return 0;
}

/* Send the library task to all known workers.
* @param q The manager structure.
* @param name The name of the library task.
* @param stoptime When to stop sending libraries to workers. */
static void vine_manager_send_library_to_workers(struct vine_manager *q, const char *name, time_t stoptime)
{
char *worker_key;
struct vine_worker_info *w;

HASH_TABLE_ITERATE(q->worker_table, worker_key, w)
{
if (stoptime < time(0)) {
return;
}

/* If the worker id is not 0, then it is ready to receive work from the manager.
* See @report_worker_ready in ../worker/vine_worker.c */
if (!w->workerid) {
continue;
}

/* Send the library task to the worker if possible. */
if (!vine_manager_find_library_on_worker(q, w, name)) {
if (vine_manager_send_library_to_worker(q, w, name)) {
debug(D_VINE, "Sending library %s to worker %s\n", name, w->workerid);
} else {
/* No error here, library might not match the worker. */
}
}
}
}

static void vine_manager_send_libraries_to_workers(struct vine_manager *q, time_t stoptime)
{
char *library;
struct vine_task *t;
HASH_TABLE_ITERATE(q->libraries, library, t)
{
if (stoptime < time(0))
return;
vine_manager_send_library_to_workers(q, library, stoptime);
}
}

void vine_manager_install_library(struct vine_manager *q, struct vine_task *t, const char *name)
{
t->type = VINE_TASK_TYPE_LIBRARY;
Expand Down Expand Up @@ -4828,11 +4838,6 @@ static struct vine_task *vine_wait_internal(struct vine_manager *q, int timeout,
}
}

// attempt to send libraries to connected workers
BEGIN_ACCUM_TIME(q, time_send);
vine_manager_send_libraries_to_workers(q, stoptime);
END_ACCUM_TIME(q, time_send);

// return if manager is empty and something interesting already happened
// in this wait.
if (events > 0) {
Expand Down
12 changes: 12 additions & 0 deletions taskvine/src/manager/vine_resources.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,18 @@ struct vine_resources *vine_resources_create()

void vine_resources_delete(struct vine_resources *r) { free(r); }

struct vine_resources *vine_resources_copy(struct vine_resources *r)
{
struct vine_resources *t = vine_resources_create();
t->tag = r->tag;
t->workers = r->workers;
t->disk = r->disk;
t->cores = r->cores;
t->memory = r->memory;
t->gpus = r->gpus;
return t;
}

void vine_resources_measure_locally(struct vine_resources *r, const char *disk_path)
{
static int gpu_check = 0;
Expand Down
1 change: 1 addition & 0 deletions taskvine/src/manager/vine_resources.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ struct vine_resources {

struct vine_resources * vine_resources_create();
void vine_resources_delete( struct vine_resources *r );
struct vine_resources* vine_resources_copy( struct vine_resources* r);
void vine_resources_debug( struct vine_resources *r );
void vine_resources_measure_locally( struct vine_resources *r, const char *workspace );
void vine_resources_send( struct link *manager, struct vine_resources *r, time_t stoptime );
Expand Down
Loading

0 comments on commit f0248ca

Please sign in to comment.