Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TL/UCP: add local copy option #1077

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/components/tl/ucp/Makefile.am
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# Copyright (c) 2022-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#

if TL_UCP_ENABLED
Expand Down Expand Up @@ -116,6 +116,7 @@ sources = \
tl_ucp_service_coll.c \
tl_ucp_dpu_offload.h \
tl_ucp_dpu_offload.c \
tl_ucp_copy.c \
$(allgather) \
$(allgatherv) \
$(alltoall) \
Expand Down
65 changes: 31 additions & 34 deletions src/components/tl/ucp/allgather/allgather_knomial.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* Copyright (c) 2021-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
*
* See file LICENSE for terms.
*/
Expand All @@ -8,6 +8,7 @@
#include "tl_ucp.h"
#include "tl_ucp_coll.h"
#include "tl_ucp_sendrecv.h"
#include "tl_ucp_copy.h"
#include "core/ucc_progress_queue.h"
#include "components/mc/ucc_mc.h"
#include "coll_patterns/sra_knomial.h"
Expand Down Expand Up @@ -58,6 +59,7 @@ void ucc_tl_ucp_allgather_knomial_progress(ucc_coll_task_t *coll_task)
ucc_tl_ucp_task_t);
ucc_coll_args_t *args = &TASK_ARGS(task);
ucc_tl_ucp_team_t *team = TASK_TEAM(task);
ucc_tl_ucp_context_t *ctx = UCC_TL_UCP_TEAM_CTX(team);
ucc_kn_radix_t radix = task->allgather_kn.p.radix;
uint8_t node_type = task->allgather_kn.p.node_type;
ucc_knomial_pattern_t *p = &task->allgather_kn.p;
Expand All @@ -78,9 +80,8 @@ void ucc_tl_ucp_allgather_knomial_progress(ucc_coll_task_t *coll_task)
ucc_status_t status;
size_t extra_count;

EXEC_TASK_TEST(UCC_KN_PHASE_INIT, "failed during ee task test",
task->allgather_kn.etask);
task->allgather_kn.etask = NULL;
COPY_TASK_TEST(UCC_KN_PHASE_INIT, ctx, "failed during copy task test",
task->allgather_kn.copy_task);
UCC_KN_GOTO_PHASE(task->allgather_kn.phase);
if (KN_NODE_EXTRA == node_type) {
peer = ucc_knomial_pattern_get_proxy(p, rank);
Expand Down Expand Up @@ -201,42 +202,37 @@ ucc_status_t ucc_tl_ucp_allgather_knomial_start(ucc_coll_task_t *coll_task)
ucc_tl_ucp_task_t);
ucc_coll_args_t *args = &TASK_ARGS(task);
ucc_tl_ucp_team_t *team = TASK_TEAM(task);
ucc_tl_ucp_context_t *ctx = UCC_TL_UCP_TEAM_CTX(team);
ucc_coll_type_t ct = args->coll_type;
ucc_rank_t size = task->subset.map.ep_num;
ucc_kn_radix_t radix = task->allgather_kn.p.radix;
ucc_knomial_pattern_t *p = &task->allgather_kn.p;
ucc_rank_t rank = VRANK(task->subset.myrank,
ct == UCC_COLL_TYPE_BCAST ?
args->root : 0, size);
ucc_ee_executor_task_args_t eargs = {0};
ucc_status_t status;
ptrdiff_t offset;
ucc_ee_executor_t *exec;
void *rbuf;

UCC_TL_UCP_PROFILE_REQUEST_EVENT(coll_task, "ucp_allgather_kn_start", 0);
ucc_tl_ucp_task_reset(task, UCC_INPROGRESS);
task->allgather_kn.etask = NULL;
task->allgather_kn.phase = UCC_KN_PHASE_INIT;
task->allgather_kn.copy_task = NULL;
task->allgather_kn.phase = UCC_KN_PHASE_INIT;
if (ct == UCC_COLL_TYPE_ALLGATHER) {
ucc_kn_ag_pattern_init(size, rank, radix, args->dst.info.count,
&task->allgather_kn.p);
offset = ucc_buffer_block_offset(args->dst.info.count, size, rank) *
ucc_dt_size(args->dst.info.datatype);
rbuf = args->dst.info.buffer;
if (!UCC_IS_INPLACE(*args)) {
status = ucc_coll_task_get_executor(&task->super, &exec);
if (ucc_unlikely(status != UCC_OK)) {
task->super.status = status;
return status;
}
eargs.task_type = UCC_EE_EXECUTOR_TASK_COPY;
eargs.copy.dst = PTR_OFFSET(args->dst.info.buffer, offset);
eargs.copy.src = args->src.info.buffer;
eargs.copy.len = args->src.info.count *
ucc_dt_size(args->src.info.datatype);
status = ucc_ee_executor_task_post(exec, &eargs,
&task->allgather_kn.etask);
status = ctx->copy.post(PTR_OFFSET(args->dst.info.buffer, offset),
args->dst.info.mem_type,
args->src.info.buffer,
args->src.info.mem_type,
args->src.info.count *
ucc_dt_size(args->src.info.datatype),
task,
&task->allgather_kn.copy_task);
if (ucc_unlikely(status != UCC_OK)) {
task->super.status = status;
return status;
Expand All @@ -252,18 +248,14 @@ ucc_status_t ucc_tl_ucp_allgather_knomial_start(ucc_coll_task_t *coll_task)
ucc_dt_size(args->dst.info_v.datatype);
rbuf = args->dst.info_v.buffer;
if (!UCC_IS_INPLACE(*args)) {
status = ucc_coll_task_get_executor(&task->super, &exec);
if (ucc_unlikely(status != UCC_OK)) {
task->super.status = status;
return status;
}
eargs.task_type = UCC_EE_EXECUTOR_TASK_COPY;
eargs.copy.dst = PTR_OFFSET(args->dst.info_v.buffer, offset);
eargs.copy.src = args->src.info.buffer;
eargs.copy.len = args->src.info.count *
ucc_dt_size(args->src.info.datatype);
status = ucc_ee_executor_task_post(exec, &eargs,
&task->allgather_kn.etask);
status = ctx->copy.post(PTR_OFFSET(args->dst.info_v.buffer, offset),
args->dst.info_v.mem_type,
args->src.info.buffer,
args->src.info.mem_type,
args->src.info.count *
ucc_dt_size(args->src.info.datatype),
task,
&task->allgather_kn.copy_task);
if (ucc_unlikely(status != UCC_OK)) {
task->super.status = status;
return status;
Expand All @@ -290,7 +282,8 @@ ucc_status_t ucc_tl_ucp_allgather_knomial_init_r(
ucc_base_coll_args_t *coll_args, ucc_base_team_t *team,
ucc_coll_task_t **task_h, ucc_kn_radix_t radix)
{
ucc_tl_ucp_team_t *tl_team = ucc_derived_of(team, ucc_tl_ucp_team_t);
ucc_tl_ucp_team_t *tl_team = ucc_derived_of(team, ucc_tl_ucp_team_t);
ucc_tl_ucp_context_t *ctx = UCC_TL_UCP_TEAM_CTX(tl_team);
ucc_tl_ucp_task_t *task;
ucc_sbgp_t *sbgp;

Expand All @@ -302,7 +295,11 @@ ucc_status_t ucc_tl_ucp_allgather_knomial_init_r(
task->subset.map = sbgp->map;
}
task->allgather_kn.p.radix = radix;
task->super.flags |= UCC_COLL_TASK_FLAG_EXECUTOR;
if (!UCC_IS_INPLACE(coll_args->args)) {
if (ctx->cfg.local_copy_type == UCC_TL_UCP_LOCAL_COPY_TYPE_EC) {
task->super.flags |= UCC_COLL_TASK_FLAG_EXECUTOR;
}
}
task->super.post = ucc_tl_ucp_allgather_knomial_start;
task->super.progress = ucc_tl_ucp_allgather_knomial_progress;
*task_h = &task->super;
Expand Down
16 changes: 15 additions & 1 deletion src/components/tl/ucp/tl_ucp.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (c) 2020-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* Copyright (c) 2020-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
*
* See file LICENSE for terms.
*/
Expand Down Expand Up @@ -208,6 +208,14 @@ ucc_config_field_t ucc_tl_ucp_lib_config_table[] = {

{NULL}};

const char* ucc_tl_ucp_local_copy_names[] = {
[UCC_TL_UCP_LOCAL_COPY_TYPE_UCP] = "ucp",
[UCC_TL_UCP_LOCAL_COPY_TYPE_MC] = "mc",
[UCC_TL_UCP_LOCAL_COPY_TYPE_EC] = "ec",
[UCC_TL_UCP_LOCAL_COPY_TYPE_AUTO] = "auto",
[UCC_TL_UCP_LOCAL_COPY_TYPE_LAST] = NULL
};

static ucs_config_field_t ucc_tl_ucp_context_config_table[] = {
{"", "", NULL, ucc_offsetof(ucc_tl_ucp_context_config_t, super),
UCC_CONFIG_TYPE_TABLE(ucc_tl_context_config_table)},
Expand Down Expand Up @@ -246,6 +254,12 @@ static ucs_config_field_t ucc_tl_ucp_context_config_table[] = {
ucc_offsetof(ucc_tl_ucp_context_config_t, service_throttling_thresh),
UCC_CONFIG_TYPE_UINT},

{"LOCAL_COPY_TYPE", "auto",
"Determines what component is responsible for doing local copy "
"during collective execution",
ucc_offsetof(ucc_tl_ucp_context_config_t, local_copy_type),
UCC_CONFIG_TYPE_ENUM(ucc_tl_ucp_local_copy_names)},

{NULL}};

UCC_CLASS_DEFINE_NEW_FUNC(ucc_tl_ucp_lib_t, ucc_base_lib_t,
Expand Down
46 changes: 37 additions & 9 deletions src/components/tl/ucp/tl_ucp.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (c) 2020-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* Copyright (c) 2020-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
*
* See file LICENSE for terms.
*/
Expand Down Expand Up @@ -79,14 +79,23 @@ typedef struct ucc_tl_ucp_lib_config {
int use_reordering;
} ucc_tl_ucp_lib_config_t;

typedef enum ucc_tl_ucp_local_copy_type {
UCC_TL_UCP_LOCAL_COPY_TYPE_UCP,
UCC_TL_UCP_LOCAL_COPY_TYPE_MC,
UCC_TL_UCP_LOCAL_COPY_TYPE_EC,
UCC_TL_UCP_LOCAL_COPY_TYPE_AUTO,
UCC_TL_UCP_LOCAL_COPY_TYPE_LAST
} ucc_tl_ucp_local_copy_type_t;

typedef struct ucc_tl_ucp_context_config {
ucc_tl_context_config_t super;
uint32_t preconnect;
uint32_t n_polls;
uint32_t oob_npolls;
uint32_t pre_reg_mem;
uint32_t service_worker;
uint32_t service_throttling_thresh;
ucc_tl_context_config_t super;
uint32_t preconnect;
uint32_t n_polls;
uint32_t oob_npolls;
uint32_t pre_reg_mem;
uint32_t service_worker;
uint32_t service_throttling_thresh;
ucc_tl_ucp_local_copy_type_t local_copy_type;
} ucc_tl_ucp_context_config_t;

typedef ucc_tl_ucp_lib_config_t ucc_tl_ucp_team_config_t;
Expand Down Expand Up @@ -116,6 +125,21 @@ typedef struct ucc_tl_ucp_worker {
ucp_ep_h * eps;
} ucc_tl_ucp_worker_t;

typedef struct ucc_tl_ucp_task ucc_tl_ucp_task_t;
typedef struct ucc_tl_ucp_context ucc_tl_ucp_context_t;
typedef union ucc_tl_ucp_copy_task ucc_tl_ucp_copy_task_t;

typedef ucc_status_t (*ucc_tl_ucp_copy_post_fn_t)(void *dst,
ucc_memory_type_t dst_mtype,
void *src,
ucc_memory_type_t src_mtype,
size_t size,
ucc_tl_ucp_task_t *coll_task,
ucc_tl_ucp_copy_task_t **copy_task);
typedef ucc_status_t (*ucc_tl_ucp_copy_test_fn_t)(ucc_tl_ucp_context_t *ctx,
ucc_tl_ucp_copy_task_t *copy_task);
typedef ucc_status_t (*ucc_tl_ucp_copy_finalize_fn_t)(ucc_tl_ucp_copy_task_t *copy_task);

typedef struct ucc_tl_ucp_context {
ucc_tl_context_t super;
ucc_tl_ucp_context_config_t cfg;
Expand All @@ -128,11 +152,15 @@ typedef struct ucc_tl_ucp_context {
uint64_t n_rinfo_segs;
uint64_t ucp_memory_types;
int topo_required;
struct {
ucc_tl_ucp_copy_post_fn_t post;
ucc_tl_ucp_copy_test_fn_t test;
ucc_tl_ucp_copy_finalize_fn_t finalize;
} copy;
} ucc_tl_ucp_context_t;
UCC_CLASS_DECLARE(ucc_tl_ucp_context_t, const ucc_base_context_params_t *,
const ucc_base_config_t *);

typedef struct ucc_tl_ucp_task ucc_tl_ucp_task_t;
typedef struct ucc_tl_ucp_team {
ucc_tl_team_t super;
ucc_status_t status;
Expand Down
4 changes: 2 additions & 2 deletions src/components/tl/ucp/tl_ucp_coll.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* Copyright (c) 2022-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* Copyright (c) Meta Platforms, Inc. and affiliates. 2022.
*
* See file LICENSE for terms.
Expand Down Expand Up @@ -178,7 +178,7 @@ typedef struct ucc_tl_ucp_task {
int phase;
ucc_knomial_pattern_t p;
void *sbuf;
ucc_ee_executor_task_t *etask;
ucc_tl_ucp_copy_task_t *copy_task;
ucc_rank_t recv_dist;
} allgather_kn;
struct {
Expand Down
34 changes: 33 additions & 1 deletion src/components/tl/ucp/tl_ucp_context.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (c) 2020-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* Copyright (c) 2020-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
*
* See file LICENSE for terms.
*/
Expand All @@ -12,6 +12,7 @@
#include "utils/ucc_string.h"
#include "utils/arch/cpu.h"
#include "schedule/ucc_schedule_pipelined.h"
#include "tl_ucp_copy.h"
#include <limits.h>

#define UCP_CHECK(function, msg, go, ctx) \
Expand Down Expand Up @@ -278,6 +279,37 @@ UCC_CLASS_INIT_FUNC(ucc_tl_ucp_context_t,
ucc_free(prefix);
prefix = NULL;


switch (self->cfg.local_copy_type) {
case UCC_TL_UCP_LOCAL_COPY_TYPE_MC:
self->copy.post = ucc_tl_ucp_mc_copy_post;
self->copy.test = ucc_tl_ucp_mc_copy_test;
self->copy.finalize = ucc_tl_ucp_mc_copy_finalize;
tl_debug(self->super.super.lib, "using MC for local copy");
break;
case UCC_TL_UCP_LOCAL_COPY_TYPE_EC:
case UCC_TL_UCP_LOCAL_COPY_TYPE_AUTO:
self->cfg.local_copy_type = UCC_TL_UCP_LOCAL_COPY_TYPE_EC;
self->copy.post = ucc_tl_ucp_ec_copy_post;
self->copy.test = ucc_tl_ucp_ec_copy_test;
self->copy.finalize = ucc_tl_ucp_ec_copy_finalize;
tl_debug(self->super.super.lib, "using EC for local copy");
break;
case UCC_TL_UCP_LOCAL_COPY_TYPE_UCP:
self->copy.post = ucc_tl_ucp_ucp_copy_post;
self->copy.test = ucc_tl_ucp_ucp_copy_test;
self->copy.finalize = ucc_tl_ucp_ucp_copy_finalize;
tl_debug(self->super.super.lib, "using UCP for local copy");
break;
default:
self->copy.post = ucc_tl_ucp_ec_copy_post;
self->copy.test = ucc_tl_ucp_ec_copy_test;
self->copy.finalize = ucc_tl_ucp_ec_copy_finalize;
tl_error(self->super.super.lib,
"not valid copy type: %d, using EC copy instead",
self->cfg.local_copy_type);
};

tl_debug(self->super.super.lib, "initialized tl context: %p", self);
return UCC_OK;

Expand Down
Loading
Loading