Skip to content

Commit

Permalink
TL/UCP: add local copy option
Browse files Browse the repository at this point in the history
  • Loading branch information
Sergei-Lebedev committed Feb 24, 2025
1 parent bc996dd commit cc999a4
Show file tree
Hide file tree
Showing 8 changed files with 345 additions and 48 deletions.
3 changes: 2 additions & 1 deletion src/components/tl/ucp/Makefile.am
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# Copyright (c) 2022-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#

if TL_UCP_ENABLED
Expand Down Expand Up @@ -116,6 +116,7 @@ sources = \
tl_ucp_service_coll.c \
tl_ucp_dpu_offload.h \
tl_ucp_dpu_offload.c \
tl_ucp_copy.c \
$(allgather) \
$(allgatherv) \
$(alltoall) \
Expand Down
65 changes: 31 additions & 34 deletions src/components/tl/ucp/allgather/allgather_knomial.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* Copyright (c) 2021-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
*
* See file LICENSE for terms.
*/
Expand All @@ -8,6 +8,7 @@
#include "tl_ucp.h"
#include "tl_ucp_coll.h"
#include "tl_ucp_sendrecv.h"
#include "tl_ucp_copy.h"
#include "core/ucc_progress_queue.h"
#include "components/mc/ucc_mc.h"
#include "coll_patterns/sra_knomial.h"
Expand Down Expand Up @@ -58,6 +59,7 @@ void ucc_tl_ucp_allgather_knomial_progress(ucc_coll_task_t *coll_task)
ucc_tl_ucp_task_t);
ucc_coll_args_t *args = &TASK_ARGS(task);
ucc_tl_ucp_team_t *team = TASK_TEAM(task);
ucc_tl_ucp_context_t *ctx = UCC_TL_UCP_TEAM_CTX(team);
ucc_kn_radix_t radix = task->allgather_kn.p.radix;
uint8_t node_type = task->allgather_kn.p.node_type;
ucc_knomial_pattern_t *p = &task->allgather_kn.p;
Expand All @@ -78,9 +80,8 @@ void ucc_tl_ucp_allgather_knomial_progress(ucc_coll_task_t *coll_task)
ucc_status_t status;
size_t extra_count;

EXEC_TASK_TEST(UCC_KN_PHASE_INIT, "failed during ee task test",
task->allgather_kn.etask);
task->allgather_kn.etask = NULL;
COPY_TASK_TEST(UCC_KN_PHASE_INIT, ctx, "failed during copy task test",
task->allgather_kn.copy_task);
UCC_KN_GOTO_PHASE(task->allgather_kn.phase);
if (KN_NODE_EXTRA == node_type) {
peer = ucc_knomial_pattern_get_proxy(p, rank);
Expand Down Expand Up @@ -201,42 +202,37 @@ ucc_status_t ucc_tl_ucp_allgather_knomial_start(ucc_coll_task_t *coll_task)
ucc_tl_ucp_task_t);
ucc_coll_args_t *args = &TASK_ARGS(task);
ucc_tl_ucp_team_t *team = TASK_TEAM(task);
ucc_tl_ucp_context_t *ctx = UCC_TL_UCP_TEAM_CTX(team);
ucc_coll_type_t ct = args->coll_type;
ucc_rank_t size = task->subset.map.ep_num;
ucc_kn_radix_t radix = task->allgather_kn.p.radix;
ucc_knomial_pattern_t *p = &task->allgather_kn.p;
ucc_rank_t rank = VRANK(task->subset.myrank,
ct == UCC_COLL_TYPE_BCAST ?
args->root : 0, size);
ucc_ee_executor_task_args_t eargs = {0};
ucc_status_t status;
ptrdiff_t offset;
ucc_ee_executor_t *exec;
void *rbuf;

UCC_TL_UCP_PROFILE_REQUEST_EVENT(coll_task, "ucp_allgather_kn_start", 0);
ucc_tl_ucp_task_reset(task, UCC_INPROGRESS);
task->allgather_kn.etask = NULL;
task->allgather_kn.phase = UCC_KN_PHASE_INIT;
task->allgather_kn.copy_task = NULL;
task->allgather_kn.phase = UCC_KN_PHASE_INIT;
if (ct == UCC_COLL_TYPE_ALLGATHER) {
ucc_kn_ag_pattern_init(size, rank, radix, args->dst.info.count,
&task->allgather_kn.p);
offset = ucc_buffer_block_offset(args->dst.info.count, size, rank) *
ucc_dt_size(args->dst.info.datatype);
rbuf = args->dst.info.buffer;
if (!UCC_IS_INPLACE(*args)) {
status = ucc_coll_task_get_executor(&task->super, &exec);
if (ucc_unlikely(status != UCC_OK)) {
task->super.status = status;
return status;
}
eargs.task_type = UCC_EE_EXECUTOR_TASK_COPY;
eargs.copy.dst = PTR_OFFSET(args->dst.info.buffer, offset);
eargs.copy.src = args->src.info.buffer;
eargs.copy.len = args->src.info.count *
ucc_dt_size(args->src.info.datatype);
status = ucc_ee_executor_task_post(exec, &eargs,
&task->allgather_kn.etask);
status = ctx->copy.post(PTR_OFFSET(args->dst.info.buffer, offset),
args->dst.info.mem_type,
args->src.info.buffer,
args->src.info.mem_type,
args->src.info.count *
ucc_dt_size(args->src.info.datatype),
task,
&task->allgather_kn.copy_task);
if (ucc_unlikely(status != UCC_OK)) {
task->super.status = status;
return status;
Expand All @@ -252,18 +248,14 @@ ucc_status_t ucc_tl_ucp_allgather_knomial_start(ucc_coll_task_t *coll_task)
ucc_dt_size(args->dst.info_v.datatype);
rbuf = args->dst.info_v.buffer;
if (!UCC_IS_INPLACE(*args)) {
status = ucc_coll_task_get_executor(&task->super, &exec);
if (ucc_unlikely(status != UCC_OK)) {
task->super.status = status;
return status;
}
eargs.task_type = UCC_EE_EXECUTOR_TASK_COPY;
eargs.copy.dst = PTR_OFFSET(args->dst.info_v.buffer, offset);
eargs.copy.src = args->src.info.buffer;
eargs.copy.len = args->src.info.count *
ucc_dt_size(args->src.info.datatype);
status = ucc_ee_executor_task_post(exec, &eargs,
&task->allgather_kn.etask);
status = ctx->copy.post(PTR_OFFSET(args->dst.info_v.buffer, offset),
args->dst.info_v.mem_type,
args->src.info.buffer,
args->src.info.mem_type,
args->src.info.count *
ucc_dt_size(args->src.info.datatype),
task,
&task->allgather_kn.copy_task);
if (ucc_unlikely(status != UCC_OK)) {
task->super.status = status;
return status;
Expand All @@ -290,7 +282,8 @@ ucc_status_t ucc_tl_ucp_allgather_knomial_init_r(
ucc_base_coll_args_t *coll_args, ucc_base_team_t *team,
ucc_coll_task_t **task_h, ucc_kn_radix_t radix)
{
ucc_tl_ucp_team_t *tl_team = ucc_derived_of(team, ucc_tl_ucp_team_t);
ucc_tl_ucp_team_t *tl_team = ucc_derived_of(team, ucc_tl_ucp_team_t);
ucc_tl_ucp_context_t *ctx = UCC_TL_UCP_TEAM_CTX(tl_team);
ucc_tl_ucp_task_t *task;
ucc_sbgp_t *sbgp;

Expand All @@ -302,7 +295,11 @@ ucc_status_t ucc_tl_ucp_allgather_knomial_init_r(
task->subset.map = sbgp->map;
}
task->allgather_kn.p.radix = radix;
task->super.flags |= UCC_COLL_TASK_FLAG_EXECUTOR;
if (!UCC_IS_INPLACE(coll_args->args)) {
if (ctx->cfg.local_copy_type == UCC_TL_UCP_LOCAL_COPY_TYPE_EC) {
task->super.flags |= UCC_COLL_TASK_FLAG_EXECUTOR;
}
}
task->super.post = ucc_tl_ucp_allgather_knomial_start;
task->super.progress = ucc_tl_ucp_allgather_knomial_progress;
*task_h = &task->super;
Expand Down
16 changes: 15 additions & 1 deletion src/components/tl/ucp/tl_ucp.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (c) 2020-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* Copyright (c) 2020-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
*
* See file LICENSE for terms.
*/
Expand Down Expand Up @@ -208,6 +208,14 @@ ucc_config_field_t ucc_tl_ucp_lib_config_table[] = {

{NULL}};

const char* ucc_tl_ucp_local_copy_names[] = {
[UCC_TL_UCP_LOCAL_COPY_TYPE_UCP] = "ucp",
[UCC_TL_UCP_LOCAL_COPY_TYPE_MC] = "mc",
[UCC_TL_UCP_LOCAL_COPY_TYPE_EC] = "ec",
[UCC_TL_UCP_LOCAL_COPY_TYPE_AUTO] = "auto",
[UCC_TL_UCP_LOCAL_COPY_TYPE_LAST] = NULL
};

static ucs_config_field_t ucc_tl_ucp_context_config_table[] = {
{"", "", NULL, ucc_offsetof(ucc_tl_ucp_context_config_t, super),
UCC_CONFIG_TYPE_TABLE(ucc_tl_context_config_table)},
Expand Down Expand Up @@ -246,6 +254,12 @@ static ucs_config_field_t ucc_tl_ucp_context_config_table[] = {
ucc_offsetof(ucc_tl_ucp_context_config_t, service_throttling_thresh),
UCC_CONFIG_TYPE_UINT},

{"LOCAL_COPY_TYPE", "auto",
"Determines what component is responsible for doing local copy "
"during collective execution",
ucc_offsetof(ucc_tl_ucp_context_config_t, local_copy_type),
UCC_CONFIG_TYPE_ENUM(ucc_tl_ucp_local_copy_names)},

{NULL}};

UCC_CLASS_DEFINE_NEW_FUNC(ucc_tl_ucp_lib_t, ucc_base_lib_t,
Expand Down
45 changes: 36 additions & 9 deletions src/components/tl/ucp/tl_ucp.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (c) 2020-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* Copyright (c) 2020-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
*
* See file LICENSE for terms.
*/
Expand Down Expand Up @@ -79,14 +79,23 @@ typedef struct ucc_tl_ucp_lib_config {
int use_reordering;
} ucc_tl_ucp_lib_config_t;

typedef enum ucc_tl_ucp_local_copy_type {
UCC_TL_UCP_LOCAL_COPY_TYPE_UCP,
UCC_TL_UCP_LOCAL_COPY_TYPE_MC,
UCC_TL_UCP_LOCAL_COPY_TYPE_EC,
UCC_TL_UCP_LOCAL_COPY_TYPE_AUTO,
UCC_TL_UCP_LOCAL_COPY_TYPE_LAST
} ucc_tl_ucp_local_copy_type_t;

typedef struct ucc_tl_ucp_context_config {
ucc_tl_context_config_t super;
uint32_t preconnect;
uint32_t n_polls;
uint32_t oob_npolls;
uint32_t pre_reg_mem;
uint32_t service_worker;
uint32_t service_throttling_thresh;
ucc_tl_context_config_t super;
uint32_t preconnect;
uint32_t n_polls;
uint32_t oob_npolls;
uint32_t pre_reg_mem;
uint32_t service_worker;
uint32_t service_throttling_thresh;
ucc_tl_ucp_local_copy_type_t local_copy_type;
} ucc_tl_ucp_context_config_t;

typedef ucc_tl_ucp_lib_config_t ucc_tl_ucp_team_config_t;
Expand Down Expand Up @@ -116,6 +125,20 @@ typedef struct ucc_tl_ucp_worker {
ucp_ep_h * eps;
} ucc_tl_ucp_worker_t;

typedef struct ucc_tl_ucp_task ucc_tl_ucp_task_t;
typedef struct ucc_tl_ucp_context ucc_tl_ucp_context_t;

typedef ucc_status_t (*ucc_tl_ucp_copy_post_fn_t)(void *dst,
ucc_memory_type_t dst_mtype,
void *src,
ucc_memory_type_t src_mtype,
size_t size,
ucc_tl_ucp_task_t *coll_task,
void **copy_task);
typedef ucc_status_t (*ucc_tl_ucp_copy_test_fn_t)(ucc_tl_ucp_context_t *ctx,
void *copy_task);
typedef ucc_status_t (*ucc_tl_ucp_copy_finalize_fn_t)(void *copy_task);

typedef struct ucc_tl_ucp_context {
ucc_tl_context_t super;
ucc_tl_ucp_context_config_t cfg;
Expand All @@ -128,11 +151,15 @@ typedef struct ucc_tl_ucp_context {
uint64_t n_rinfo_segs;
uint64_t ucp_memory_types;
int topo_required;
struct {
ucc_tl_ucp_copy_post_fn_t post;
ucc_tl_ucp_copy_test_fn_t test;
ucc_tl_ucp_copy_finalize_fn_t finalize;
} copy;
} ucc_tl_ucp_context_t;
UCC_CLASS_DECLARE(ucc_tl_ucp_context_t, const ucc_base_context_params_t *,
const ucc_base_config_t *);

typedef struct ucc_tl_ucp_task ucc_tl_ucp_task_t;
typedef struct ucc_tl_ucp_team {
ucc_tl_team_t super;
ucc_status_t status;
Expand Down
4 changes: 2 additions & 2 deletions src/components/tl/ucp/tl_ucp_coll.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* Copyright (c) 2022-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* Copyright (c) Meta Platforms, Inc. and affiliates. 2022.
*
* See file LICENSE for terms.
Expand Down Expand Up @@ -178,7 +178,7 @@ typedef struct ucc_tl_ucp_task {
int phase;
ucc_knomial_pattern_t p;
void *sbuf;
ucc_ee_executor_task_t *etask;
void *copy_task;
ucc_rank_t recv_dist;
} allgather_kn;
struct {
Expand Down
34 changes: 33 additions & 1 deletion src/components/tl/ucp/tl_ucp_context.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (c) 2020-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* Copyright (c) 2020-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
*
* See file LICENSE for terms.
*/
Expand All @@ -12,6 +12,7 @@
#include "utils/ucc_string.h"
#include "utils/arch/cpu.h"
#include "schedule/ucc_schedule_pipelined.h"
#include "tl_ucp_copy.h"
#include <limits.h>

#define UCP_CHECK(function, msg, go, ctx) \
Expand Down Expand Up @@ -278,6 +279,37 @@ UCC_CLASS_INIT_FUNC(ucc_tl_ucp_context_t,
ucc_free(prefix);
prefix = NULL;


switch (self->cfg.local_copy_type) {
case UCC_TL_UCP_LOCAL_COPY_TYPE_MC:
self->copy.post = ucc_tl_ucp_mc_copy_post;
self->copy.test = ucc_tl_ucp_mc_copy_test;
self->copy.finalize = ucc_tl_ucp_mc_copy_finalize;
tl_debug(self->super.super.lib, "using MC for local copy");
break;
case UCC_TL_UCP_LOCAL_COPY_TYPE_EC:
case UCC_TL_UCP_LOCAL_COPY_TYPE_AUTO:
self->cfg.local_copy_type = UCC_TL_UCP_LOCAL_COPY_TYPE_EC;
self->copy.post = ucc_tl_ucp_ec_copy_post;
self->copy.test = ucc_tl_ucp_ec_copy_test;
self->copy.finalize = ucc_tl_ucp_ec_copy_finalize;
tl_debug(self->super.super.lib, "using EC for local copy");
break;
case UCC_TL_UCP_LOCAL_COPY_TYPE_UCP:
self->copy.post = ucc_tl_ucp_ucp_copy_post;
self->copy.test = ucc_tl_ucp_ucp_copy_test;
self->copy.finalize = ucc_tl_ucp_ucp_copy_finalize;
tl_debug(self->super.super.lib, "using UCP for local copy");
break;
default:
self->copy.post = ucc_tl_ucp_ec_copy_post;
self->copy.test = ucc_tl_ucp_ec_copy_test;
self->copy.finalize = ucc_tl_ucp_ec_copy_finalize;
tl_error(self->super.super.lib,
"not valid copy type: %d, using EC copy instead",
self->cfg.local_copy_type);
};

tl_debug(self->super.super.lib, "initialized tl context: %p", self);
return UCC_OK;

Expand Down
Loading

0 comments on commit cc999a4

Please sign in to comment.