Skip to content

[CLIENT-3411] Refactor code that initializes transaction-level as_policy_batch policy using a Python dictionary #756

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 15 commits into
base: dev
Choose a base branch
from
10 changes: 5 additions & 5 deletions src/include/policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,11 +239,11 @@ as_status pyobject_to_policy_operate(AerospikeClient *self, as_error *err,
as_policy_operate *config_operate_policy,
as_exp *exp_list, as_exp **exp_list_p);

as_status pyobject_to_policy_batch(AerospikeClient *self, as_error *err,
PyObject *py_policy, as_policy_batch *policy,
as_policy_batch **policy_p,
as_policy_batch *config_batch_policy,
as_exp *exp_list, as_exp **exp_list_p);
as_status as_policy_batch_init_and_set_from_pyobject(
AerospikeClient *self, as_error *err, PyObject *py_policy,
as_policy_batch *policy, as_policy_batch **policy_p,
as_policy_batch *config_batch_policy, as_exp *exp_list,
as_exp **exp_list_p);

as_status pyobject_to_map_policy(as_error *err, PyObject *py_policy,
as_map_policy *policy);
Expand Down
4 changes: 2 additions & 2 deletions src/main/client/batch_apply.c
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,8 @@ static PyObject *AerospikeClient_Batch_Apply_Invoke(
sizeof(as_key) * processed_key_count);

if (py_policy_batch) {
if (pyobject_to_policy_batch(
self, err, py_policy_batch, &policy_batch, &policy_batch_p,
if (as_policy_batch_init_and_set_from_pyobject(
self, err, py_policy_batch, &policy_batch,
&self->as->config.policies.batch, &batch_exp_list,
&batch_exp_list_p) != AEROSPIKE_OK) {
goto CLEANUP;
Expand Down
2 changes: 1 addition & 1 deletion src/main/client/batch_operate.c
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ static PyObject *AerospikeClient_Batch_Operate_Invoke(
sizeof(as_key) * processed_key_count);

if (py_policy_batch) {
if (pyobject_to_policy_batch(
if (as_policy_batch_init_and_set_from_pyobject(
self, err, py_policy_batch, &policy_batch, &policy_batch_p,
&self->as->config.policies.batch, &batch_exp_list,
&batch_exp_list_p) != AEROSPIKE_OK) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/client/batch_read.c
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ PyObject *AerospikeClient_BatchRead(AerospikeClient *self, PyObject *args,
as_exp *batch_exp_list_p = NULL;

if (py_policy_batch) {
if (pyobject_to_policy_batch(
if (as_policy_batch_init_and_set_from_pyobject(
self, &err, py_policy_batch, &policy_batch, &policy_batch_p,
&self->as->config.policies.batch, &batch_exp_list,
&batch_exp_list_p) != AEROSPIKE_OK) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/client/batch_remove.c
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ static PyObject *AerospikeClient_Batch_Remove_Invoke(
sizeof(as_key) * processed_key_count);

if (py_policy_batch) {
if (pyobject_to_policy_batch(
if (as_policy_batch_init_and_set_from_pyobject(
self, err, py_policy_batch, &policy_batch, &policy_batch_p,
&self->as->config.policies.batch, &batch_exp_list,
&batch_exp_list_p) != AEROSPIKE_OK) {
Expand Down
17 changes: 8 additions & 9 deletions src/main/client/batch_write.c
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ static PyObject *AerospikeClient_BatchWriteInvoke(AerospikeClient *self,
as_batch_records *batch_records_p = NULL;

as_policy_batch batch_policy;
as_policy_batch *batch_policy_p = NULL;
as_exp exp_list;
as_exp *exp_list_p = NULL;

Expand Down Expand Up @@ -157,13 +156,12 @@ static PyObject *AerospikeClient_BatchWriteInvoke(AerospikeClient *self,
goto CLEANUP4;
}

if (py_policy != NULL) {
if (pyobject_to_policy_batch(self, err, py_policy, &batch_policy,
&batch_policy_p,
&self->as->config.policies.batch,
&exp_list, &exp_list_p) != AEROSPIKE_OK) {
goto CLEANUP4;
}
as_policy_batch *batch_policy_p;
if (as_policy_batch_init_and_set_from_pyobject(
self, err, py_policy, &batch_policy, &batch_policy_p,
&self->as->config.policies.batch_parent_write, &exp_list,
&exp_list_p) != AEROSPIKE_OK) {
goto CLEANUP4;
}

if (!is_pyobj_correct_as_helpers_type(py_obj, "batch.records",
Expand Down Expand Up @@ -574,7 +572,8 @@ static PyObject *AerospikeClient_BatchWriteInvoke(AerospikeClient *self,
PyObject *AerospikeClient_BatchWrite(AerospikeClient *self, PyObject *args,
PyObject *kwds)
{
PyObject *py_policy = NULL;
// TODO: check ref count
PyObject *py_policy = Py_None;
PyObject *py_batch_recs = NULL;

as_error err;
Expand Down
122 changes: 67 additions & 55 deletions src/main/policy.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@
if (!PyDict_Check(py_policy)) { \
return as_error_update(err, AEROSPIKE_ERR_PARAM, \
"policy must be a dict"); \
} \
__policy##_init(policy);
}

#define POLICY_UPDATE() *policy_p = policy;

Expand Down Expand Up @@ -300,9 +299,9 @@ static inline void check_and_set_txn_field(as_error *err,
}

static inline as_status
pyobject_to_policy_base(AerospikeClient *self, as_error *err,
PyObject *py_policy, as_policy_base *policy,
as_exp *exp_list, as_exp **exp_list_p)
as_policy_base_init_and_set_from_py_optional_policy_dict(
AerospikeClient *self, as_error *err, PyObject *py_policy,
as_policy_base *policy, as_exp *exp_list, as_exp **exp_list_p)
{
POLICY_SET_FIELD(total_timeout, uint32_t);
POLICY_SET_FIELD(socket_timeout, uint32_t);
Expand Down Expand Up @@ -342,8 +341,9 @@ as_status pyobject_to_policy_apply(AerospikeClient *self, as_error *err,

if (py_policy && py_policy != Py_None) {
// Set policy fields
as_status retval = pyobject_to_policy_base(
self, err, py_policy, &policy->base, exp_list, exp_list_p);
as_status retval =
as_policy_base_init_and_set_from_py_optional_policy_dict(
self, err, py_policy, &policy->base, exp_list, exp_list_p);
if (retval != AEROSPIKE_OK) {
return retval;
}
Expand Down Expand Up @@ -414,8 +414,9 @@ as_status pyobject_to_policy_query(AerospikeClient *self, as_error *err,
as_policy_query_copy(config_query_policy, policy);

if (py_policy && py_policy != Py_None) {
as_status retval = pyobject_to_policy_base(
self, err, py_policy, &policy->base, exp_list, exp_list_p);
as_status retval =
as_policy_base_init_and_set_from_py_optional_policy_dict(
self, err, py_policy, &policy->base, exp_list, exp_list_p);
if (retval != AEROSPIKE_OK) {
return retval;
}
Expand Down Expand Up @@ -456,8 +457,9 @@ as_status pyobject_to_policy_read(AerospikeClient *self, as_error *err,

if (py_policy && py_policy != Py_None) {
// Set policy fields
as_status retval = pyobject_to_policy_base(
self, err, py_policy, &policy->base, exp_list, exp_list_p);
as_status retval =
as_policy_base_init_and_set_from_py_optional_policy_dict(
self, err, py_policy, &policy->base, exp_list, exp_list_p);
if (retval != AEROSPIKE_OK) {
return retval;
}
Expand Down Expand Up @@ -500,8 +502,9 @@ as_status pyobject_to_policy_remove(AerospikeClient *self, as_error *err,

if (py_policy && py_policy != Py_None) {
// Set policy fields
as_status retval = pyobject_to_policy_base(
self, err, py_policy, &policy->base, exp_list, exp_list_p);
as_status retval =
as_policy_base_init_and_set_from_py_optional_policy_dict(
self, err, py_policy, &policy->base, exp_list, exp_list_p);
if (retval != AEROSPIKE_OK) {
return retval;
}
Expand Down Expand Up @@ -542,8 +545,9 @@ as_status pyobject_to_policy_scan(AerospikeClient *self, as_error *err,

if (py_policy && py_policy != Py_None) {
// Set policy fields
as_status retval = pyobject_to_policy_base(
self, err, py_policy, &policy->base, exp_list, exp_list_p);
as_status retval =
as_policy_base_init_and_set_from_py_optional_policy_dict(
self, err, py_policy, &policy->base, exp_list, exp_list_p);
if (retval != AEROSPIKE_OK) {
return retval;
}
Expand Down Expand Up @@ -581,8 +585,9 @@ as_status pyobject_to_policy_write(AerospikeClient *self, as_error *err,

if (py_policy && py_policy != Py_None) {
// Set policy fields
as_status retval = pyobject_to_policy_base(
self, err, py_policy, &policy->base, exp_list, exp_list_p);
as_status retval =
as_policy_base_init_and_set_from_py_optional_policy_dict(
self, err, py_policy, &policy->base, exp_list, exp_list_p);
if (retval != AEROSPIKE_OK) {
return retval;
}
Expand Down Expand Up @@ -625,8 +630,9 @@ as_status pyobject_to_policy_operate(AerospikeClient *self, as_error *err,

if (py_policy && py_policy != Py_None) {
// Set policy fields
as_status retval = pyobject_to_policy_base(
self, err, py_policy, &policy->base, exp_list, exp_list_p);
as_status retval =
as_policy_base_init_and_set_from_py_optional_policy_dict(
self, err, py_policy, &policy->base, exp_list, exp_list_p);
if (retval != AEROSPIKE_OK) {
return retval;
}
Expand All @@ -653,51 +659,57 @@ as_status pyobject_to_policy_operate(AerospikeClient *self, as_error *err,
}

/**
* Converts a PyObject into an as_policy_batch object.
* Returns AEROSPIKE_OK on success. On error, the err argument is populated.
* We assume that the error object and the policy object are already allocated
* and initialized (although, we do reset the error object here).
* Given a:
* - non-NULL dict called *py_policy*
* - pointer to an uninitialized as_policy_batch instance called *policy*. This is a transaction-level batch policy passed
* to the C client's API.
*
* - If py_policy is None, return NULL.
* - If py_policy is not None:
* 1. Initialize *policy* to *config_policy*'s fields.
* 2. Then use py_policy's fields to override *policy*'s fields.
*
* that can be passed to a C client API call that takes in a reference to a batch policy.
* On error, the err argument is populated.
* NOTE: we do reset the error object here.
*/
as_status pyobject_to_policy_batch(AerospikeClient *self, as_error *err,
PyObject *py_policy, as_policy_batch *policy,
as_policy_batch **policy_p,
as_policy_batch *config_batch_policy,
as_exp *exp_list, as_exp **exp_list_p)
as_policy_batch *as_policy_batch_init_and_set_from_pyobject(
AerospikeClient *self, as_error *err, PyObject *py_policy,
as_policy_batch *policy, as_policy_batch *config_policy, as_exp *exp_list,
as_exp **exp_list_p)
{
if (py_policy && py_policy != Py_None) {
// Initialize Policy
POLICY_INIT(as_policy_batch);
// TODO: not sure if this should be here
as_error_reset(err);

if (!PyDict_Check(py_policy)) {
return as_error_update(err, AEROSPIKE_ERR_PARAM,
"policy must be a dict");
}
//Initialize policy with global defaults
as_policy_batch_copy(config_batch_policy, policy);

if (py_policy && py_policy != Py_None) {
// Set policy fields
as_status retval = pyobject_to_policy_base(
self, err, py_policy, &policy->base, exp_list, exp_list_p);
if (retval != AEROSPIKE_OK) {
return retval;
}
as_policy_batch_copy(config_policy, policy);

POLICY_SET_FIELD(concurrent, bool);
POLICY_SET_FIELD(allow_inline, bool);
POLICY_SET_FIELD(deserialize, bool);
POLICY_SET_FIELD(replica, as_policy_replica);
POLICY_SET_FIELD(read_touch_ttl_percent, int);
// Set policy fields
as_status retval = as_policy_base_init_and_set_from_py_optional_policy_dict(
self, err, py_policy, &policy->base, exp_list, exp_list_p);
if (retval != AEROSPIKE_OK) {
return retval;
}

// 4.0.0 new policies
POLICY_SET_FIELD(read_mode_ap, as_policy_read_mode_ap);
POLICY_SET_FIELD(read_mode_sc, as_policy_read_mode_sc);
POLICY_SET_FIELD(concurrent, bool);
POLICY_SET_FIELD(allow_inline, bool);
POLICY_SET_FIELD(deserialize, bool);
POLICY_SET_FIELD(replica, as_policy_replica);
POLICY_SET_FIELD(read_touch_ttl_percent, int);

// C client 6.0.0 (batch writes)
POLICY_SET_FIELD(allow_inline_ssd, bool);
POLICY_SET_FIELD(respond_all_keys, bool);
}
// 4.0.0 new policies
POLICY_SET_FIELD(read_mode_ap, as_policy_read_mode_ap);
POLICY_SET_FIELD(read_mode_sc, as_policy_read_mode_sc);

// Update the policy
POLICY_UPDATE();
// C client 6.0.0 (batch writes)
POLICY_SET_FIELD(allow_inline_ssd, bool);
POLICY_SET_FIELD(respond_all_keys, bool);

return err->code;
return policy;
}

// New with server 6.0, C client 5.2.0 (batch writes)
Expand Down
Loading