Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions gloo/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ list(APPEND GLOO_SRCS
"${CMAKE_CURRENT_SOURCE_DIR}/allgatherv.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/allreduce.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/allreduce_local.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/alltoall.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/alltoallv.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/barrier.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/broadcast.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/context.cc"
Expand All @@ -32,6 +34,8 @@ list(APPEND GLOO_HDRS
"${CMAKE_CURRENT_SOURCE_DIR}/allreduce_local.h"
"${CMAKE_CURRENT_SOURCE_DIR}/allreduce_ring.h"
"${CMAKE_CURRENT_SOURCE_DIR}/allreduce_ring_chunked.h"
"${CMAKE_CURRENT_SOURCE_DIR}/alltoall.h"
"${CMAKE_CURRENT_SOURCE_DIR}/alltoallv.h"
"${CMAKE_CURRENT_SOURCE_DIR}/barrier.h"
"${CMAKE_CURRENT_SOURCE_DIR}/barrier_all_to_all.h"
"${CMAKE_CURRENT_SOURCE_DIR}/barrier_all_to_one.h"
Expand Down
56 changes: 56 additions & 0 deletions gloo/alltoall.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/**
* Copyright (c) 2018-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/

#include "gloo/alltoall.h"

#include <cstring>

#include "gloo/common/logging.h"
#include "gloo/types.h"

namespace gloo {

// Exchanges equally sized chunks between all ranks of the context.
//
// The input buffer is treated as `context->size` consecutive chunks of
// `in->size / context->size` bytes each. The chunk at index r is sent to
// rank r, and the chunk received from rank r is stored at index r of the
// output buffer. The chunk addressed to the calling rank itself is copied
// with memcpy instead of going through the transport.
void alltoall(AlltoallOptions& opts) {
  const auto& context = opts.context;
  transport::UnboundBuffer* in = opts.in.get();
  transport::UnboundBuffer* out = opts.out.get();
  const auto slot = Slot::build(kAlltoallSlotPrefix, opts.tag);

  // Sanity checks: both buffers must be set, have the same size, and that
  // size must split evenly across the ranks.
  GLOO_ENFORCE(opts.elementSize > 0);
  GLOO_ENFORCE(in != nullptr);
  GLOO_ENFORCE(out != nullptr);
  GLOO_ENFORCE(in->size % context->size == 0);
  GLOO_ENFORCE(in->size == out->size);

  const size_t chunkSize = in->size / context->size;
  const int myRank = context->rank;
  const int worldSize = context->size;

  // Byte offset of the chunk that belongs to `rank`.
  const auto chunkOffset = [chunkSize](int rank) -> size_t {
    return rank * chunkSize;
  };

  // Local copy: our own chunk never touches the transport.
  const size_t selfOffset = chunkOffset(myRank);
  std::memcpy(
      static_cast<char*>(out->ptr) + selfOffset,
      static_cast<char*>(in->ptr) + selfOffset,
      chunkSize);

  // Remote copy: at distance `step` we send to the rank `step` ahead of us
  // and receive from the rank `step` behind us.
  for (int step = 1; step < worldSize; ++step) {
    const int sendRank = (myRank + step) % worldSize;
    const int recvRank = (myRank + worldSize - step) % worldSize;
    in->send(sendRank, slot, chunkOffset(sendRank), chunkSize);
    out->recv(recvRank, slot, chunkOffset(recvRank), chunkSize);
  }

  // One send and one receive were posted per remote rank; wait for each.
  for (int pending = 1; pending < worldSize; ++pending) {
    in->waitSend(opts.timeout);
    out->waitRecv(opts.timeout);
  }
}

} // namespace gloo
75 changes: 75 additions & 0 deletions gloo/alltoall.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/**
* Copyright (c) 2018-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/

#pragma once

#include "gloo/common/logging.h"
#include "gloo/context.h"
#include "gloo/transport/unbound_buffer.h"

namespace gloo {

// Options for the alltoall collective.
//
// Bundles the context, the input and output buffers, the element size, the
// tag and the timeout. The members are protected; alltoall() reads them
// through the friend declaration at the bottom of the class.
class AlltoallOptions {
 public:
  // Stores the context and starts out with the context's timeout.
  explicit AlltoallOptions(const std::shared_ptr<Context>& context)
      : context(context), timeout(context->getTimeout()) {}

  // Uses an existing unbound buffer as input. T only determines the
  // recorded element size.
  template <typename T>
  void setInput(std::unique_ptr<transport::UnboundBuffer> buf) {
    this->elementSize = sizeof(T);
    this->in = std::move(buf);
  }

  // Creates an unbound buffer over `elements` items of type T at `ptr` and
  // uses it as input.
  template <typename T>
  void setInput(T* ptr, size_t elements) {
    this->elementSize = sizeof(T);
    this->in =
        this->context->createUnboundBuffer(ptr, elements * sizeof(T));
  }

  // Uses an existing unbound buffer as output. T only determines the
  // recorded element size.
  template <typename T>
  void setOutput(std::unique_ptr<transport::UnboundBuffer> buf) {
    this->elementSize = sizeof(T);
    this->out = std::move(buf);
  }

  // Creates an unbound buffer over `elements` items of type T at `ptr` and
  // uses it as output.
  template <typename T>
  void setOutput(T* ptr, size_t elements) {
    this->elementSize = sizeof(T);
    this->out =
        this->context->createUnboundBuffer(ptr, elements * sizeof(T));
  }

  // Sets the tag for this operation.
  void setTag(uint32_t tag) {
    this->tag = tag;
  }

  // Overrides the timeout; the value must be strictly positive.
  void setTimeout(std::chrono::milliseconds timeout) {
    GLOO_ENFORCE(timeout.count() > 0);
    this->timeout = timeout;
  }

 protected:
  // Context the operation runs in; also used to create unbound buffers.
  std::shared_ptr<Context> context;

  // Input and output buffers; null until the matching setter is called.
  std::unique_ptr<transport::UnboundBuffer> in;
  std::unique_ptr<transport::UnboundBuffer> out;

  // Number of bytes per element. Zero until an input or output is set.
  size_t elementSize = 0;

  // Tag for this operation.
  // Must be unique across operations executing in parallel.
  uint32_t tag = 0;

  // End-to-end timeout for this operation.
  std::chrono::milliseconds timeout;

  friend void alltoall(AlltoallOptions&);
};

// Runs the alltoall collective configured by `opts`.
void alltoall(AlltoallOptions& opts);

} // namespace gloo
163 changes: 163 additions & 0 deletions gloo/alltoallv.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/**
* Copyright (c) 2018-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/

#include "gloo/alltoallv.h"

#include <cstring>
#include <numeric>

#include "gloo/common/logging.h"
#include "gloo/types.h"

namespace gloo {

// Converts per-rank element counts into per-rank byte offsets and lengths.
//
// For entry i of `elementsPerRank`, `lengths[i]` is the count multiplied by
// `elementSize`, and `offsets[i]` is the sum of all preceding lengths, i.e.
// the chunks are laid out back to back starting at byte 0.
//
// `offsets` and `lengths` are overwritten rather than appended to. The
// callers pass long-lived member vectors, so appending would keep the layout
// of an earlier call at indices [0, ranks) and a repeated setInput/setOutput
// would silently use stale offsets.
//
// Counts are converted to size_t as-is; negative values are not rejected
// here.
static void splitOffsetsAndLengths(
    const std::vector<int64_t>& elementsPerRank,
    size_t elementSize,
    std::vector<size_t>& offsets,
    std::vector<size_t>& lengths) {
  // clear() keeps the capacity, so a reserve() done by the caller is not
  // wasted.
  offsets.clear();
  lengths.clear();
  offsets.reserve(elementsPerRank.size());
  lengths.reserve(elementsPerRank.size());

  size_t offset = 0;
  for (const size_t elements : elementsPerRank) {
    const size_t length = elements * elementSize;
    offsets.push_back(offset);
    lengths.push_back(length);
    offset += length;
  }
}

// Records the element size on first use and requires every later call to
// pass the same value.
void AlltoallvOptions::setElementSize(size_t elementSize) {
  if (this->elementSize != 0) {
    // A size was recorded by an earlier call; it has to agree.
    GLOO_ENFORCE_EQ(
        elementSize,
        this->elementSize,
        "Element size does not match existing value. ",
        "Please double check that the input and output types match.");
    return;
  }

  this->elementSize = elementSize;
}

// Uses an existing unbound buffer as input.
//
// `elementsPerRank` holds the number of elements destined for each rank and
// must have one entry per rank of the context. The buffer size must equal
// the total element count multiplied by `elementSize`.
void AlltoallvOptions::setInput(
    std::unique_ptr<transport::UnboundBuffer> buf,
    std::vector<int64_t> elementsPerRank,
    size_t elementSize) {
  this->setElementSize(elementSize);
  GLOO_ENFORCE_EQ(elementsPerRank.size(), context->size);

  // Fill in the per-rank byte offsets and lengths of the input layout.
  const auto rankCount = elementsPerRank.size();
  this->inOffsetPerRank.reserve(rankCount);
  this->inLengthPerRank.reserve(rankCount);
  splitOffsetsAndLengths(
      elementsPerRank,
      elementSize,
      this->inOffsetPerRank,
      this->inLengthPerRank);

  // The caller-provided buffer has to cover exactly the described layout.
  const auto totalElements = std::accumulate(
      elementsPerRank.begin(), elementsPerRank.end(), size_t(0));
  GLOO_ENFORCE_EQ(totalElements * elementSize, buf->size);
  this->in = std::move(buf);
}

// Creates an unbound buffer over `ptr` and uses it as input.
//
// `elementsPerRank` holds the number of elements destined for each rank and
// must have one entry per rank of the context. The created buffer spans the
// total element count multiplied by `elementSize` bytes.
void AlltoallvOptions::setInput(
    void* ptr,
    std::vector<int64_t> elementsPerRank,
    size_t elementSize) {
  this->setElementSize(elementSize);
  GLOO_ENFORCE_EQ(elementsPerRank.size(), context->size);

  // Fill in the per-rank byte offsets and lengths of the input layout.
  const auto rankCount = elementsPerRank.size();
  this->inOffsetPerRank.reserve(rankCount);
  this->inLengthPerRank.reserve(rankCount);
  splitOffsetsAndLengths(
      elementsPerRank,
      elementSize,
      this->inOffsetPerRank,
      this->inLengthPerRank);

  // Size the buffer from the sum of all per-rank counts.
  const auto totalElements = std::accumulate(
      elementsPerRank.begin(), elementsPerRank.end(), size_t(0));
  this->in = context->createUnboundBuffer(ptr, totalElements * elementSize);
}

// Uses an existing unbound buffer as output.
//
// `elementsPerRank` holds the number of elements expected from each rank and
// must have one entry per rank of the context. The buffer size must equal
// the total element count multiplied by `elementSize`.
void AlltoallvOptions::setOutput(
    std::unique_ptr<transport::UnboundBuffer> buf,
    std::vector<int64_t> elementsPerRank,
    size_t elementSize) {
  this->setElementSize(elementSize);
  GLOO_ENFORCE_EQ(elementsPerRank.size(), context->size);

  // Fill in the per-rank byte offsets and lengths of the output layout.
  const auto rankCount = elementsPerRank.size();
  this->outOffsetPerRank.reserve(rankCount);
  this->outLengthPerRank.reserve(rankCount);
  splitOffsetsAndLengths(
      elementsPerRank,
      elementSize,
      this->outOffsetPerRank,
      this->outLengthPerRank);

  // The caller-provided buffer has to cover exactly the described layout.
  const auto totalElements = std::accumulate(
      elementsPerRank.begin(), elementsPerRank.end(), size_t(0));
  GLOO_ENFORCE_EQ(totalElements * elementSize, buf->size);
  this->out = std::move(buf);
}

// Creates an unbound buffer over `ptr` and uses it as output.
//
// `elementsPerRank` holds the number of elements expected from each rank and
// must have one entry per rank of the context. The created buffer spans the
// total element count multiplied by `elementSize` bytes.
void AlltoallvOptions::setOutput(
    void* ptr,
    std::vector<int64_t> elementsPerRank,
    size_t elementSize) {
  this->setElementSize(elementSize);
  GLOO_ENFORCE_EQ(elementsPerRank.size(), context->size);

  // Fill in the per-rank byte offsets and lengths of the output layout.
  const auto rankCount = elementsPerRank.size();
  this->outOffsetPerRank.reserve(rankCount);
  this->outLengthPerRank.reserve(rankCount);
  splitOffsetsAndLengths(
      elementsPerRank,
      elementSize,
      this->outOffsetPerRank,
      this->outLengthPerRank);

  // Size the buffer from the sum of all per-rank counts.
  const auto totalElements = std::accumulate(
      elementsPerRank.begin(), elementsPerRank.end(), size_t(0));
  this->out = context->createUnboundBuffer(ptr, totalElements * elementSize);
}

// Exchanges variably sized chunks between all ranks of the context.
//
// The chunk sent to rank r is the byte range described by
// inOffsetPerRank[r] / inLengthPerRank[r] of the input buffer; the chunk
// received from rank r lands in the range described by
// outOffsetPerRank[r] / outLengthPerRank[r] of the output buffer. The chunk
// addressed to the calling rank itself is copied with memcpy, which requires
// the local input and output lengths to match.
void alltoallv(AlltoallvOptions& opts) {
  const auto& context = opts.context;
  transport::UnboundBuffer* in = opts.in.get();
  transport::UnboundBuffer* out = opts.out.get();
  const auto& inOffsetPerRank = opts.inOffsetPerRank;
  const auto& inLengthPerRank = opts.inLengthPerRank;
  const auto& outOffsetPerRank = opts.outOffsetPerRank;
  const auto& outLengthPerRank = opts.outLengthPerRank;
  const auto slot = Slot::build(kAlltoallSlotPrefix, opts.tag);

  // Sanity checks.
  GLOO_ENFORCE(opts.elementSize > 0);
  GLOO_ENFORCE(in != nullptr);
  GLOO_ENFORCE(out != nullptr);

  const int myRank = context->rank;
  const int worldSize = context->size;

  // Local copy: our own chunk never touches the transport.
  GLOO_ENFORCE(inLengthPerRank[myRank] == outLengthPerRank[myRank]);
  const size_t localBytes = inLengthPerRank[myRank];
  const char* localSrc =
      static_cast<char*>(in->ptr) + inOffsetPerRank[myRank];
  char* localDst = static_cast<char*>(out->ptr) + outOffsetPerRank[myRank];
  std::memcpy(localDst, localSrc, localBytes);

  // Remote copy: at distance `step` we send to the rank `step` ahead of us
  // and receive from the rank `step` behind us.
  for (int step = 1; step < worldSize; ++step) {
    const int sendRank = (myRank + step) % worldSize;
    const int recvRank = (myRank + worldSize - step) % worldSize;
    in->send(
        sendRank, slot, inOffsetPerRank[sendRank], inLengthPerRank[sendRank]);
    out->recv(
        recvRank, slot, outOffsetPerRank[recvRank], outLengthPerRank[recvRank]);
  }

  // One send and one receive were posted per remote rank; wait for each.
  for (int pending = 1; pending < worldSize; ++pending) {
    in->waitSend(opts.timeout);
    out->waitRecv(opts.timeout);
  }
}

} // namespace gloo
Loading