Skip to content

Commit

Permalink
Cleanup some pthread code. (#235)
Browse files Browse the repository at this point in the history
Signed-off-by: Samuel K. Gutierrez <[email protected]>
  • Loading branch information
samuelkgutierrez authored Jul 19, 2024
1 parent e848c8b commit 669188e
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 89 deletions.
16 changes: 3 additions & 13 deletions src/qvi-group-pthread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,9 @@ qvi_group_pthread_s::self(
) {
constexpr int group_size = 1;
qvi_group_pthread_t *ichild = nullptr;
int rc = qvi_new(&ichild);
if (rc != QV_SUCCESS) goto out;
// Create a group containing a single thread.
rc = qvi_new(&ichild->thgroup, group_size);
out:
if (rc != QV_SUCCESS) {
const int rc = qvi_new(&ichild, group_size);
if (qvi_unlikely(rc != QV_SUCCESS)) {
qvi_delete(&ichild);
}
*child = ichild;
Expand All @@ -38,17 +35,10 @@ qvi_group_pthread_s::split(
int,
qvi_group_t **
) {
// TODO(skg)
// TODO(skg) Test to see if we can do this.
return QV_ERR_NOT_SUPPORTED;
}

qvi_zgroup_pthread_s::qvi_zgroup_pthread_s(
int group_size
) {
int rc = qvi_new(&thgroup, group_size);
if (rc != QV_SUCCESS) throw qvi_runtime_error();
}

/*
* vim: ft=cpp ts=4 sts=4 sw=4 expandtab
*/
18 changes: 8 additions & 10 deletions src/qvi-group-pthread.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,14 @@ struct qvi_group_pthread_s : public qvi_group_s {
/** Underlying group instance. */
qvi_pthread_group_t *thgroup = nullptr;
/** Constructor. */
qvi_group_pthread_s(void) = default;
qvi_group_pthread_s(void) = delete;
/** Constructor. */
qvi_group_pthread_s(
int group_size
) {
const int rc = qvi_new(&thgroup, group_size);
if (qvi_unlikely(rc != QV_SUCCESS)) throw qvi_runtime_error();
}
/** Destructor. */
virtual ~qvi_group_pthread_s(void)
{
Expand Down Expand Up @@ -107,15 +114,6 @@ struct qvi_group_pthread_s : public qvi_group_s {
};
typedef qvi_group_pthread_s qvi_group_pthread_t;

struct qvi_zgroup_pthread_s : public qvi_group_pthread_s {
/** Default constructor. */
qvi_zgroup_pthread_s(void) = delete;
/** Constructor. */
qvi_zgroup_pthread_s(
int group_size
);
};

#endif

/*
Expand Down
8 changes: 2 additions & 6 deletions src/qvi-group.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,8 @@ qvi_group_s::thsplit(
qvi_group_s **child
) {
qvi_group_pthread_t *ichild = nullptr;
int rc = qvi_new(&ichild);
if (rc != QV_SUCCESS) goto out;

rc = qvi_new(&ichild->thgroup, nthreads);
out:
if (rc != QV_SUCCESS) {
const int rc = qvi_new(&ichild, nthreads);
if (qvi_unlikely(rc != QV_SUCCESS)) {
qvi_delete(&ichild);
}
*child = ichild;
Expand Down
62 changes: 61 additions & 1 deletion src/qvi-pthread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,69 @@
*/

#include "qvi-pthread.h"
#include "qvi-task.h" // IWYU pragma: keep
#include "qvi-bbuff.h"
#include "qvi-utils.h"
#include "qvi-task.h"

void *
qvi_pthread_group_s::call_first_from_pthread_create(
void *arg
) {
// TODO(skg) Cleanup.
auto args = (qvi_pthread_group_pthread_create_args_s *)arg;
auto group = args->group;
auto thread_routine = args->th_routine;
auto th_routine_argp = args->th_routine_argp;
// Let the threads add their TIDs to the vector.
{
std::lock_guard<std::mutex> guard(group->m_mutex);
group->m_tids.push_back(qvi_gettid());
}
// Make sure they all contribute before continuing.
pthread_barrier_wait(&group->m_barrier);
// Elect one thread to be the worker.
bool worker = false;
{
std::lock_guard<std::mutex> guard(group->m_mutex);
worker = group->m_tids.at(0) == qvi_gettid();
}
// The worker populates the TID to rank mapping, while the others wait.
if (worker) {
std::sort(group->m_tids.begin(), group->m_tids.end());

for (int i = 0; i < group->m_size; ++i) {
const pid_t tid = group->m_tids[i];
group->m_tid2rank.insert({tid, i});
}
pthread_barrier_wait(&group->m_barrier);
}
else {
pthread_barrier_wait(&group->m_barrier);
}
// Everyone can now create their task and populate the mapping table.
{
std::lock_guard<std::mutex> guard(group->m_mutex);
qvi_task_t *task = nullptr;
const int rc = qvi_new(&task);
if (qvi_unlikely(rc != QV_SUCCESS)) throw qvi_runtime_error();
group->m_tid2task.insert({qvi_gettid(), task});
}
// Make sure they all finish before continuing.
pthread_barrier_wait(&group->m_barrier);
// Free the provided argument container.
qvi_delete(&args);
// Finally, call the specified thread routine.
return thread_routine(th_routine_argp);
}

qvi_pthread_group_s::~qvi_pthread_group_s(void)
{
std::lock_guard<std::mutex> guard(m_mutex);
for (auto &tt : m_tid2task) {
qvi_delete(&tt.second);
}
pthread_barrier_destroy(&m_barrier);
}

/*
* vim: ft=cpp ts=4 sts=4 sw=4 expandtab
Expand Down
75 changes: 16 additions & 59 deletions src/qvi-pthread.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
#define QVI_PTHREAD_H

#include "qvi-common.h"
#include "qvi-task.h" // IWYU pragma: keep
#include "qvi-utils.h"

// TODO(skg) Rename
Expand All @@ -36,9 +35,9 @@ struct qvi_pthread_group_s {
int m_size = 0;
/** Holds the thread TIDs in this group. */
std::vector<pid_t> m_tids;
/** Holds tid to rank mapping. */
/** Holds TID to rank mapping. */
std::map<pid_t, int> m_tid2rank;
/** Holds tid to task mapping. */
/** Holds TID to task mapping. */
std::map<pid_t, qvi_task_t *> m_tid2task;
/** Used for mutexy things. */
std::mutex m_mutex;
Expand All @@ -47,77 +46,35 @@ struct qvi_pthread_group_s {
public:
/** Constructor. */
qvi_pthread_group_s(void) = delete;
/** Constructor. */
/**
* Constructor. This is called by the parent process to construct the
* maximum amount of infrastructure possible. The rest of group construction
* has to be performed after pthread_create() time. See
* call_first_from_pthread_create() for more details.
*/
qvi_pthread_group_s(
int group_size
) : m_size(group_size)
{
const int rc = pthread_barrier_init(&m_barrier, NULL, group_size);
if (qvi_unlikely(rc != 0)) throw qvi_runtime_error();
}
/** */
/**
* This function shall be called by pthread_create() to finish group
* construction. This function is called by the pthreads and NOT their
* parent.
*/
static void *
call_first_from_pthread_create(
void *arg
) {
// TODO(skg) Cleanup.
auto args = (qvi_pthread_group_pthread_create_args_s *)arg;
auto group = args->group;
auto thread_routine = args->th_routine;
auto th_routine_argp = args->th_routine_argp;
// Let the threads add their TIDs to the vector.
{
std::lock_guard<std::mutex> guard(group->m_mutex);
group->m_tids.push_back(qvi_gettid());
}
// Make sure they all contribute before continuing.
pthread_barrier_wait(&group->m_barrier);
// Elect one thread to be the worker.
bool worker = false;
{
std::lock_guard<std::mutex> guard(group->m_mutex);
worker = group->m_tids.at(0) == qvi_gettid();
}
// The worker populates the TID to rank mapping, while the others wait.
if (worker) {
std::sort(group->m_tids.begin(), group->m_tids.end());

for (int i = 0; i < group->m_size; ++i) {
const pid_t tid = group->m_tids[i];
group->m_tid2rank.insert({tid, i});
}
pthread_barrier_wait(&group->m_barrier);
}
else {
pthread_barrier_wait(&group->m_barrier);
}
// Everyone can now create their task and populate the mapping table.
{
std::lock_guard<std::mutex> guard(group->m_mutex);
qvi_task_t *task = nullptr;
const int rc = qvi_new(&task);
if (qvi_unlikely(rc != QV_SUCCESS)) throw qvi_runtime_error();
group->m_tid2task.insert({qvi_gettid(), task});
}
// Make sure they all finish before continuing.
pthread_barrier_wait(&group->m_barrier);
// Free the provided argument container.
qvi_delete(&args);
// Finally, call the specified thread routine.
return thread_routine(th_routine_argp);
}
);
/** Destructor. */
~qvi_pthread_group_s(void)
{
for (auto &tt : m_tid2task) {
qvi_delete(&tt.second);
}
pthread_barrier_destroy(&m_barrier);
}
~qvi_pthread_group_s(void);

int
size(void)
{
std::lock_guard<std::mutex> guard(m_mutex);
return m_size;
}

Expand Down

0 comments on commit 669188e

Please sign in to comment.