Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
ferrol aderholdt committed Feb 19, 2025
1 parent 4f039a2 commit f02340e
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 99 deletions.
115 changes: 22 additions & 93 deletions src/components/tl/ucp/tl_ucp_context.c
Original file line number Diff line number Diff line change
Expand Up @@ -558,30 +558,27 @@ ucc_status_t ucc_tl_ucp_mem_map_memhbuf(ucc_tl_ucp_context_t *ctx,
{
ucp_mem_map_params_t mmap_params;
ucs_status_t status;
void *packed_memh;
void *packed_memh;

*mh = NULL;
/* unpack here */
packed_memh = UCC_TL_UCP_MEMH_TL_PACKED_MEMH(pack_buffer);
// size_t *key_size = (size_t *)PTR_OFFSET(pack_buffer, sizeof(size_t) * 2);
// void *packed_memh = PTR_OFFSET(pack_buffer, sizeof(size_t) * 4 + *key_size);
// void *packed_memh = PTR_OFFSET(pack_buffer, sizeof(size_t) * 4 + *key_size);

mmap_params.field_mask = UCP_MEM_MAP_PARAM_FIELD_EXPORTED_MEMH_BUFFER;
mmap_params.exported_memh_buffer = packed_memh;

status = ucp_mem_map(ctx->worker.ucp_context, &mmap_params, mh);
if (UCS_OK != status) {
if (status == UCS_ERR_INVALID_PARAM) {
return UCC_OK; //it's ok.
}
if (status != UCS_ERR_UNREACHABLE) {
tl_error(ctx->super.super.lib,
"ucp_mem_map failed with error code: %s",
ucs_status_string(status));
return ucs_status_to_ucc_status(status);
} else {
tl_debug(ctx->super.super.lib,
"ucp_mem_map could not map exported memory handle");
tl_warn(ctx->super.super.lib,
"ucp_mem_map cannot map exported memory handles");
}
}
return UCC_OK;
Expand Down Expand Up @@ -615,6 +612,9 @@ ucc_status_t ucc_tl_ucp_mem_map_export(ucc_tl_ucp_context_t *ctx, void *address,
m_data->rinfo.va_base = address;
m_data->rinfo.len = len;

pack_params.field_mask = UCP_MEMH_PACK_PARAM_FIELD_FLAGS;
pack_params.flags = UCP_MEMH_PACK_FLAG_EXPORT;

status = ucp_memh_pack(mh, &pack_params, &m_data->packed_memh,
&m_data->packed_memh_len);
if (status != UCS_OK) {
Expand All @@ -626,10 +626,16 @@ ucc_status_t ucc_tl_ucp_mem_map_export(ucc_tl_ucp_context_t *ctx, void *address,
}
}

// pack rkey
if (!m_data->rinfo.mem_h) {
tl_error(ctx->super.super.lib,
"attempting to pack keys with invalid memory handle");
return UCC_ERR_INVALID_PARAM;
}

// pack rkey (mem_h should be valid by this point)
status =
ucp_rkey_pack(ctx->worker.ucp_context, mh, &m_data->rinfo.packed_key,
&m_data->rinfo.packed_key_len);
ucp_rkey_pack(ctx->worker.ucp_context, m_data->rinfo.mem_h,
&m_data->rinfo.packed_key, &m_data->rinfo.packed_key_len);
if (status != UCS_OK) {
tl_error(ctx->super.super.lib, "unable to pack rkey with error %s",
ucs_status_string(status));
Expand All @@ -647,9 +653,9 @@ ucc_status_t ucc_tl_ucp_mem_map_dpu_import(ucc_tl_ucp_context_t *ctx,
ucc_mem_map_memh_t *l_memh,
void *tl_h)
{
size_t offset = 0;
ucp_mem_h mh;
ucc_status_t ucc_status;
size_t offset = 0;

for (int i = 0; i < l_memh->num_tls; i++) {
size_t *p = (size_t *)PTR_OFFSET(l_memh->pack_buffer, offset);
Expand Down Expand Up @@ -684,12 +690,7 @@ ucc_status_t ucc_tl_ucp_mem_map(const ucc_base_context_t *context, int type,
ucc_mem_map_tl_t *p_memh = (ucc_mem_map_tl_t *)tl_h;
ucc_tl_ucp_memh_data_t *m_data = (ucc_tl_ucp_memh_data_t *)p_memh->tl_data;
ucc_mem_map_memh_t *l_memh = (ucc_mem_map_memh_t *)memh;
// size_t offset = 0;
// ucp_mem_h mh = NULL;
ucc_status_t ucc_status = UCC_OK;
// ucp_mem_map_params_t mmap_params;
// ucs_status_t status;
// ucp_memh_pack_params_t pack_params;

/* technically, only need to do this on import */
if (type == UCC_MEM_MAP_TYPE_GLOBAL ||
Expand All @@ -705,93 +706,21 @@ ucc_status_t ucc_tl_ucp_mem_map(const ucc_base_context_t *context, int type,

if (type == UCC_MEM_MAP_TYPE_LOCAL || type == UCC_MEM_MAP_TYPE_DPU_EXPORT) {
ucc_status = ucc_tl_ucp_mem_map_export(ctx, address, len, type, m_data);
} else if (type == UCC_MEM_MAP_TYPE_DPU_IMPORT) {
ucc_status = ucc_tl_ucp_mem_map_dpu_import(ctx, address, len, m_data, l_memh, tl_h);
}

p_memh->packed_size =
m_data->packed_memh_len + m_data->rinfo.packed_key_len;

return ucc_status;
}
#if 0
if (type == UCC_MEM_MAP_TYPE_LOCAL) { /* this is an export */
mmap_params.field_mask =
UCP_MEM_MAP_PARAM_FIELD_ADDRESS | UCP_MEM_MAP_PARAM_FIELD_LENGTH;
mmap_params.address = address;
mmap_params.length = len;

status = ucp_mem_map(ctx->worker.ucp_context, &mmap_params, &mh);
if (UCS_OK != status) {
tl_error(ctx->super.super.lib,
"ucp_mem_map failed with error code: %d", status);
ucc_status = ucs_status_to_ucc_status(status);
if (UCC_OK != ucc_status) {
tl_error(ctx->super.super.lib, "failed to export memory handles");
}
m_data->rinfo.mem_h = mh;
m_data->rinfo.va_base = address;
m_data->rinfo.len = len;
} else if (type == UCC_MEM_MAP_TYPE_DPU_IMPORT) {
for (int i = 0; i < l_memh->num_tls; i++) {
size_t *p = (size_t *)PTR_OFFSET(l_memh->pack_buffer, offset);

if (tl_h == (void *)&l_memh->tl_h[i]) {
break;
}
/* this is not the index, skip this section of buffer if exists */
if (p[0] == i) {
offset += p[1];
}
}

ucc_status = ucc_tl_ucp_mem_map_memhbuf(
ctx, PTR_OFFSET(l_memh->pack_buffer, offset), &mh);
if (ucc_status != UCC_OK) {
tl_error(ctx->super.super.lib, "ucp_mem_map failed to map memh");
return ucc_status;
ucc_status = ucc_tl_ucp_mem_map_dpu_import(ctx, address, len, m_data, l_memh, tl_h);
if (UCC_OK != ucc_status) {
tl_error(ctx->super.super.lib, "failed to import memory handle");
}
m_data->rinfo.mem_h = mh; /* used for xgvmi */
m_data->rinfo.va_base = address;
m_data->rinfo.len = len;
}

if (type == UCC_MEM_MAP_TYPE_LOCAL) {
status = ucp_memh_pack(mh, &pack_params, &m_data->packed_memh,
&m_data->packed_memh_len);
if (status != UCS_OK) {
// we don't support memory pack, or it failed
tl_debug(ctx->super.super.lib, "ucp_memh_pack() returned error %s",
ucs_status_string(status));
m_data->packed_memh = 0;
m_data->packed_memh_len = 0;
}

// pack rkey
status = ucp_rkey_pack(ctx->worker.ucp_context, mh,
&m_data->rinfo.packed_key,
&m_data->rinfo.packed_key_len);
if (status != UCS_OK) {
tl_error(ctx->super.super.lib, "unable to pack rkey with error %s",
ucs_status_string(status));
}
}
if (type == UCC_MEM_MAP_TYPE_DPU_EXPORT) {
// pack rkey
status = ucp_rkey_pack(ctx->worker.ucp_context, m_data->rinfo.mem_h,
&m_data->rinfo.packed_key,
&m_data->rinfo.packed_key_len);
if (status != UCS_OK) {
tl_error(ctx->super.super.lib, "unable to pack rkey with error %s",
ucs_status_string(status));
}
/* memh is already there.. reuse it */
l_memh->type = UCC_MEM_MAP_TYPE_DPU_EXPORT;
}
p_memh->packed_size =
m_data->packed_memh_len + m_data->rinfo.packed_key_len;

return ucc_status;
}
#endif

ucc_status_t ucc_tl_ucp_mem_unmap(const ucc_base_context_t *context, int type,
void *memh)
Expand Down
21 changes: 15 additions & 6 deletions src/core/ucc_context.c
Original file line number Diff line number Diff line change
Expand Up @@ -1157,7 +1157,8 @@ ucc_status_t ucc_mem_map_import(ucc_context_h context,
}

local_memh = *memh;
/* memh should have been used in exchanges or from a remote process, addresses, etc likely garbage. fix it */
/* memh should have been used in exchanges or from a remote process,
addresses, etc. likely garbage. fix it */
local_memh->tl_h = (ucc_mem_map_tl_t *)ucc_calloc(
ctx->n_tl_ctx, sizeof(ucc_mem_map_tl_t), "tl memh");
for (i = 0; i < ctx->n_tl_ctx; i++) {
Expand Down Expand Up @@ -1254,7 +1255,8 @@ ucc_status_t ucc_mem_map_export(ucc_context_h context,
"exported memh");
if (!exported_memh) {
ucc_error("failed to allocate handle for exported buffers");
return UCC_ERR_NO_MEMORY;
status = UCC_ERR_NO_MEMORY;
goto failed_pack;
}

/* copying */
Expand Down Expand Up @@ -1286,16 +1288,23 @@ ucc_status_t ucc_mem_map_export(ucc_context_h context,
exported_memh->my_ctx_rank = ctx->rank;
exported_memh->num_tls = ctx->n_tl_ctx;
*memh = exported_memh;
*memh_size = sizeof(ucc_mem_map_memh_t) + offset;// total_pack_size + 2 * sizeof(size_t) * ctx->n_tl_ctx;
*memh_size = sizeof(ucc_mem_map_memh_t) + offset;
return UCC_OK;
failed_pack:
for (int j = 0; j < i; j++) {
ucc_free(packed_buffers[j]);
}
i = ctx->n_tl_ctx;
failed_mem_map:
for (int j = 0; j < i; j++) {
tl_lib = ucc_derived_of(ctx->tl_ctx[i]->super.lib, ucc_tl_lib_t);
tl_lib->iface->context.mem_unmap((const ucc_base_context_t *)ctx,
UCC_MEM_MAP_TYPE_LOCAL,
&local_memh->tl_h[j]);
}
ucc_free(local_memh);
*memh = NULL;
*memh_size = 0;
return status;
}

Expand All @@ -1305,14 +1314,14 @@ ucc_status_t ucc_mem_map_dpu_export(ucc_context_h context,
{
ucc_context_t *ctx = (ucc_context_t *)context;
size_t total_pack_size = 0;
ucc_mem_map_memh_t *local_memh = *memh;
ucc_mem_map_memh_t *local_memh = *memh;
ucc_config_names_array_t *tls = &ctx->all_tls;
ucc_mem_map_memh_t *exported_memh;
void **packed_buffers;
ucc_status_t status;
ucc_tl_lib_t *tl_lib;
size_t offset;
int i;
ucc_config_names_array_t *tls = &ctx->all_tls;

packed_buffers =
(void **)ucc_calloc(ctx->n_tl_ctx, sizeof(void *), "packed buffers");
Expand Down Expand Up @@ -1364,7 +1373,7 @@ ucc_status_t ucc_mem_map_dpu_export(ucc_context_h context,
ucc_error("failed to allocate handle for exported buffers");
return UCC_ERR_NO_MEMORY;
}
exported_memh = local_memh;
// exported_memh = local_memh;

/* copying */
exported_memh->tl_h = local_memh->tl_h;
Expand Down

0 comments on commit f02340e

Please sign in to comment.