Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 85 additions & 37 deletions ompi/mca/pml/ubcl/pml_ubcl_endpoint.c
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ static int mca_pml_ubcl_export_local_endpoint_handle(const int type)

err = ubcl_export_local_endpoint_handle(type, endpoint_h, &remote_rank_u64);
if (UBCL_SUCCESS != err) {
return OMPI_ERROR;
return ubcl_error_to_ompi(err);
}

mca_pml_ubcl_endpoint_modex_put(type, (void *) endpoint_h, size);
Expand All @@ -120,10 +120,10 @@ static int mca_pml_ubcl_export_local_endpoint_handle(const int type)
* The actual recv rank will be allocated during add_procs calls */
err = ubcl_close_local_endpoint_channel(type, remote_rank_u64);
if (UBCL_SUCCESS != err) {
mca_pml_ubcl_warn(OMPI_ERROR,
mca_pml_ubcl_warn(ubcl_error_to_ompi(err),
"PML/UBCL failed to clean local endpoint (very unlikely error)."
" For safety reason PML will be disabled.");
return OMPI_ERROR;
return ubcl_error_to_ompi(err);
}

return OMPI_SUCCESS;
Expand All @@ -133,35 +133,31 @@ int mca_pml_ubcl_create_local_endpoint(void)
{
int type;
ubcl_error_t err;
int ompi_error;

type = UBCL_ENDPOINT_TYPE_SELF;
err = ubcl_create_local_endpoint(type);
if (UBCL_SUCCESS != err) {
mca_pml_ubcl_error(OMPI_ERROR, "Failed ubcl_create_local_endpoint %d (%d)", type, err);
mca_pml_ubcl_warn(ubcl_error_to_ompi(err), "Failed ubcl_create_local_endpoint %d (%d)", type, err);
}

/* UBCL_ENDPOINT_SHM */
if (!mca_pml_ubcl_component.force_intranode_bxi) {
type = UBCL_ENDPOINT_TYPE_SHMEM;
err = ubcl_create_local_endpoint(type);
if (UBCL_SUCCESS != err) {
mca_pml_ubcl_error(OMPI_ERROR, "Failed ubcl_create_local_endpoint %d (%d)", type, err);
if (UBCL_SUCCESS == err) {
err = mca_pml_ubcl_export_local_endpoint_handle(type);
}
ompi_error = mca_pml_ubcl_export_local_endpoint_handle(type);
if (OMPI_SUCCESS != ompi_error) {
return ompi_error;
if (UBCL_SUCCESS != err) {
mca_pml_ubcl_warn(ubcl_error_to_ompi(err), "Failed ubcl_create_local_endpoint %d (%d)", type, err);
}
}

type = UBCL_ENDPOINT_TYPE_BXI;
err = ubcl_create_local_endpoint(type);
if (UBCL_SUCCESS != err) {
mca_pml_ubcl_error(OMPI_ERROR, "Failed ubcl_create_local_endpoint %d (%d)", type, err);
if (UBCL_SUCCESS == err) {
err = mca_pml_ubcl_export_local_endpoint_handle(type);
}
ompi_error = mca_pml_ubcl_export_local_endpoint_handle(type);
if (OMPI_SUCCESS != ompi_error) {
return ompi_error;
if (UBCL_SUCCESS != err) {
mca_pml_ubcl_warn(ubcl_error_to_ompi(err), "Failed ubcl_create_local_endpoint %d (%d)", type, err);
}

return OMPI_SUCCESS;
Expand All @@ -170,20 +166,23 @@ int mca_pml_ubcl_create_local_endpoint(void)
int mca_pml_ubcl_free_local_endpoints()
{
int ret;
/* Finalize BXI */
ret = ubcl_free_local_endpoint(UBCL_ENDPOINT_TYPE_BXI);
if (UBCL_SUCCESS != ret) {
return OMPI_ERROR;
if (UBCL_SUCCESS != ret && UBCL_ERR_NOT_AVAILABLE != ret) {
/* If the transport was unavailable we silence the error,
* we're closing it anyway */
return ubcl_error_to_ompi(ret);
}

if (!mca_pml_ubcl_component.force_intranode_bxi) {
ret = ubcl_free_local_endpoint(UBCL_ENDPOINT_TYPE_SHMEM);
if (UBCL_SUCCESS != ret) {
return OMPI_ERROR;
if (UBCL_SUCCESS != ret && UBCL_ERR_NOT_AVAILABLE != ret) {
return ubcl_error_to_ompi(ret);
}
}

ret = ubcl_free_local_endpoint(UBCL_ENDPOINT_TYPE_SELF);
if (UBCL_SUCCESS != ret) {
return OMPI_ERROR;
if (UBCL_SUCCESS != ret && UBCL_ERR_NOT_AVAILABLE != ret) {
return ubcl_error_to_ompi(ret);
}

return OMPI_SUCCESS;
Expand Down Expand Up @@ -255,7 +254,7 @@ static int mca_pml_ubcl_create_recv_endpoint(uint64_t sender_rank, const int typ

err = ubcl_export_local_endpoint_handle(type, endpoint_h, &remote_rank_u64);
if (UBCL_SUCCESS != err) {
return OMPI_ERROR;
return ubcl_error_to_ompi(err);
}

return OMPI_SUCCESS;
Expand All @@ -270,11 +269,11 @@ static int mca_pml_ubcl_create_self_endpoints(uint64_t remote_rank)

err = ubcl_export_local_endpoint_handle(type, endpoint_h, &my_rank);
if (UBCL_SUCCESS != err) {
return OMPI_ERROR;
return ubcl_error_to_ompi(err);
}
err = ubcl_create_remote_endpoint(my_rank, my_rank, type, endpoint_h);
if (UBCL_SUCCESS != err) {
return OMPI_ERROR;
return ubcl_error_to_ompi(err);
}

return OMPI_SUCCESS;
Expand All @@ -296,6 +295,25 @@ static int get_endpoint_type(ompi_proc_t *proc)
}
}

static enum ubcl_endpoint_type_t mca_pml_ubcl_get_higher_transport(
enum ubcl_endpoint_type_t type)
{
switch ((int) type) {
case UBCL_ENDPOINT_TYPE_SELF:
case UBCL_ENDPOINT_TYPE_SHMEM:
type++;
break;
/* There are no valid higher transport */
case UBCL_ENDPOINT_TYPE_BXI:
default:
type = UBCL_ENDPOINT_TYPE_NONE;
/* Not a valid transport */
break;
}

return type;
}

void mca_pml_ubcl_endpoint_retain(ompi_proc_t *proc)
{
mca_common_ubcl_endpoint_t *endpoint = NULL;
Expand All @@ -312,6 +330,7 @@ void mca_pml_ubcl_endpoint_retain(ompi_proc_t *proc)
static int mca_pml_ubcl_create_endpoints(ompi_proc_t *proc)
{
int err = OMPI_SUCCESS;
enum ubcl_endpoint_type_t type;
mca_common_ubcl_endpoint_t *new_endpoint;

new_endpoint = malloc(sizeof(mca_common_ubcl_endpoint_t));
Expand All @@ -322,29 +341,58 @@ static int mca_pml_ubcl_create_endpoints(ompi_proc_t *proc)

new_endpoint->refcount = 0; //we increment it to 1 in endpoint_retain
new_endpoint->rank = mca_pml_forge_rank(proc);
new_endpoint->type = get_endpoint_type(proc);
type = get_endpoint_type(proc);

if (UBCL_ENDPOINT_TYPE_SELF == new_endpoint->type) {
if (UBCL_ENDPOINT_TYPE_SELF == type) {
err = mca_pml_ubcl_create_self_endpoints((uint64_t) new_endpoint->rank);
goto end;
}

err = mca_pml_ubcl_create_recv_endpoint(new_endpoint->rank, new_endpoint->type);
if (OMPI_SUCCESS != err) {
mca_pml_ubcl_error(err, "Failed to create recv endpoint for rank %zu\n",
new_endpoint->rank);
/* If the transport is unavailable (either explicitly disabled,
* or just unavailable) we do not return any error.
* If UBCL encountered another error we return it */
if (OMPI_SUCCESS == err) {
goto end;
} else if (OMPI_ERR_NOT_AVAILABLE != err) {
goto error;
}
}

err = mca_pml_ubcl_create_send_endpoint(proc, new_endpoint->rank, new_endpoint->type);
/* If a transport is unavailable only a higher transport can take its place,
* ie. if SHM is unavailable, SELF cannot replace it but BXI can */
do {
err = mca_pml_ubcl_create_recv_endpoint(new_endpoint->rank, type);

if (OMPI_ERR_NOT_AVAILABLE == err) {
type = mca_pml_ubcl_get_higher_transport(type);
if (UBCL_ENDPOINT_TYPE_NONE == type) {
mca_pml_ubcl_warn(err, "Failed to create recv endpoint for rank %zu\n",
new_endpoint->rank);
goto error;
}
} else if (OMPI_SUCCESS != err) {
mca_pml_ubcl_warn(err, "Failed to create recv endpoint for rank %zu\n",
new_endpoint->rank);
goto error;
}
} while (OMPI_SUCCESS != err);

/* No need to loop again: if the transport became unavailable between
* the last operation and this one we can consider this an error */
err = mca_pml_ubcl_create_send_endpoint(proc, new_endpoint->rank, type);
if (OMPI_SUCCESS != err) {
mca_pml_ubcl_error(err, "Failed to create send endpoint for rank %zu\n",
new_endpoint->rank);
mca_pml_ubcl_warn(err, "Failed to create send endpoint for rank %zu\n",
new_endpoint->rank);
goto error;
}

end:
new_endpoint->type = type;
(proc)->proc_endpoints[OMPI_PROC_ENDPOINT_TAG_PML] = new_endpoint;
mca_pml_ubcl_endpoint_retain(proc);

return UBCL_SUCCESS;

error:
free(new_endpoint);
return err;
}

Expand Down