Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
74a3bcc
Make thread safe
juliannguyen4 Jun 18, 2025
ea2b7fd
Trying to fix paginate disrupt issue.
Jun 19, 2025
510a314
just add 16 more spots if we run out
juliannguyen4 Jun 20, 2025
e3c7bfb
check not needed
juliannguyen4 Jun 20, 2025
b27f484
use api to get list size
juliannguyen4 Jun 20, 2025
a5ff908
use as_vector
juliannguyen4 Jun 20, 2025
af5f8a2
fix
juliannguyen4 Jun 20, 2025
130a87c
use thread local error to handle python callback raising an exception
juliannguyen4 Jul 1, 2025
505ac35
fix
juliannguyen4 Jul 1, 2025
6aa08d8
fix
juliannguyen4 Jul 1, 2025
65033ab
need to return a reference to as_error*
juliannguyen4 Jul 1, 2025
77b68aa
maybe invalid syntax?
juliannguyen4 Jul 1, 2025
16f4da6
confusing comment
juliannguyen4 Jul 2, 2025
450d8f5
dup comment
juliannguyen4 Jul 2, 2025
4299df1
Merge branch 'dev' into parakh-paginate-tmp
juliannguyen4 Jul 2, 2025
9f4d903
as_vector was appending the ptr's contents and not the ptr itself
juliannguyen4 Jul 2, 2025
472ba84
fix
juliannguyen4 Jul 2, 2025
8dcae5b
clean up..
juliannguyen4 Jul 2, 2025
289c0b8
also clean up.
juliannguyen4 Jul 2, 2025
9c1ea4c
make label more meaningful
juliannguyen4 Jul 2, 2025
ef3fe16
operator precedence
juliannguyen4 Jul 2, 2025
0384f4f
comment
juliannguyen4 Jul 2, 2025
77ab7ed
fix mem leak
juliannguyen4 Jul 2, 2025
74c0377
rm dup init
juliannguyen4 Jul 2, 2025
32dd60f
fix seg fault
juliannguyen4 Jul 2, 2025
93bae35
fix build on windows
juliannguyen4 Jul 2, 2025
826f81d
include pthread_win32 header file.
juliannguyen4 Jul 3, 2025
42e8e73
include pthread.h from src/include, not the system one
juliannguyen4 Jul 3, 2025
36ae621
Revert "include pthread.h from src/include, not the system one"
juliannguyen4 Jul 3, 2025
5c24ace
Revert "include pthread_win32 header file."
juliannguyen4 Jul 3, 2025
5c36d9d
pthread isnt being linked against when compiling the python client
juliannguyen4 Jul 3, 2025
29a216a
fix naming
juliannguyen4 Jul 3, 2025
2d73aec
enable verbose output to show which linked libs are being bundled wit…
juliannguyen4 Jul 3, 2025
a5abd4b
dont override main error if it was set
juliannguyen4 Jul 8, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build-wheels.yml
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ jobs:
auditwheel repair -w {dest_dir} {wheel} &&
auditwheel show {dest_dir}/* &&
rm -rf $WHEEL_DIR
CIBW_REPAIR_WHEEL_COMMAND_WINDOWS: "delvewheel repair --add-path ./aerospike-client-c/vs/x64/Release -w {dest_dir} {wheel}"
CIBW_REPAIR_WHEEL_COMMAND_WINDOWS: "delvewheel repair -vv --add-path ./aerospike-client-c/vs/x64/Release -w {dest_dir} {wheel}"
# We also want to verify the same thing on macos
CIBW_REPAIR_WHEEL_COMMAND_MACOS: >
delocate-wheel --require-archs {delocate_archs} -w {dest_dir} -v {wheel} &&
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@
AEROSPIKE_C_TARGET = AEROSPIKE_C_HOME + '/target/Linux-' + machine
elif WINDOWS:
libraries.clear()
libraries.append("pthreadVC2")
extra_compile_args.append("-DAS_SHARED_IMPORT")
include_dirs.append(f"{AEROSPIKE_C_TARGET}/vs/packages/aerospike-client-c-dependencies.{c_client_dependencies_version}/build/native/include")
else:
Expand Down
95 changes: 52 additions & 43 deletions src/main/query/foreach.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
******************************************************************************/
#include <Python.h>
#include <stdbool.h>
#include <pthread.h>

#include <aerospike/aerospike_scan.h>
#include <aerospike/as_error.h>
#include <aerospike/as_query.h>
#include <aerospike/as_partition.h>
#include <aerospike/as_arraylist.h>
#include <aerospike/as_vector.h>
#include <aerospike/aerospike_query.h>

#include "client.h"
Expand All @@ -31,23 +33,23 @@

// Struct for Python User-Data for the Callback
typedef struct {
as_error error;
PyObject *callback;
AerospikeClient *client;
int partition_query;
as_vector thread_errors;
pthread_mutex_t thread_errors_mutex;
} LocalData;

static bool each_result(const as_val *val, void *udata)
{
bool rval = true;
bool retval = true;

if (!val) {
return false;
}

// Extract callback user-data
LocalData *data = (LocalData *)udata;
as_error *err = &data->error;
PyObject *py_callback = data->callback;

// Python Function Arguments and Result Value
Expand All @@ -60,14 +62,14 @@ static bool each_result(const as_val *val, void *udata)
gstate = PyGILState_Ensure();

// Convert as_val to a Python Object
val_to_pyobject(data->client, err, val, &py_result);

// The record could not be converted to a python object
if (!py_result) {
//TBD set error here
// Must release the interpreter lock before returning
PyGILState_Release(gstate);
return true;
// Use local thread error so we don't need to pass the main error to the callback
// We want to avoid resetting the main error in case it was already set by another thread.
as_error thread_err_local;
as_error_init(&thread_err_local);
val_to_pyobject(data->client, &thread_err_local, val, &py_result);

if (thread_err_local.code != AEROSPIKE_OK) {
goto EXIT_CALLBACK;
}

// Build Python Function Arguments
Expand Down Expand Up @@ -101,28 +103,30 @@ static bool each_result(const as_val *val, void *udata)
if (!py_return) {
// an exception was raised, handle it (someday)
// for now, we bail from the loop
as_error_update(err, AEROSPIKE_ERR_CLIENT,
as_error_update(&thread_err_local, AEROSPIKE_ERR_CLIENT,
"Callback function contains an error");
rval = false;
retval = false;
}
else if (PyBool_Check(py_return)) {
if (Py_False == py_return) {
rval = false;
}
else {
rval = true;
}
Py_DECREF(py_return);
else if (py_return == Py_False) {
retval = false;
}
else {
rval = true;
Py_DECREF(py_return);
Py_XDECREF(py_return);

EXIT_CALLBACK:
if (thread_err_local.code != AEROSPIKE_OK) {
pthread_mutex_lock(&data->thread_errors_mutex);
as_error *stored_err_ref = (as_error *)cf_malloc(sizeof(as_error));
as_error_copy(stored_err_ref, &thread_err_local);
as_vector_append(&data->thread_errors, &stored_err_ref);
pthread_mutex_unlock(&data->thread_errors_mutex);

retval = false;
}

// Release Python State
PyGILState_Release(gstate);

return rval;
return retval;
}

PyObject *AerospikeQuery_Foreach(AerospikeQuery *self, PyObject *args,
Expand All @@ -149,10 +153,14 @@ PyObject *AerospikeQuery_Foreach(AerospikeQuery *self, PyObject *args,
data.client = self->client;
data.partition_query = 0;

as_error_init(&data.error);
// Main error
as_error err;
as_error_init(&err);
// Stores errors reported by individual threads when they call the each_result callback
as_vector_init(&data.thread_errors, sizeof(as_error *), 16);
pthread_mutex_init(&data.thread_errors_mutex, NULL);

// Aerospike Client Arguments
as_error err;
as_policy_query query_policy;
as_policy_query *query_policy_p = NULL;

Expand All @@ -165,7 +173,6 @@ PyObject *AerospikeQuery_Foreach(AerospikeQuery *self, PyObject *args,
as_partitions_status *ps = NULL;

// Initialize error
as_error_init(&err);

if (!self || !self->client->as) {
as_error_update(&err, AEROSPIKE_ERR_PARAM, "Invalid aerospike object");
Expand Down Expand Up @@ -214,9 +221,9 @@ PyObject *AerospikeQuery_Foreach(AerospikeQuery *self, PyObject *args,
as_partition_filter_set_partitions(partition_filter_p, ps);
}

aerospike_query_partitions(self->client->as, &data.error,
query_policy_p, &self->query,
partition_filter_p, each_result, &data);
aerospike_query_partitions(self->client->as, &err, query_policy_p,
&self->query, partition_filter_p,
each_result, &data);

if (ps) {
as_partitions_status_release(ps);
Expand All @@ -229,9 +236,11 @@ PyObject *AerospikeQuery_Foreach(AerospikeQuery *self, PyObject *args,

Py_END_ALLOW_THREADS

if (data.error.code != AEROSPIKE_OK) {
as_error_update(&data.error, data.error.code, NULL);
goto CLEANUP;
// Promote any thread-level error if the main error was not set
if (err.code == AEROSPIKE_OK && data.thread_errors.size > 0) {
as_error *vector_item =
(as_error *)as_vector_get_ptr(&data.thread_errors, 0);
as_error_copy(&err, vector_item);
}

CLEANUP:
Expand All @@ -244,15 +253,15 @@ PyObject *AerospikeQuery_Foreach(AerospikeQuery *self, PyObject *args,
}
self->query.apply.arglist = NULL;

if (err.code != AEROSPIKE_OK || data.error.code != AEROSPIKE_OK) {
if (err.code != AEROSPIKE_OK) {
raise_exception_base(&err, Py_None, Py_None, Py_None, Py_None,
Py_None);
}
if (data.error.code != AEROSPIKE_OK) {
raise_exception_base(&data.error, Py_None, Py_None, Py_None,
Py_None, Py_None);
}
for (uint32_t i = 0; i < data.thread_errors.size; ++i) {
void *err_ptr = as_vector_get_ptr(&data.thread_errors, i);
cf_free(err_ptr);
}
as_vector_destroy(&data.thread_errors);
pthread_mutex_destroy(&data.thread_errors_mutex);

if (err.code != AEROSPIKE_OK) {
raise_exception_base(&err, Py_None, Py_None, Py_None, Py_None, Py_None);
return NULL;
}

Expand Down
Loading