diff --git a/src/components/tl/ucp/allgather/allgather.c b/src/components/tl/ucp/allgather/allgather.c
index 0e7ee2fb3f..769c4fb981 100644
--- a/src/components/tl/ucp/allgather/allgather.c
+++ b/src/components/tl/ucp/allgather/allgather.c
@@ -9,26 +9,6 @@
 #define ALLGATHER_MAX_PATTERN_SIZE (sizeof(UCC_TL_UCP_ALLGATHER_DEFAULT_ALG_SELECT_STR))
-
-ucc_status_t new_ucp_tl_self_copy_nb(void *dst, void *src, size_t len, ucc_memory_type_t dst_mem,ucc_memory_type_t src_mem, ucc_rank_t rank, ucc_tl_ucp_team_t *team, ucc_tl_ucp_task_t *task){
-    ucc_status_t status;
-    status = ucc_tl_ucp_send_nb(src, len, src_mem, rank, team, task);
-    // check here all occurances of returns (if this is ok)
-    if (ucc_unlikely(UCC_OK != status)) {
-        printf("\n allgather.c line 41 \n");
-        task->super.status = status;
-        return status;
-    }
-    status = ucc_tl_ucp_recv_nb(dst, len, dst_mem, rank, team, task);
-    if (ucc_unlikely(UCC_OK != status)) {
-        printf("\n allgather.c line 47 \n");
-        task->super.status = status;
-        return status;
-    }
-    return UCC_OK;
-}
-
-/*--------------YAELIS FUNCTION---------------------*/
 ucc_base_coll_alg_info_t
     ucc_tl_ucp_allgather_algs[UCC_TL_UCP_ALLGATHER_ALG_LAST + 1] = {
         [UCC_TL_UCP_ALLGATHER_ALG_KNOMIAL] =
diff --git a/src/components/tl/ucp/allgather/allgather.h b/src/components/tl/ucp/allgather/allgather.h
index 6c123831df..b9269aacdd 100644
--- a/src/components/tl/ucp/allgather/allgather.h
+++ b/src/components/tl/ucp/allgather/allgather.h
@@ -10,13 +10,6 @@
 #include "tl_ucp_sendrecv.h"
-
-#define NEW_MEMCPY(use_cuda, dst, src, len, dst_mem_type, src_mem_type, rank, team, task) \
-    ((use_cuda) ? ucc_mc_memcpy(dst, src, len, dst_mem_type, src_mem_type) : \
-     new_ucp_tl_self_copy_nb(dst, src, len, dst_mem_type, src_mem_type, rank, team, task))
-
-
-
 enum {
     UCC_TL_UCP_ALLGATHER_ALG_KNOMIAL,
     UCC_TL_UCP_ALLGATHER_ALG_RING,
diff --git a/src/components/tl/ucp/allgather/allgather_bruck.c b/src/components/tl/ucp/allgather/allgather_bruck.c
index 9ed1d929ba..a1cb01cfc1 100644
--- a/src/components/tl/ucp/allgather/allgather_bruck.c
+++ b/src/components/tl/ucp/allgather/allgather_bruck.c
@@ -240,28 +240,34 @@ ucc_status_t ucc_tl_ucp_allgather_bruck_start(ucc_coll_task_t *coll_task)
     /* initial step: copy data on non root ranks to the beginning of buffer */
     uint32_t USE_CUDA = UCC_TL_UCP_TEAM_LIB(team)->cfg.allgather_use_cuda;
+
     if (!UCC_IS_INPLACE(TASK_ARGS(task))) {
         // not inplace: copy chunk from source buff to beginning of receive
-        /*
-        status = ucc_mc_memcpy(rbuf, sbuf, data_size, rmem, smem);
-        if (ucc_unlikely(UCC_OK != status)) {
-            return status;
-        }
-        */
-        status = NEW_MEMCPY(USE_CUDA, rbuf, sbuf, data_size, rmem, smem, trank, team, task);
-        if (ucc_unlikely(UCC_OK != status)) {
-            printf("error bruck line 254\n");
-            return status;
+        if (USE_CUDA) {
+            status = ucc_mc_memcpy(rbuf, sbuf, data_size, rmem, smem);
+            if (ucc_unlikely(UCC_OK != status)) {
+                return status;
+            }
+        } else {
+            /* Loopback */
+            UCPCHECK_GOTO(ucc_tl_ucp_send_nb(sbuf, data_size, smem, trank, team, task), task, enqueue);
+            UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(rbuf, data_size, rmem, trank, team, task), task, enqueue);
         }
     } else if (trank != 0) {
         // inplace: copy chunk to the begin
-        status = ucc_mc_memcpy(rbuf, PTR_OFFSET(rbuf, data_size * trank),
+        if (USE_CUDA) {
+            status = ucc_mc_memcpy(rbuf, PTR_OFFSET(rbuf, data_size * trank),
                                data_size, rmem, rmem);
-        if (ucc_unlikely(UCC_OK != status)) {
-            return status;
+            if (ucc_unlikely(UCC_OK != status)) {
+                return status;
+            }
+        } else {
+            /* Loopback */
+            UCPCHECK_GOTO(ucc_tl_ucp_send_nb(PTR_OFFSET(rbuf, data_size * trank), data_size, rmem, trank, team, task), task, enqueue);
+            UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(rbuf, data_size, rmem, trank, team, task), task, enqueue);
         }
     }
-
+enqueue:
     return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super);
 }
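Note on the pattern above: every allgather start function in this patch makes the same two-way choice for the initial local copy. When allgather_use_cuda is set, the block is copied with ucc_mc_memcpy (or, for knomial below, an executor copy task); otherwise the rank posts a "loopback" send/recv pair to itself and the copy completes while the enqueued task is progressed. A minimal sketch of that choice, written as one hypothetical helper purely for illustration (the patch deliberately inlines the branch in each algorithm; the helper name is not part of the code):

    /* Sketch only; assumes the tl_ucp declarations already included by the
     * files above ("tl_ucp.h", "tl_ucp_sendrecv.h"). Helper name is
     * hypothetical. */
    static inline ucc_status_t
    allgather_initial_copy_sketch(void *dst, void *src, size_t len,
                                  ucc_memory_type_t dst_mem,
                                  ucc_memory_type_t src_mem, ucc_rank_t trank,
                                  ucc_tl_ucp_team_t *team,
                                  ucc_tl_ucp_task_t *task, int use_cuda)
    {
        ucc_status_t status;

        if (use_cuda) {
            /* Synchronous, memory-type aware local copy. */
            return ucc_mc_memcpy(dst, src, len, dst_mem, src_mem);
        }
        /* Loopback: tagged send to self plus the matching receive; both
         * complete later, while the enqueued task is progressed. */
        status = ucc_tl_ucp_send_nb(src, len, src_mem, trank, team, task);
        if (ucc_unlikely(UCC_OK != status)) {
            return status;
        }
        return ucc_tl_ucp_recv_nb(dst, len, dst_mem, trank, team, task);
    }

Either way the data movement is only requested in the start function; in the loopback case it finishes during progress, which is why the start functions still fall through to ucc_progress_queue_enqueue().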
diff --git a/src/components/tl/ucp/allgather/allgather_knomial.c b/src/components/tl/ucp/allgather/allgather_knomial.c
index 5795dbca45..5ae41af193 100644
--- a/src/components/tl/ucp/allgather/allgather_knomial.c
+++ b/src/components/tl/ucp/allgather/allgather_knomial.c
@@ -15,6 +15,8 @@
 #include "utils/ucc_coll_utils.h"
 #include "allgather.h"
+
+
 #define SAVE_STATE(_phase)                                                     \
     do {                                                                       \
         task->allgather_kn.phase = _phase;                                     \
@@ -57,11 +59,19 @@ void ucc_tl_ucp_allgather_knomial_progress(ucc_coll_task_t *coll_task)
     ucc_rank_t         peer, peer_dist;
     ucc_kn_radix_t     loop_step;
     size_t             peer_seg_count, local_seg_count;
-    //ucc_status_t status;
+    ucc_status_t       status;
     size_t             extra_count;
-    //EXEC_TASK_TEST(UCC_KN_PHASE_INIT, "failed during ee task test", task->allgather_kn.etask);
-    //task->allgather_kn.etask = NULL;
+    uint32_t           USE_CUDA = UCC_TL_UCP_TEAM_LIB(team)->cfg.allgather_use_cuda;
+
+    if (!USE_CUDA) {
+        if (UCC_INPROGRESS == ucc_tl_ucp_test(task)) {
+            return;
+        }
+    }
+    if (USE_CUDA) EXEC_TASK_TEST(UCC_KN_PHASE_INIT, "failed during ee task test", task->allgather_kn.etask);
+    task->allgather_kn.etask = NULL;
+
     UCC_KN_GOTO_PHASE(task->allgather_kn.phase);
     if (KN_NODE_EXTRA == node_type) {
         peer = ucc_knomial_pattern_get_proxy(p, rank);
@@ -176,6 +186,7 @@ void ucc_tl_ucp_allgather_knomial_progress(ucc_coll_task_t *coll_task)
 ucc_status_t ucc_tl_ucp_allgather_knomial_start(ucc_coll_task_t *coll_task)
 {
+
     ucc_tl_ucp_task_t *task = ucc_derived_of(coll_task, ucc_tl_ucp_task_t);
     ucc_coll_args_t   *args = &TASK_ARGS(task);
@@ -205,39 +216,33 @@ ucc_status_t ucc_tl_ucp_allgather_knomial_start(ucc_coll_task_t *coll_task)
         offset = ucc_buffer_block_offset(args->dst.info.count, size, rank) *
                  ucc_dt_size(args->dst.info.datatype);
-        if (!UCC_IS_INPLACE(*args) && USE_CUDA) {
-            status = ucc_coll_task_get_executor(&task->super, &exec);
-            if (ucc_unlikely(status != UCC_OK)) {
-                task->super.status = status;
-                return status;
-            }
-            eargs.task_type = UCC_EE_EXECUTOR_TASK_COPY;
-            eargs.copy.dst  = PTR_OFFSET(args->dst.info.buffer, offset);
-            eargs.copy.src  = args->src.info.buffer;
-            eargs.copy.len  = args->src.info.count *
-                              ucc_dt_size(args->src.info.datatype);
-            status = ucc_ee_executor_task_post(exec, &eargs,
-                                               &task->allgather_kn.etask);
-            if (ucc_unlikely(status != UCC_OK)) {
-                task->super.status = status;
-                return status;
-            }
-        }
-        if (!UCC_IS_INPLACE(*args) && !USE_CUDA){
-            status = new_ucp_tl_self_copy_nb(PTR_OFFSET(args->dst.info.buffer, offset), args->src.info.buffer,
-                    args->src.info.count * ucc_dt_size(args->src.info.datatype), args->dst.info.mem_type, args->src.info.mem_type,
-                    rank, team, task);
-            if (ucc_unlikely(UCC_OK != status)) {
-                printf("error knomial line 231\n");
-                return status;
+        if (!UCC_IS_INPLACE(*args)) {
+            if (USE_CUDA) {
+                status = ucc_coll_task_get_executor(&task->super, &exec);
+                if (ucc_unlikely(status != UCC_OK)) {
+                    task->super.status = status;
+                    return status;
+                }
+                eargs.task_type = UCC_EE_EXECUTOR_TASK_COPY;
+                eargs.copy.dst  = PTR_OFFSET(args->dst.info.buffer, offset);
+                eargs.copy.src  = args->src.info.buffer;
+                eargs.copy.len  = args->src.info.count *
+                                  ucc_dt_size(args->src.info.datatype);
+                status = ucc_ee_executor_task_post(exec, &eargs,
+                                                   &task->allgather_kn.etask);
+                if (ucc_unlikely(status != UCC_OK)) {
+                    task->super.status = status;
+                    return status;
+                }
+            } else {
+                /* Loopback */
+                UCPCHECK_GOTO(ucc_tl_ucp_send_nb(args->src.info.buffer, args->src.info.count * ucc_dt_size(args->src.info.datatype),
+                                                 args->src.info.mem_type, rank, team, task), task, enqueue);
+                UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(PTR_OFFSET(args->dst.info.buffer, offset), args->src.info.count * ucc_dt_size(args->src.info.datatype),
+                                                 args->dst.info.mem_type, rank, team, task), task, enqueue);
             }
         }
-
-
-
-
-
-
+
     } else {
         ucc_kn_agx_pattern_init(size, rank, radix, args->dst.info.count,
                                 &task->allgather_kn.p);
@@ -249,8 +254,8 @@ ucc_status_t ucc_tl_ucp_allgather_knomial_start(ucc_coll_task_t *coll_task)
                                  ucc_knomial_pattern_loop_rank(p, rank),
                                  p->radix, 0);
     }
+enqueue:
     task->allgather_kn.sbuf = PTR_OFFSET(args->dst.info.buffer, offset);
-
     return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super);
 }
@@ -284,7 +289,6 @@ ucc_status_t ucc_tl_ucp_allgather_knomial_init(ucc_base_coll_args_t *coll_args,
     ucc_tl_ucp_team_t *tl_team = ucc_derived_of(team, ucc_tl_ucp_team_t);
     ucc_rank_t         size    = UCC_TL_TEAM_SIZE(tl_team);
     ucc_kn_radix_t     radix;
-
     radix = ucc_min(UCC_TL_UCP_TEAM_LIB(tl_team)->cfg.allgather_kn_radix, size);
     return ucc_tl_ucp_allgather_knomial_init_r(coll_args, team, task_h, radix);
 }
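The checks added at the top of the knomial progress function gate entry into the exchange phases on completion of whichever initial copy was posted in ucc_tl_ucp_allgather_knomial_start(): the executor copy task when USE_CUDA is set, or the loopback send/recv pair otherwise (ucc_tl_ucp_test() is assumed here to return UCC_INPROGRESS while the task still has posted but uncompleted sends/receives). Read together, the two ifs behave like this sketch:

    /* Equivalent reading of the new gating in
     * ucc_tl_ucp_allgather_knomial_progress() (sketch only). */
    if (USE_CUDA) {
        /* Wait for the executor copy task posted in ..._start(). */
        EXEC_TASK_TEST(UCC_KN_PHASE_INIT, "failed during ee task test",
                       task->allgather_kn.etask);
    } else if (UCC_INPROGRESS == ucc_tl_ucp_test(task)) {
        /* Loopback send/recv posted in ..._start() has not finished yet. */
        return;
    }
    task->allgather_kn.etask = NULL;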
diff --git a/src/components/tl/ucp/allgather/allgather_neighbor.c b/src/components/tl/ucp/allgather/allgather_neighbor.c
index 846b6d250f..8a2a4636f0 100644
--- a/src/components/tl/ucp/allgather/allgather_neighbor.c
+++ b/src/components/tl/ucp/allgather/allgather_neighbor.c
@@ -145,27 +145,27 @@ ucc_status_t ucc_tl_ucp_allgather_neighbor_start(ucc_coll_task_t *coll_task)
     ucc_rank_t         neighbor;
     void              *tmprecv, *tmpsend;
+
     UCC_TL_UCP_PROFILE_REQUEST_EVENT(coll_task, "ucp_allgather_neighbor_start", 0);
     ucc_tl_ucp_task_reset(task, UCC_INPROGRESS);
     uint32_t USE_CUDA = UCC_TL_UCP_TEAM_LIB(team)->cfg.allgather_use_cuda;
-
+
     if (!UCC_IS_INPLACE(TASK_ARGS(task))) {
-        /*
-        status = ucc_mc_memcpy(PTR_OFFSET(rbuf, data_size * trank), sbuf,
+        if (USE_CUDA) {
+            status = ucc_mc_memcpy(PTR_OFFSET(rbuf, data_size * trank), sbuf,
                            data_size, rmem, smem);
-        if (ucc_unlikely(UCC_OK != status)) {
-            return status;
-        }
-        */
-        status = NEW_MEMCPY(USE_CUDA, PTR_OFFSET(rbuf, data_size * trank), sbuf, data_size, rmem, smem, trank, team, task);
-        if (ucc_unlikely(UCC_OK != status)) {
-            printf("error neighbor line 162\n");
-            return status;
+            if (ucc_unlikely(UCC_OK != status)) {
+                return status;
+            }
+        } else {
+            /* Loopback */
+            UCPCHECK_GOTO(ucc_tl_ucp_send_nb(sbuf, data_size, smem, trank, team, task), task, tmp);
+            UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(PTR_OFFSET(rbuf, data_size * trank), data_size, rmem, trank, team, task), task, tmp);
         }
     }
-
+tmp:
     if (trank % 2) {
         neighbor = (trank - 1 + tsize) % tsize;
     } else {
diff --git a/src/components/tl/ucp/allgather/allgather_ring.c b/src/components/tl/ucp/allgather/allgather_ring.c
index 08585bb20a..97ac6d74d0 100644
--- a/src/components/tl/ucp/allgather/allgather_ring.c
+++ b/src/components/tl/ucp/allgather/allgather_ring.c
@@ -96,22 +96,20 @@ ucc_status_t ucc_tl_ucp_allgather_ring_start(ucc_coll_task_t *coll_task)
     uint32_t USE_CUDA = UCC_TL_UCP_TEAM_LIB(team)->cfg.allgather_use_cuda;
     if (!UCC_IS_INPLACE(TASK_ARGS(task))) {
-        block = task->allgather_ring.get_send_block(&task->subset, trank, tsize,
-                                                    0);
-        /*
-        status = ucc_mc_memcpy(PTR_OFFSET(rbuf, data_size * block),
+        block = task->allgather_ring.get_send_block(&task->subset, trank, tsize, 0);
+        if (USE_CUDA) {
+            status = ucc_mc_memcpy(PTR_OFFSET(rbuf, data_size * block),
                            sbuf, data_size, rmem, smem);
-        if (ucc_unlikely(UCC_OK != status)) {
-            return status;
-        }
-        */
-        status = NEW_MEMCPY(USE_CUDA, PTR_OFFSET(rbuf, data_size * block), sbuf, data_size, rmem, smem, trank, team, task);
-        if (ucc_unlikely(UCC_OK != status)) {
-            printf("error ring line 110\n");
-            return status;
+            if (ucc_unlikely(UCC_OK != status)) {
+                return status;
+            }
+        } else {
+            /* Loopback */
+            UCPCHECK_GOTO(ucc_tl_ucp_send_nb(sbuf, data_size, smem, trank, team, task), task, enqueue);
+            UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(PTR_OFFSET(rbuf, data_size * block), data_size, rmem, trank, team, task), task, enqueue);
         }
     }
-
+enqueue:
     return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super);
 }
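The loopback branches lean on UCPCHECK_GOTO() for error handling: if posting the send or the receive fails, the status is recorded on the task and control jumps to the label passed in the call (enqueue in bruck, ring, sparbit and knomial; tmp in neighbor) instead of returning the error on the spot. The real macro lives in tl_ucp_sendrecv.h; the sketch below only shows the shape assumed by this usage:

    /* Assumed shape of UCPCHECK_GOTO (illustrative name and body, not the
     * verbatim macro from tl_ucp_sendrecv.h). */
    #define UCPCHECK_GOTO_SKETCH(_cmd, _task, _label)                          \
        do {                                                                   \
            ucc_status_t _st = (_cmd);                                         \
            if (ucc_unlikely(UCC_OK != _st)) {                                 \
                (_task)->super.status = _st;                                   \
                goto _label;                                                   \
            }                                                                  \
        } while (0)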
diff --git a/src/components/tl/ucp/allgather/allgather_sparbit.c b/src/components/tl/ucp/allgather/allgather_sparbit.c
index 7e8486a725..508c839245 100644
--- a/src/components/tl/ucp/allgather/allgather_sparbit.c
+++ b/src/components/tl/ucp/allgather/allgather_sparbit.c
@@ -131,21 +131,19 @@ ucc_status_t ucc_tl_ucp_allgather_sparbit_start(ucc_coll_task_t *coll_task)
     task->allgather_sparbit.data_expected = 1;
     uint32_t USE_CUDA = UCC_TL_UCP_TEAM_LIB(team)->cfg.allgather_use_cuda;
-
     if (!UCC_IS_INPLACE(TASK_ARGS(task))) {
-        /*
-        status = ucc_mc_memcpy(PTR_OFFSET(rbuf, data_size * trank), sbuf,
+        if (USE_CUDA) {
+            status = ucc_mc_memcpy(PTR_OFFSET(rbuf, data_size * trank), sbuf,
                            data_size, rmem, smem);
-        if (ucc_unlikely(UCC_OK != status)) {
-            return status;
-        }
-        */
-        status = NEW_MEMCPY(USE_CUDA, PTR_OFFSET(rbuf, data_size * trank), sbuf, data_size, rmem, smem, trank, team, task);
-        if (ucc_unlikely(UCC_OK != status)) {
-            printf("error bruck line 254\n");
-            return status;
+            if (ucc_unlikely(UCC_OK != status)) {
+                return status;
+            }
+        } else {
+            /* Loopback */
+            UCPCHECK_GOTO(ucc_tl_ucp_send_nb(sbuf, data_size, smem, trank, team, task), task, enqueue);
+            UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(PTR_OFFSET(rbuf, data_size * trank), data_size, rmem, trank, team, task), task, enqueue);
         }
     }
-
+enqueue:
     return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super);
 }
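For orientation, in the non-in-place case the initial copy always targets the calling rank's own slot of the receive buffer, i.e. PTR_OFFSET(rbuf, data_size * trank) (or data_size * block for the ring). A tiny, self-contained illustration with made-up numbers (4 ranks, 1 MiB contributed per rank):

    /* Illustrative values only: where each rank's initial block lands in rbuf. */
    #include <stddef.h>
    #include <stdio.h>

    int main(void)
    {
        size_t data_size = 1 << 20;                      /* bytes per rank   */
        for (unsigned trank = 0; trank < 4; trank++) {
            printf("rank %u -> rbuf offset %zu bytes\n", /* rank 2 -> 2 MiB  */
                   trank, data_size * trank);
        }
        return 0;
    }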