Skip to content

Commit

Permalink
REVIEW: add copy task type
Browse files Browse the repository at this point in the history
  • Loading branch information
Sergei-Lebedev committed Feb 26, 2025
1 parent 231ba2b commit 59f06c8
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 25 deletions.
7 changes: 4 additions & 3 deletions src/components/tl/ucp/tl_ucp.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,17 +127,18 @@ typedef struct ucc_tl_ucp_worker {

typedef struct ucc_tl_ucp_task ucc_tl_ucp_task_t;
typedef struct ucc_tl_ucp_context ucc_tl_ucp_context_t;
typedef union ucc_tl_ucp_copy_task ucc_tl_ucp_copy_task_t;

typedef ucc_status_t (*ucc_tl_ucp_copy_post_fn_t)(void *dst,
ucc_memory_type_t dst_mtype,
void *src,
ucc_memory_type_t src_mtype,
size_t size,
ucc_tl_ucp_task_t *coll_task,
void **copy_task);
ucc_tl_ucp_copy_task_t **copy_task);
typedef ucc_status_t (*ucc_tl_ucp_copy_test_fn_t)(ucc_tl_ucp_context_t *ctx,
void *copy_task);
typedef ucc_status_t (*ucc_tl_ucp_copy_finalize_fn_t)(void *copy_task);
ucc_tl_ucp_copy_task_t *copy_task);
typedef ucc_status_t (*ucc_tl_ucp_copy_finalize_fn_t)(ucc_tl_ucp_copy_task_t *copy_task);

typedef struct ucc_tl_ucp_context {
ucc_tl_context_t super;
Expand Down
2 changes: 1 addition & 1 deletion src/components/tl/ucp/tl_ucp_coll.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ typedef struct ucc_tl_ucp_task {
int phase;
ucc_knomial_pattern_t p;
void *sbuf;
void *copy_task;
ucc_tl_ucp_copy_task_t *copy_task;
ucc_rank_t recv_dist;
} allgather_kn;
struct {
Expand Down
25 changes: 13 additions & 12 deletions src/components/tl/ucp/tl_ucp_copy.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,25 @@
#include "tl_ucp_tag.h"
#include "tl_ucp_ep.h"
#include "tl_ucp_sendrecv.h"
#include "tl_ucp_copy.h"

ucc_status_t ucc_tl_ucp_mc_copy_post(void *dst, ucc_memory_type_t dst_mtype,
void *src, ucc_memory_type_t src_mtype,
size_t size,
ucc_tl_ucp_task_t *coll_task, //NOLINT
void **copy_task) //NOLINT
ucc_tl_ucp_copy_task_t **copy_task) //NOLINT
{
return ucc_mc_memcpy(dst, src, size, dst_mtype, src_mtype);
}

ucc_status_t ucc_tl_ucp_mc_copy_test(ucc_tl_ucp_context_t *ctx, //NOLINT
void *copy_task) //NOLINT
ucc_tl_ucp_copy_task_t *copy_task) //NOLINT
{
/* mc copy is blocking, test always returns UCC_OK */
return UCC_OK;
}

ucc_status_t ucc_tl_ucp_mc_copy_finalize(void *copy_task) //NOLINT
ucc_status_t ucc_tl_ucp_mc_copy_finalize(ucc_tl_ucp_copy_task_t *copy_task) //NOLINT
{
return UCC_OK;
}
Expand All @@ -35,10 +36,10 @@ ucc_status_t ucc_tl_ucp_ec_copy_post(void *dst, ucc_memory_type_t dst_mtype, //N
void *src, ucc_memory_type_t src_mtype, //NOLINT
size_t size,
ucc_tl_ucp_task_t *coll_task,
void **copy_task)
ucc_tl_ucp_copy_task_t **copy_task)
{
ucc_ee_executor_task_args_t eargs = {0};
ucc_ee_executor_task_t **eee_task = (ucc_ee_executor_task_t **)copy_task;
ucc_ee_executor_task_t **eee_task = (ucc_ee_executor_task_t **)copy_task;
ucc_status_t status;
ucc_ee_executor_t *exec;

Expand All @@ -56,14 +57,14 @@ ucc_status_t ucc_tl_ucp_ec_copy_post(void *dst, ucc_memory_type_t dst_mtype, //N
}

ucc_status_t ucc_tl_ucp_ec_copy_test(ucc_tl_ucp_context_t *ctx, //NOLINT
void *copy_task)
ucc_tl_ucp_copy_task_t *copy_task)
{
ucc_ee_executor_task_t *eee_task = (ucc_ee_executor_task_t *)copy_task;

return ucc_ee_executor_task_test(eee_task);
}

ucc_status_t ucc_tl_ucp_ec_copy_finalize(void *copy_task)
ucc_status_t ucc_tl_ucp_ec_copy_finalize(ucc_tl_ucp_copy_task_t *copy_task)
{
ucc_ee_executor_task_t *eee_task = (ucc_ee_executor_task_t *)copy_task;

Expand Down Expand Up @@ -97,7 +98,7 @@ void ucc_tl_ucp_copy_recv_completion_cb(void *request, ucs_status_t status, //NO
ucc_status_t ucc_tl_ucp_ucp_copy_post(void *dst, ucc_memory_type_t dst_mtype,
void *src, ucc_memory_type_t src_mtype,
size_t size, ucc_tl_ucp_task_t *task,
void **copy_task)
ucc_tl_ucp_copy_task_t **copy_task)
{
ucc_coll_args_t *args = &TASK_ARGS(task);
ucc_tl_ucp_team_t *team = TASK_TEAM(task);
Expand All @@ -109,7 +110,7 @@ ucc_status_t ucc_tl_ucp_ucp_copy_post(void *dst, ucc_memory_type_t dst_mtype,
ucp_tag_t ucp_tag, ucp_tag_mask;
ucs_status_ptr_t ucp_status;

UCC_TL_UCP_MAKE_RECV_TAG(ucp_tag, ucp_tag_mask,
UCC_TL_UCP_MAKE_RECV_TAG(ucp_tag, ucp_tag_mask,
(args->mask & UCC_COLL_ARGS_FIELD_TAG),
task->tagged.tag, trank,
team->super.super.params.id,
Expand All @@ -125,7 +126,7 @@ ucc_status_t ucc_tl_ucp_ucp_copy_post(void *dst, ucc_memory_type_t dst_mtype,
ucp_status = ucp_tag_recv_nbx(team->worker->ucp_worker, dst, 1, ucp_tag,
ucp_tag_mask, &req_param);
UCC_TL_UCP_CHECK_REQ_STATUS();
*copy_task = ucp_status;
(*copy_task)= ucp_status;

status = ucc_tl_ucp_get_ep(team, trank, &ep);
if (ucc_unlikely(UCC_OK != status)) {
Expand All @@ -148,15 +149,15 @@ ucc_status_t ucc_tl_ucp_ucp_copy_post(void *dst, ucc_memory_type_t dst_mtype,
}

ucc_status_t ucc_tl_ucp_ucp_copy_test(ucc_tl_ucp_context_t *ctx,
void *copy_task)
ucc_tl_ucp_copy_task_t *copy_task)
{
ucs_status_ptr_t req_status = (ucs_status_ptr_t)copy_task;

ucp_worker_progress(ctx->worker.ucp_worker);
return ucs_status_to_ucc_status(ucp_request_check_status(req_status));
}

ucc_status_t ucc_tl_ucp_ucp_copy_finalize(void *copy_task)
ucc_status_t ucc_tl_ucp_ucp_copy_finalize(ucc_tl_ucp_copy_task_t *copy_task)
{
ucs_status_ptr_t req_status = (ucs_status_ptr_t)copy_task;
ucp_request_free(req_status);
Expand Down
23 changes: 14 additions & 9 deletions src/components/tl/ucp/tl_ucp_copy.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,38 +25,43 @@
} \
} while(0)

typedef union ucc_tl_ucp_copy_task {
ucc_ee_executor_task_t ee_task;
ucs_status_ptr_t ucp_task;
} ucc_tl_ucp_copy_task_t;

/* copies based on MC */
ucc_status_t ucc_tl_ucp_mc_copy_post(void *dst, ucc_memory_type_t dst_mtype,
void *src, ucc_memory_type_t src_mtype,
size_t size,
ucc_tl_ucp_task_t *coll_task,
void **copy_task);
ucc_tl_ucp_copy_task_t **copy_task);

ucc_status_t ucc_tl_ucp_mc_copy_test(ucc_tl_ucp_context_t *ctx,
void *copy_task);
ucc_tl_ucp_copy_task_t *copy_task);

ucc_status_t ucc_tl_ucp_mc_copy_finalize(void *copy_task);
ucc_status_t ucc_tl_ucp_mc_copy_finalize(ucc_tl_ucp_copy_task_t *copy_task);

/* copies based on EC */
ucc_status_t ucc_tl_ucp_ec_copy_post(void *dst, ucc_memory_type_t dst_mtype,
void *src, ucc_memory_type_t src_mtype,
size_t size,
ucc_tl_ucp_task_t *coll_task,
void **copy_task);
ucc_tl_ucp_copy_task_t **copy_task);

ucc_status_t ucc_tl_ucp_ec_copy_test(ucc_tl_ucp_context_t *ctx,
void *copy_task);
ucc_tl_ucp_copy_task_t *copy_task);

ucc_status_t ucc_tl_ucp_ec_copy_finalize(void *copy_task);
ucc_status_t ucc_tl_ucp_ec_copy_finalize(ucc_tl_ucp_copy_task_t *copy_task);

/* copies based on UCX */
ucc_status_t ucc_tl_ucp_ucp_copy_post(void *dst, ucc_memory_type_t dst_mtype,
void *src, ucc_memory_type_t src_mtype,
size_t size,
ucc_tl_ucp_task_t *coll_task,
void **copy_task);
ucc_tl_ucp_copy_task_t **copy_task);

ucc_status_t ucc_tl_ucp_ucp_copy_test(ucc_tl_ucp_context_t *ctx,
void *copy_task);
ucc_tl_ucp_copy_task_t *copy_task);

ucc_status_t ucc_tl_ucp_ucp_copy_finalize(void *copy_task);
ucc_status_t ucc_tl_ucp_ucp_copy_finalize(ucc_tl_ucp_copy_task_t *copy_task);

0 comments on commit 59f06c8

Please sign in to comment.