Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Checkpoint some pthread group work. #239

Merged
merged 1 commit into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 17 additions & 12 deletions src/qvi-group-pthread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,28 +14,33 @@
#include "qvi-group-pthread.h"
#include "qvi-utils.h"

qvi_group_pthread_s::qvi_group_pthread_s(
int group_size
) {
const int rc = qvi_new(&thgroup, group_size);
if (qvi_unlikely(rc != QV_SUCCESS)) throw qvi_runtime_error();
}

qvi_group_pthread_s::~qvi_group_pthread_s(void)
{
qvi_delete(&thgroup);
}

int
qvi_group_pthread_s::self(
qvi_group_t **child
qvi_group_t **
) {
constexpr int group_size = 1;
qvi_group_pthread_t *ichild = nullptr;
// Create a group containing a single thread.
const int rc = qvi_new(&ichild, group_size);
if (qvi_unlikely(rc != QV_SUCCESS)) {
qvi_delete(&ichild);
}
*child = ichild;
return rc;
// TODO(skg)
return QV_ERR_NOT_SUPPORTED;
}

int
qvi_group_pthread_s::split(
int,
int,
qvi_group_t **
qvi_group_s **
) {
// TODO(skg) Test to see if we can do this.
// TODO(skg)
return QV_ERR_NOT_SUPPORTED;
}

Expand Down
16 changes: 5 additions & 11 deletions src/qvi-group-pthread.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,9 @@ struct qvi_group_pthread_s : public qvi_group_s {
/** Constructor. */
qvi_group_pthread_s(
int group_size
) {
const int rc = qvi_new(&thgroup, group_size);
if (qvi_unlikely(rc != QV_SUCCESS)) throw qvi_runtime_error();
}
);
/** Destructor. */
virtual ~qvi_group_pthread_s(void)
{
qvi_delete(&thgroup);
}
virtual ~qvi_group_pthread_s(void);

virtual qvi_task_t *
task(void)
Expand Down Expand Up @@ -78,7 +72,7 @@ struct qvi_group_pthread_s : public qvi_group_s {
int,
qvi_group_s **
) {
// TODO(skg) Need to test this.
// TODO(skg)
return QV_ERR_NOT_SUPPORTED;
}

Expand All @@ -96,7 +90,7 @@ struct qvi_group_pthread_s : public qvi_group_s {
bool *shared,
qvi_bbuff_t ***rxbuffs
) {
return thgroup->gather_bbuffs(
return thgroup->gather(
txbuff, root, shared, rxbuffs
);
}
Expand All @@ -107,7 +101,7 @@ struct qvi_group_pthread_s : public qvi_group_s {
int root,
qvi_bbuff_t **rxbuff
) {
return thgroup->scatter_bbuffs(
return thgroup->scatter(
txbuffs, root, rxbuff
);
}
Expand Down
16 changes: 16 additions & 0 deletions src/qvi-group.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,22 @@ qvi_group_s::thsplit(
return rc;
}

int
qvi_group_s::next_id(
qvi_group_id_t *gid
) {
// Global group ID. Note that we pad its initial value so that other
// infrastructure (e.g., QVI_MPI_GROUP_WORLD) will never equal or exceed
// this value.
static std::atomic<qvi_group_id_t> group_id(64);
if (group_id == UINT64_MAX) {
qvi_log_error("Group ID space exhausted");
return QV_ERR_OOR;
}
*gid = group_id++;
return QV_SUCCESS;
}

/*
* vim: ft=cpp ts=4 sts=4 sw=4 expandtab
*/
13 changes: 1 addition & 12 deletions src/qvi-group.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,18 +93,7 @@ struct qvi_group_s : qvi_refc_s {
static int
next_id(
qvi_group_id_t *gid
) {
// Global group ID. Note that we pad its initial value so that other
// infrastructure (e.g., QVI_MPI_GROUP_WORLD) will never equal or exceed
// this value.
static std::atomic<qvi_group_id_t> group_id(64);
if (group_id == UINT64_MAX) {
qvi_log_error("Group ID space exhausted");
return QV_ERR_OOR;
}
*gid = group_id++;
return QV_SUCCESS;
}
);
};
typedef struct qvi_group_s qvi_group_t;

Expand Down
2 changes: 1 addition & 1 deletion src/qvi-mpi.cc
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ qvi_mpi_group_gather_bbuffs(
if (group_id == root) {
// Zero initialize array of pointers to nullptr.
bbuffs = new qvi_bbuff_t*[group_size]();

// TODO(skg) Use dup.
byte_t *bytepos = allbytes.data();
for (int i = 0; i < group_size; ++i) {
rc = qvi_bbuff_new(&bbuffs[i]);
Expand Down
61 changes: 60 additions & 1 deletion src/qvi-pthread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,16 @@

#include "qvi-pthread.h"
#include "qvi-task.h" // IWYU pragma: keep
#include "qvi-bbuff.h"
#include "qvi-utils.h"

qvi_pthread_group_s::qvi_pthread_group_s(
int group_size
) : m_size(group_size)
{
const int rc = pthread_barrier_init(&m_barrier, NULL, group_size);
if (qvi_unlikely(rc != 0)) throw qvi_runtime_error();
}

void *
qvi_pthread_group_s::call_first_from_pthread_create(
void *arg
Expand Down Expand Up @@ -76,6 +83,27 @@ qvi_pthread_group_s::~qvi_pthread_group_s(void)
pthread_barrier_destroy(&m_barrier);
}

int
qvi_pthread_group_s::size(void)
{
std::lock_guard<std::mutex> guard(m_mutex);
return m_size;
}

int
qvi_pthread_group_s::rank(void)
{
std::lock_guard<std::mutex> guard(m_mutex);
return m_tid2rank.at(qvi_gettid());
}

qvi_task_t *
qvi_pthread_group_s::task(void)
{
std::lock_guard<std::mutex> guard(m_mutex);
return m_tid2task.at(qvi_gettid());
}

int
qvi_pthread_group_s::barrier(void)
{
Expand All @@ -86,6 +114,37 @@ qvi_pthread_group_s::barrier(void)
return QV_SUCCESS;
}

int
qvi_pthread_group_s::split(
int,
int,
qvi_pthread_group_s **
) {
// TODO(skg)
return QV_ERR_NOT_SUPPORTED;
}

int
qvi_pthread_group_s::gather(
qvi_bbuff_t *,
int,
bool *,
qvi_bbuff_t ***
) {
// TODO(skg)
return QV_ERR_NOT_SUPPORTED;
}

int
qvi_pthread_group_s::scatter(
qvi_bbuff_t **,
int,
qvi_bbuff_t **
) {
// TODO(skg)
return QV_ERR_NOT_SUPPORTED;
}

/*
* vim: ft=cpp ts=4 sts=4 sw=4 expandtab
*/
70 changes: 22 additions & 48 deletions src/qvi-pthread.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
#define QVI_PTHREAD_H

#include "qvi-common.h"
#include "qvi-utils.h"

typedef void *(*qvi_pthread_routine_fun_ptr_t)(void *);

Expand Down Expand Up @@ -66,11 +65,7 @@ struct qvi_pthread_group_s {
*/
qvi_pthread_group_s(
int group_size
) : m_size(group_size)
{
const int rc = pthread_barrier_init(&m_barrier, NULL, group_size);
if (qvi_unlikely(rc != 0)) throw qvi_runtime_error();
}
);
/**
* This function shall be called by pthread_create() to finish group
* construction. This function is called by the pthreads and NOT their
Expand All @@ -83,60 +78,39 @@ struct qvi_pthread_group_s {
/** Destructor. */
~qvi_pthread_group_s(void);

int
size(void)
{
std::lock_guard<std::mutex> guard(m_mutex);
return m_size;
}
qvi_task_t *
task(void);

int
rank(void)
{
std::lock_guard<std::mutex> guard(m_mutex);
return m_tid2rank.at(qvi_gettid());
}
size(void);

qvi_task_t *
task(void)
{
std::lock_guard<std::mutex> guard(m_mutex);
return m_tid2task.at(qvi_gettid());
}
int
rank(void);

int
barrier(void);

int
create_from_split(
int,
int,
qvi_pthread_group_s **
) {
// TODO(skg)
return QV_ERR_NOT_SUPPORTED;
}
split(
int color,
int key,
qvi_pthread_group_s **child
);

int
gather_bbuffs(
qvi_bbuff_t *,
int,
bool *,
qvi_bbuff_t ***
) {
// TODO(skg)
return QV_ERR_NOT_SUPPORTED;
}
gather(
qvi_bbuff_t *txbuff,
int root,
bool *shared,
qvi_bbuff_t ***rxbuffs
);

int
scatter_bbuffs(
qvi_bbuff_t **,
int,
qvi_bbuff_t **
) {
// TODO(skg)
return QV_ERR_NOT_SUPPORTED;
}
scatter(
qvi_bbuff_t **txbuffs,
int root,
qvi_bbuff_t **rxbuff
);
};
typedef struct qvi_pthread_group_s qvi_pthread_group_t;

Expand Down
4 changes: 2 additions & 2 deletions tests/test-pthread-split.c
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ main(void)
}

pthread_t thid2[nthreads];
for(int i = 0 ; i < nthreads; ++i) {
for (int i = 0 ; i < nthreads; ++i) {
const int ptrc = qv_pthread_create(
&thid2[i], attr, thread_work, &thargs2[i], th_scopes[i]
);
Expand All @@ -177,7 +177,7 @@ main(void)
//fprintf(stdout,"Thread finished with '%s'\n", (char *)ret);
}

/* Clean up */
// Clean up.
rc = qv_pthread_scopes_free(nthreads, th_scopes);
if (rc != QV_SUCCESS) {
ers = "qv_pthread_scope_free() failed";
Expand Down
Loading