Skip to content
Permalink

Comparing changes

This is a direct comparison between two commits made in this repository or its related repositories. View the default comparison for this range or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks. Learn more about diff comparisons here.
base repository: celerity/celerity-runtime
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: d5d2e905e5c0f230ab6f6596acab036e7ff38143
Choose a base ref
..
head repository: celerity/celerity-runtime
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: b88b60fe9033714474fa3a5e58fd369775083973
Choose a head ref
Showing with 133 additions and 130 deletions.
  1. +18 −41 include/divergence_block_chain.h
  2. +9 −70 src/divergence_block_chain.cc
  3. +1 −5 src/runtime.cc
  4. +97 −0 test/divergence_check_test_utils.h
  5. +7 −6 test/divergence_check_tests.cc
  6. +1 −0 test/system/distr_tests.cc
  7. +0 −8 test/test_utils.h
59 changes: 18 additions & 41 deletions include/divergence_block_chain.h
Original file line number Diff line number Diff line change
@@ -6,7 +6,10 @@
#include <vector>

namespace celerity::detail {
// in c++23 replace this with mdspan
/** @brief This class is a wrapper around a 1D vector that allows us to access it as a 2D array.
*
* It is used to send the task hashes to other nodes using MPI while keeping the code simple and readable.
*/
template <typename T>
struct mpi_2d_send_wrapper {
public:
@@ -24,7 +27,10 @@ struct mpi_2d_send_wrapper {
const size_t m_width;
};

// Probably replace this in c++20 with span
/** @brief This class gives a view into a const vector.
*
* It is used to give us the currently unhashed task records while keeping track of the offset and width.
*/
template <typename T>
struct window {
public:
@@ -56,6 +62,12 @@ using task_hash = size_t;
using task_hash_data = mpi_2d_send_wrapper<task_hash>;
using divergence_map = std::unordered_map<task_hash, std::vector<node_id>>;

/** @brief Base class for the divergence check.
*
* It collects the task hashes from all nodes and compares them; any difference between nodes is a divergence.
* When a divergence is found, the task record of the diverging task is printed and the program is terminated.
* It also checks for deadlocks and prints a warning if one is detected.
*/
class abstract_block_chain {
friend struct abstract_block_chain_testspy;

@@ -114,49 +126,13 @@ class abstract_block_chain {
MPI_Comm m_comm;
};

class single_node_test_divergence_block_chain : public abstract_block_chain {
public:
single_node_test_divergence_block_chain(size_t num_nodes, node_id local_nid, const std::vector<task_record>& task_recorder, MPI_Comm comm,
const std::vector<std::reference_wrapper<const std::vector<task_record>>>& other_task_records)
: abstract_block_chain(num_nodes, local_nid, task_recorder, comm), m_other_hashes(other_task_records.size()) {
for(auto& tsk_rcd : other_task_records) {
m_other_task_records.push_back(window<task_record>(tsk_rcd));
}
}

private:
void run() override {}

void divergence_out(const divergence_map& check_map, const int task_num) override;
void allgather_sizes() override;
void allgather_hashes(const int max_size, task_hash_data& data) override;

void dedub_print_task_record(const divergence_map& check_map, const int task_num) const override;

std::vector<std::vector<task_hash>> m_other_hashes;
std::vector<window<task_record>> m_other_task_records;

int m_injected_delete_size = 0;
};

/**
 * @brief Block chain variant whose `run()` does nothing and which supplies its own
 *        `divergence_out` (defined out of line).
 */
class distributed_test_divergence_block_chain : public abstract_block_chain {
  public:
    /// All arguments are forwarded unchanged to abstract_block_chain.
    distributed_test_divergence_block_chain(size_t num_nodes, node_id local_nid, const std::vector<task_record>& task_record, MPI_Comm comm)
        : abstract_block_chain(num_nodes, local_nid, task_record, comm) {}

  private:
    void divergence_out(const divergence_map& check_map, const int task_num) override;

    /// Intentionally empty.
    void run() override {}
};

class divergence_block_chain : public abstract_block_chain {
public:
void start();
void stop() override;

divergence_block_chain(size_t num_nodes, node_id local_nid, const std::vector<task_record>& task_record, MPI_Comm comm)
: abstract_block_chain(num_nodes, local_nid, task_record, comm) {
divergence_block_chain(size_t num_nodes, node_id local_nid, const std::vector<task_record>& task_record, MPI_Comm comm, bool test_mode = false)
: abstract_block_chain(num_nodes, local_nid, task_record, comm), m_test_mode(test_mode) {
divergence_block_chain::start();
}

@@ -175,5 +151,6 @@ class divergence_block_chain : public abstract_block_chain {

private:
std::thread m_thread;
bool m_test_mode = false;
};
} // namespace celerity::detail
}; // namespace celerity::detail
79 changes: 9 additions & 70 deletions src/divergence_block_chain.cc
Original file line number Diff line number Diff line change
@@ -151,71 +151,6 @@ bool abstract_block_chain::check_for_divergence() {
return false;
}

/**
 * Reports a detected divergence.
 *
 * The per-node divergence summary is printed only when the local node id is 0;
 * the task records are printed regardless of the node id.
 */
void single_node_test_divergence_block_chain::divergence_out(const divergence_map& check_map, const int task_num) {
    if(m_local_nid == 0) {
        print_node_divergences(check_map, task_num);
    }
    dedub_print_task_record(check_map, task_num);
}

/**
 * Recomputes the hash lists of the other nodes and publishes their lengths in `m_sizes`.
 *
 * For every other node this
 *  1. drops the first `m_injected_delete_size` hashes and slides the record window by the same amount,
 *  2. appends one chained hash per record in the window (each hash is seeded with the previous one, or 0),
 *  3. stores the resulting hash count in `m_sizes[node + 1]` (slot 0 is not written here).
 * Finally `m_injected_delete_size` is set to the smallest entry of `m_sizes`.
 */
void single_node_test_divergence_block_chain::allgather_sizes() {
    for(size_t node = 0; node < m_num_nodes - 1; ++node) {
        auto& hashes = m_other_hashes[node];
        auto& records = m_other_task_records[node];

        // Discard what was injected as consumed by the previous call.
        hashes.erase(hashes.begin(), hashes.begin() + m_injected_delete_size);
        records.slide(m_injected_delete_size);

        // Extend the hash chain with every record currently in the window.
        for(size_t rec = 0; rec < records.size(); ++rec) {
            std::size_t chained = 0;
            if(!hashes.empty()) { chained = hashes.back(); }
            celerity::detail::utils::hash_combine(chained, std::hash<task_record>{}(records[rec]));
            hashes.push_back(chained);
        }

        // Other node `node` occupies slot `node + 1` of m_sizes.
        m_sizes[node + 1] = static_cast<int>(hashes.size());
    }

    m_injected_delete_size = *std::min_element(m_sizes.cbegin(), m_sizes.cend());
}

/**
 * Fills the gather buffer with the hashes of all nodes.
 *
 * The buffer is written as `m_num_nodes` consecutive rows of `max_size` entries:
 * row 0 receives the local hashes (`m_hashes`), row `i + 1` receives `m_other_hashes[i]`.
 * A row whose hash list is shorter than `max_size` is padded with 0.
 *
 * The local row is bounds-checked like the remote rows, so a local hash list shorter than
 * `max_size` is no longer read past its end.
 * NOTE(review): assumes `m_hashes` exposes `size()` like `m_other_hashes[i]` does -- confirm
 * against abstract_block_chain.
 *
 * @param max_size number of entries per row; nothing is written if it is not positive
 * @param data     destination buffer, accessed through `data.data()`
 */
void single_node_test_divergence_block_chain::allgather_hashes(const int max_size, task_hash_data& data) {
    auto data_data = data.data();

    // Copies `hashes` into row `row`, writing 0 where the list has no entry.
    const auto fill_row = [&](const size_t row, const auto& hashes) {
        for(int col = 0; col < max_size; ++col) {
            const auto index = row * max_size + col;
            if(static_cast<size_t>(col) < hashes.size()) {
                data_data[index] = hashes[col];
            } else {
                data_data[index] = 0;
            }
        }
    };

    fill_row(0, m_hashes);
    for(size_t i = 0; i < m_num_nodes - 1; ++i) {
        fill_row(i + 1, m_other_hashes[i]);
    }
}

/**
 * Prints one task record per distinct hash in `check_map`.
 *
 * For each hash the record is taken from the first node listed for it: from the local
 * record window if that node is the local one, otherwise from the window of the
 * corresponding other node (stored at index `node id - 1`).
 */
void single_node_test_divergence_block_chain::dedub_print_task_record(const divergence_map& check_map, const int task_num) const {
    for(auto& [hash, nodes] : check_map) {
        const auto first_nid = nodes[0];

        if(first_nid == m_local_nid) {
            print_task_record(check_map, m_task_recorder_window[task_num], hash);
            continue;
        }

        print_task_record(check_map, m_other_task_records[first_nid - 1][task_num], hash);
    }
}

/**
 * Reports a detected divergence.
 *
 * The per-node summary is printed only when the local node id is 0. Each node then waits
 * `local node id * 100` milliseconds before printing its task records, so the output of
 * the nodes is staggered without taking a lock.
 */
void distributed_test_divergence_block_chain::divergence_out(const divergence_map& check_map, const int task_num) {
    if(m_local_nid == 0) {
        print_node_divergences(check_map, task_num);
    }

    // Stagger the output by node id instead of synchronising through a lock.
    const std::chrono::milliseconds stagger(m_local_nid * 100);
    std::this_thread::sleep_for(stagger);

    dedub_print_task_record(check_map, task_num);
}


void divergence_block_chain::divergence_out(const divergence_map& check_map, const int task_num) {
if(m_local_nid == 0) { print_node_divergences(check_map, task_num); }

@@ -224,14 +159,18 @@ void divergence_block_chain::divergence_out(const divergence_map& check_map, con

dedub_print_task_record(check_map, task_num);

MPI_Barrier(m_comm);
if(!m_test_mode) {
MPI_Barrier(m_comm);

throw std::runtime_error("Divergence in task graph detected");
throw std::runtime_error("Divergence in task graph detected");
}
}

void divergence_block_chain::start() {
stop();
m_thread = std::thread(&divergence_block_chain::run, this);
if(!m_test_mode) {
stop();
m_thread = std::thread(&divergence_block_chain::run, this);
}
m_is_running = true;
}

@@ -245,6 +184,6 @@ void divergence_block_chain::run() {

void divergence_block_chain::stop() {
m_is_running = false;
if(m_thread.joinable()) { m_thread.join(); }
if(!m_test_mode && m_thread.joinable()) { m_thread.join(); }
}
} // namespace celerity::detail
6 changes: 1 addition & 5 deletions src/runtime.cc
Original file line number Diff line number Diff line change
@@ -162,11 +162,7 @@ namespace detail {
if(m_cfg->is_recording()) {
MPI_Comm comm = nullptr;
MPI_Comm_dup(MPI_COMM_WORLD, &comm);
if(m_test_mode) {
m_divergence_check = std::make_unique<distributed_test_divergence_block_chain>(m_num_nodes, m_local_nid, m_task_recorder->get_tasks(), comm);
} else {
m_divergence_check = std::make_unique<divergence_block_chain>(m_num_nodes, m_local_nid, m_task_recorder->get_tasks(), comm);
}
m_divergence_check = std::make_unique<divergence_block_chain>(m_num_nodes, m_local_nid, m_task_recorder->get_tasks(), comm, m_test_mode);
}

CELERITY_INFO("Celerity runtime version {} running on {}. PID = {}, build type = {}, {}", get_version_string(), get_sycl_version(), get_pid(),
97 changes: 97 additions & 0 deletions test/divergence_check_test_utils.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
#pragma once

#include "divergence_block_chain.h"

using namespace celerity;
using namespace celerity::detail;

namespace celerity {
namespace detail {
/**
 * Test-only accessor for abstract_block_chain: forwards to `check_for_divergence()`
 * and allows overwriting `m_last_cleared`.
 */
struct abstract_block_chain_testspy {
    /// Overwrites the `m_last_cleared` time stamp of `div_test` with `time`.
    static void set_last_cleared(abstract_block_chain& div_test, std::chrono::time_point<std::chrono::steady_clock> time) {
        div_test.m_last_cleared = time;
    }

    /// Runs `div_test.check_for_divergence()` and returns its result.
    static bool call_check_for_divergence(abstract_block_chain& div_test) {
        const bool result = div_test.check_for_divergence();
        return result;
    }
};
} // namespace detail
namespace test_utils {
class single_node_test_divergence_block_chain : public celerity::detail::abstract_block_chain {
public:
single_node_test_divergence_block_chain(size_t num_nodes, node_id local_nid, const std::vector<task_record>& task_recorder, MPI_Comm comm,
const std::vector<std::reference_wrapper<const std::vector<task_record>>>& other_task_records)
: abstract_block_chain(num_nodes, local_nid, task_recorder, comm), m_other_hashes(other_task_records.size()) {
for(auto& tsk_rcd : other_task_records) {
m_other_task_records.push_back(window<task_record>(tsk_rcd));
}
}

private:
void run() override {}

void divergence_out(const divergence_map& check_map, const int task_num) override {
if(m_local_nid == 0) { print_node_divergences(check_map, task_num); }

dedub_print_task_record(check_map, task_num);
}

void allgather_sizes() override {
for(size_t i = 0; i < m_num_nodes - 1; ++i) {
auto& other_hashes = m_other_hashes[i];
other_hashes.erase(other_hashes.begin(), other_hashes.begin() + m_injected_delete_size);
m_other_task_records[i].slide(m_injected_delete_size);
}

for(size_t i = 0; i < m_num_nodes - 1; ++i) {
for(size_t j = 0; j < m_other_task_records[i].size(); ++j) {
std::size_t seed = m_other_hashes[i].empty() ? 0 : m_other_hashes[i].back();
celerity::detail::utils::hash_combine(seed, std::hash<task_record>{}(m_other_task_records[i][j]));
m_other_hashes[i].push_back(seed);
}
}

for(size_t i = 1; i < m_num_nodes; ++i) {
m_sizes[i] = static_cast<int>(m_other_hashes[i - 1].size());
}

m_injected_delete_size = *std::min_element(m_sizes.cbegin(), m_sizes.cend());
}

void allgather_hashes(const int max_size, task_hash_data& data) override {
auto data_data = data.data();
for(size_t i = 0; i < m_num_nodes - 1; ++i) {
for(int j = 0; j < max_size; ++j) {
auto index = (i + 1) * max_size + j;
if(m_other_hashes[i].size() > static_cast<size_t>(j)) {
data_data[index] = m_other_hashes[i][j];
} else {
data_data[index] = 0;
}
}
}

for(int j = 0; j < max_size; ++j) {
data_data[j] = m_hashes[j];
}
}


void dedub_print_task_record(const divergence_map& check_map, const int task_num) const override {
for(auto& [hash, nodes] : check_map) {
if(nodes[0] == m_local_nid) {
print_task_record(check_map, m_task_recorder_window[task_num], hash);
} else {
print_task_record(check_map, m_other_task_records[nodes[0] - 1][task_num], hash);
}
}
}

std::vector<std::vector<task_hash>> m_other_hashes;
std::vector<window<task_record>> m_other_task_records;

int m_injected_delete_size = 0;
};


} // namespace test_utils
} // namespace celerity
13 changes: 7 additions & 6 deletions test/divergence_check_tests.cc
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@

#include <celerity.h>

#include "divergence_check_test_utils.h"
#include "log_test_utils.h"
#include "test_utils.h"

@@ -13,7 +14,7 @@ using namespace celerity::detail;
using namespace celerity::test_utils;
using celerity::access::fixed;

TEST_CASE("Test diverged task execution on device tasks", "[divergence]") {
TEST_CASE("test diverged task execution on device tasks", "[divergence]") {
using namespace cl::sycl::access;

auto tt = test_utils::task_test_context{};
@@ -36,7 +37,7 @@ TEST_CASE("Test diverged task execution on device tasks", "[divergence]") {
CHECK_THAT(log_capture.get_log(), Catch::Matchers::ContainsSubstring("Divergence detected"));
}

TEST_CASE("Test divergence free task execution on device", "[divergence]") {
TEST_CASE("test divergence free task execution on device", "[divergence]") {
using namespace cl::sycl::access;

auto tt = test_utils::task_test_context{};
@@ -71,7 +72,7 @@ TEST_CASE("Test divergence free task execution on device", "[divergence]") {
CHECK(abstract_block_chain_testspy::call_check_for_divergence(div_test));
}

TEST_CASE("Test diverged task execution on host task", "[divergence]") {
TEST_CASE("test diverged task execution on host task", "[divergence]") {
using namespace cl::sycl::access;

auto tt = test_utils::task_test_context{};
@@ -94,7 +95,7 @@ TEST_CASE("Test diverged task execution on host task", "[divergence]") {
CHECK_THAT(log_capture.get_log(), Catch::Matchers::ContainsSubstring("Divergence detected"));
}

TEST_CASE("Test divergence free task execution on host task", "[divergence]") {
TEST_CASE("test divergence free task execution on host task", "[divergence]") {
using namespace cl::sycl::access;

auto tt = test_utils::task_test_context{};
@@ -123,7 +124,7 @@ TEST_CASE("Test divergence free task execution on host task", "[divergence]") {
CHECK(abstract_block_chain_testspy::call_check_for_divergence(div_test));
}

TEST_CASE("Test divergence warning for tasks that are stale longer than 10 seconds", "[divergence]") {
TEST_CASE("test divergence warning for tasks that are stale longer than 10 seconds", "[divergence]") {
using namespace cl::sycl::access;

auto tt = test_utils::task_test_context{};
@@ -156,7 +157,7 @@ size_t get_hash(const std::vector<task_record>& tasks, size_t start, size_t end)
return seed;
}

TEST_CASE("Test correct output of 3 different divergent tasks", "[divergence]") {
TEST_CASE("test correct output of 3 different divergent tasks", "[divergence]") {
using namespace cl::sycl::access;

auto tt = test_utils::task_test_context{};
1 change: 1 addition & 0 deletions test/system/distr_tests.cc
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@

#include <celerity.h>

#include "../divergence_check_test_utils.h"
#include "../log_test_utils.h"

namespace celerity {
8 changes: 0 additions & 8 deletions test/test_utils.h
Original file line number Diff line number Diff line change
@@ -111,14 +111,6 @@ namespace detail {
}
return false;
}

/**
 * Test-only accessor for abstract_block_chain: forwards to `check_for_divergence()`
 * and allows overwriting `m_last_cleared`.
 */
struct abstract_block_chain_testspy {
    /// Overwrites the `m_last_cleared` time stamp of `div_test` with `time`.
    static void set_last_cleared(abstract_block_chain& div_test, std::chrono::time_point<std::chrono::steady_clock> time) {
        div_test.m_last_cleared = time;
    }

    /// Runs `div_test.check_for_divergence()` and returns its result.
    static bool call_check_for_divergence(abstract_block_chain& div_test) {
        const bool result = div_test.check_for_divergence();
        return result;
    }
};
} // namespace detail

namespace test_utils {