Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/include/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,9 @@ PyObject *AerospikeQuery_Get_Partitions_status(AerospikeQuery *self);
PyObject *StoreUnicodePyObject(AerospikeQuery *self, PyObject *obj);

int64_t pyobject_to_int64(PyObject *py_obj);

int AerospikeQuery_Where_Add(AerospikeClient *client, as_query *query,
PyObject *py_ctx, as_predicate_type predicate,
as_index_datatype in_datatype, PyObject *py_bin,
PyObject *py_val1, PyObject *py_val2,
int index_type, as_error *err);
200 changes: 2 additions & 198 deletions src/main/client/query.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,203 +47,7 @@ AerospikeQuery *AerospikeClient_Query(AerospikeClient *self, PyObject *args,
{
return AerospikeQuery_New(self, args, kwds);
}
static int query_where_add(as_query **query, as_predicate_type predicate,
as_index_datatype in_datatype, PyObject *py_bin,
PyObject *py_val1, PyObject *py_val2, int index_type,
as_error *err)

{
int64_t min;
int64_t max;
char *val = NULL, *bin = NULL;
PyObject *py_ubin = NULL;
switch (predicate) {
case AS_PREDICATE_EQUAL: {
if (in_datatype == AS_INDEX_STRING) {
if (PyUnicode_Check(py_bin)) {
py_ubin = PyUnicode_AsUTF8String(py_bin);
bin = PyBytes_AsString(py_ubin);
}
else if (PyByteArray_Check(py_bin)) {
bin = PyByteArray_AsString(py_bin);
}
else {
as_error_update(err, AEROSPIKE_ERR_PARAM,
"Bin must be a string or unicode");
return 1;
}

if (PyUnicode_Check(py_val1)) {
val = strdup((char *)PyUnicode_AsUTF8(py_val1));
}
else {
as_error_update(
err, AEROSPIKE_ERR_PARAM,
"Comparison value for string equality must be a string");
return 1;
}

as_query_where_init(*query, 1);
if (index_type == 0) {
as_query_where(*query, bin, as_equals(STRING, val));
}
else if (index_type == 1) {
as_query_where(*query, bin, as_contains(LIST, STRING, val));
}
else if (index_type == 2) {
as_query_where(*query, bin, as_contains(MAPKEYS, STRING, val));
}
else if (index_type == 3) {
as_query_where(*query, bin,
as_contains(MAPVALUES, STRING, val));
}
else {
as_error_update(err, AEROSPIKE_ERR_PARAM,
"Invalid query index type");
return 1;
}
if (py_ubin) {
Py_DECREF(py_ubin);
py_ubin = NULL;
}
}
else if (in_datatype == AS_INDEX_NUMERIC) {
if (PyUnicode_Check(py_bin)) {
py_ubin = PyUnicode_AsUTF8String(py_bin);
bin = PyBytes_AsString(py_ubin);
}
else if (PyByteArray_Check(py_bin)) {
bin = PyByteArray_AsString(py_bin);
}
else {
as_error_update(err, AEROSPIKE_ERR_PARAM,
"Bin must be a string or unicode");
return 1;
}
int64_t val = pyobject_to_int64(py_val1);

as_query_where_init(*query, 1);
if (index_type == 0) {
as_query_where(*query, bin, as_equals(NUMERIC, val));
}
else if (index_type == 1) {
as_query_where(*query, bin, as_contains(LIST, NUMERIC, val));
}
else if (index_type == 2) {
as_query_where(*query, bin, as_contains(MAPKEYS, NUMERIC, val));
}
else if (index_type == 3) {
as_query_where(*query, bin,
as_contains(MAPVALUES, NUMERIC, val));
}
else {
as_error_update(err, AEROSPIKE_ERR_PARAM,
"Invalid query index types");
return 1;
}
if (py_ubin) {
Py_DECREF(py_ubin);
py_ubin = NULL;
}
}
else {
// If it ain't expected, raise and error
as_error_update(
err, AEROSPIKE_ERR_PARAM,
"predicate 'equals' expects a string or integer value.");
return 1;
}

break;
}
case AS_PREDICATE_RANGE: {
if (in_datatype == AS_INDEX_NUMERIC) {
if (PyUnicode_Check(py_bin)) {
py_ubin = PyUnicode_AsUTF8String(py_bin);
bin = PyBytes_AsString(py_ubin);
}
else if (PyByteArray_Check(py_bin)) {
bin = PyByteArray_AsString(py_bin);
}
else {
as_error_update(err, AEROSPIKE_ERR_PARAM,
"Bin must be a string or unicode");
return 1;
}

if (py_val1 == Py_None || py_val2 == Py_None) {
Py_XDECREF(py_ubin);
as_error_update(
err, AEROSPIKE_ERR_PARAM,
"Min and max must be provided for a range query");
return 1;
}
if (PyLong_Check(py_val1)) {
min = pyobject_to_int64(py_val1);
}
else {
Py_XDECREF(py_ubin);
as_error_update(err, AEROSPIKE_ERR_PARAM,
"Min value must be an integer or long");
return 1;
}

if (PyLong_Check(py_val2)) {
max = pyobject_to_int64(py_val2);
}
else {
Py_XDECREF(py_ubin);
as_error_update(err, AEROSPIKE_ERR_PARAM,
"Max value must be an integer or long");
return 1;
}

as_query_where_init(*query, 1);
if (index_type == 0) {
as_query_where(*query, bin,
as_range(DEFAULT, NUMERIC, min, max));
}
else if (index_type == 1) {
as_query_where(*query, bin, as_range(LIST, NUMERIC, min, max));
}
else if (index_type == 2) {
as_query_where(*query, bin,
as_range(MAPKEYS, NUMERIC, min, max));
}
else if (index_type == 3) {
as_query_where(*query, bin,
as_range(MAPVALUES, NUMERIC, min, max));
}
else {
Py_XDECREF(py_ubin);
return 1;
}
if (py_ubin) {
Py_DECREF(py_ubin);
py_ubin = NULL;
}
}
else if (in_datatype == AS_INDEX_STRING) {
as_error_update(err, AEROSPIKE_ERR_PARAM,
"Range predicate not supported for strings");
return 1;
}
else {
// If it ain't right, raise and error
as_error_update(err, AEROSPIKE_ERR_PARAM,
"predicate 'between' expects two integer values.");
return 1;
}
break;
}
default: {
// If it ain't supported, raise and error
as_error_update(err, AEROSPIKE_ERR_PARAM, "unknown predicate type");
return 1;
}
}
return 0;
}
/**
* Queries a set in the Aerospike DB and applies UDF on it.
*
Expand Down Expand Up @@ -408,8 +212,8 @@ static PyObject *AerospikeClient_QueryApply_Invoke(
goto CLEANUP;
}

rc = query_where_add(
&query_ptr, (as_predicate_type)op, op_data,
rc = AerospikeQuery_Where_Add(
self, query_ptr, NULL, (as_predicate_type)op, op_data,
size > 2 ? PyTuple_GetItem(py_predicate, 2) : Py_None,
size > 3 ? PyTuple_GetItem(py_predicate, 3) : Py_None,
size > 4 ? PyTuple_GetItem(py_predicate, 4) : Py_None,
Expand Down
44 changes: 21 additions & 23 deletions src/main/query/where.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,12 @@ int64_t pyobject_to_int64(PyObject *py_obj)
}

// py_bin, py_val1, pyval2 are guaranteed to be non-NULL
static int AerospikeQuery_Where_Add(AerospikeQuery *self, PyObject *py_ctx,
as_predicate_type predicate,
as_index_datatype in_datatype,
PyObject *py_bin, PyObject *py_val1,
PyObject *py_val2, int index_type)
int AerospikeQuery_Where_Add(AerospikeClient *client, as_query *query,
PyObject *py_ctx, as_predicate_type predicate,
as_index_datatype in_datatype, PyObject *py_bin,
PyObject *py_val1, PyObject *py_val2,
int index_type, as_error *err)
{
as_error err;
as_cdt_ctx *pctx = NULL;
bool ctx_in_use = false;

Expand All @@ -59,9 +58,9 @@ static int AerospikeQuery_Where_Add(AerospikeQuery *self, PyObject *py_ctx,
memset(&static_pool, 0, sizeof(static_pool));
pctx = cf_malloc(sizeof(as_cdt_ctx));
memset(pctx, 0, sizeof(as_cdt_ctx));
if (get_cdt_ctx(self->client, &err, pctx, py_ctx, &ctx_in_use,
&static_pool, SERIALIZER_PYTHON) != AEROSPIKE_OK) {
return err.code;
if (get_cdt_ctx(client, err, pctx, py_ctx, &ctx_in_use, &static_pool,
SERIALIZER_PYTHON) != AEROSPIKE_OK) {
return err->code;
}
}

Expand Down Expand Up @@ -154,43 +153,42 @@ static int AerospikeQuery_Where_Add(AerospikeQuery *self, PyObject *py_ctx,
// Blobs are handled separately below, so we don't need to use the void* pointer
}

as_query_where_init(&self->query, 1);
as_query_where_init(query, 1);

if (predicate == AS_PREDICATE_EQUAL && in_datatype == AS_INDEX_BLOB) {
// We don't call as_blob_contains() directly because we can't pass in index_type as a parameter
as_query_where_with_ctx(&self->query, bin, pctx, predicate, index_type,
as_query_where_with_ctx(query, bin, pctx, predicate, index_type,
AS_INDEX_BLOB, val1_bytes, bytes_size, true);
}
else if (in_datatype == AS_INDEX_NUMERIC ||
in_datatype == AS_INDEX_STRING ||
in_datatype == AS_INDEX_GEO2DSPHERE) {
if (predicate == AS_PREDICATE_RANGE &&
in_datatype == AS_INDEX_NUMERIC) {
as_query_where_with_ctx(&self->query, bin, pctx, predicate,
index_type, in_datatype, val1_int,
val2_int);
as_query_where_with_ctx(query, bin, pctx, predicate, index_type,
in_datatype, val1_int, val2_int);
}
else {
as_query_where_with_ctx(&self->query, bin, pctx, predicate,
index_type, in_datatype, val1);
as_query_where_with_ctx(query, bin, pctx, predicate, index_type,
in_datatype, val1);
}

if (in_datatype == AS_INDEX_STRING ||
in_datatype == AS_INDEX_GEO2DSPHERE) {
self->query.where.entries[0].value.string_val._free = true;
query->where.entries[0].value.string_val._free = true;
}
}
else {
// If it ain't supported, raise and error
as_error_update(&err, AEROSPIKE_ERR_PARAM, "unknown predicate type");
as_error_update(err, AEROSPIKE_ERR_PARAM, "unknown predicate type");
PyObject *py_err = NULL;
error_to_pyobject(&err, &py_err);
error_to_pyobject(err, &py_err);
PyErr_SetObject(PyExc_Exception, py_err);
goto CLEANUP_ON_ERROR2;
}

if (ctx_in_use) {
self->query.where.entries[0].ctx_free = true;
query->where.entries[0].ctx_free = true;
}

return 0;
Expand Down Expand Up @@ -338,9 +336,9 @@ AerospikeQuery *AerospikeQuery_Where_Invoke(AerospikeQuery *self,
index_type = AS_INDEX_TYPE_DEFAULT;
}

int rc =
AerospikeQuery_Where_Add(self, py_ctx, predicate_type, index_datatype,
py_bin, py_val1, py_val2, index_type);
int rc = AerospikeQuery_Where_Add(self->client, &self->query, py_ctx,
predicate_type, index_datatype, py_bin,
py_val1, py_val2, index_type, &err);
/* Failed to add the predicate for some reason */
if (rc != 0) {
as_error_update(&err, AEROSPIKE_ERR_PARAM, "Failed to add predicate");
Expand Down
Loading