Skip to content

Commit 03c270e

Browse files
committed
PYCBC-1646: Migrate transactions to use transaction_context
Motivation ========== The current path of using the C++ core async_attempt_context adds complexity to both the Python SDK and C++ core (the Python SDK is the only client using the async_attempt_context logic). Migrating to using transaction_context will place the Python SDK inline with other wrappers and also allow some code removal/cleanup in the C++ core. Modification ============ * Remove attempt_context and use transaction_context instead * Remove executing the transaction lambda in the bindings * Rearrange how transaction exceptions are built in the bindings * Update Python (non-binding) transaction code to execute transaction logic Results ======= All tests pass without any modification to tests. Change-Id: Iaca68e5d36507bcc64d2cd34afc8e244dc819fc4 Reviewed-on: https://review.couchbase.org/c/couchbase-python-client/+/221542 Tested-by: Build Bot <[email protected]> Reviewed-by: Dimitris Christodoulou <[email protected]>
1 parent 9593cc1 commit 03c270e

File tree

7 files changed

+613
-344
lines changed

7 files changed

+613
-344
lines changed

acouchbase/transactions/transactions.py

Lines changed: 30 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,13 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515

16-
import asyncio
1716
import logging
1817
from functools import wraps
1918
from typing import (TYPE_CHECKING,
2019
Any,
2120
Awaitable,
2221
Callable,
22+
Coroutine,
2323
Dict,
2424
Optional)
2525

@@ -111,28 +111,19 @@ def __init__(self,
111111
):
112112
super().__init__(cluster, config)
113113

114-
@AsyncWrapper.inject_callbacks(TransactionResult)
115-
def run(self,
116-
txn_logic, # type: Callable[[AttemptContextLogic], None]
117-
transaction_options=None, # type: Optional[TransactionOptions]
118-
**kwargs) -> Awaitable[TransactionResult]:
119-
def wrapped_logic(c):
120-
try:
121-
ctx = AttemptContext(c, self._loop, self._transcoder)
122-
asyncio.run_coroutine_threadsafe(txn_logic(ctx), self._loop).result()
123-
log.debug('wrapped logic completed')
124-
except Exception as e:
125-
log.debug('wrapped_logic raised %s', e)
126-
raise e
127-
114+
async def run(self,
115+
txn_logic, # type: Callable[[AttemptContextLogic], Coroutine[Any, Any, None]]
116+
transaction_options=None, # type: Optional[TransactionOptions]
117+
**kwargs) -> TransactionResult:
128118
opts = None
129119
if transaction_options:
130-
opts = transaction_options
120+
opts = transaction_options._base
131121
if 'per_txn_config' in kwargs:
132122
Supportability.method_param_deprecated('per_txn_config', 'transaction_options')
133123
opts = kwargs.pop('per_txn_config', None)
134124

135-
return super().run(wrapped_logic, opts, **kwargs)
125+
txn_result = await super().run_async(txn_logic, AttemptContext(self._txns, self._loop, self._transcoder, opts))
126+
return TransactionResult(**txn_result)
136127

137128
# TODO: make async?
138129
def close(self):
@@ -149,12 +140,32 @@ def close(self):
149140

150141

151142
class AttemptContext(AttemptContextLogic):
143+
152144
def __init__(self,
153145
ctx, # type: PyCapsuleType
154146
loop, # type: AbstractEventLoop
155-
transcoder # type: Transcoder
147+
transcoder, # type: Transcoder
148+
opts # type: Optional[PyCapsuleType]
156149
):
157-
super().__init__(ctx, loop, transcoder)
150+
super().__init__(ctx, transcoder, loop, opts)
151+
152+
@AsyncWrapper.inject_callbacks(None)
153+
def _new_attempt(self,
154+
**kwargs # type: Dict[str, Any]
155+
) -> Awaitable[None]:
156+
return super()._new_attempt_async(**kwargs)
157+
158+
@AsyncWrapper.inject_callbacks(None)
159+
def _rollback(self,
160+
**kwargs # type: Dict[str, Any]
161+
) -> Awaitable[None]:
162+
return super()._rollback_async(**kwargs)
163+
164+
@AsyncWrapper.inject_callbacks(TransactionResult)
165+
def _commit(self,
166+
**kwargs # type: Dict[str, Any]
167+
) -> Awaitable[TransactionResult]:
168+
return super()._commit_async(**kwargs)
158169

159170
@AsyncWrapper.inject_callbacks(TransactionGetResult)
160171
def _get(self,

couchbase/transactions/logic/attempt_context_logic.py

Lines changed: 65 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,21 @@
1414
# limitations under the License.
1515

1616
import logging
17-
from typing import TYPE_CHECKING, Optional
17+
from typing import (TYPE_CHECKING,
18+
Any,
19+
Dict,
20+
Optional)
1821

19-
from couchbase.pycbc_core import (transaction_op,
22+
from couchbase.exceptions import ErrorMapper
23+
from couchbase.exceptions import exception as CouchbaseBaseException
24+
from couchbase.options import TransactionOptions
25+
from couchbase.pycbc_core import (create_new_attempt_context,
26+
create_transaction_context,
27+
transaction_commit,
28+
transaction_op,
2029
transaction_operations,
21-
transaction_query_op)
30+
transaction_query_op,
31+
transaction_rollback)
2232

2333
if TYPE_CHECKING:
2434
from asyncio import AbstractEventLoop
@@ -31,23 +41,61 @@
3141

3242
class AttemptContextLogic:
3343
def __init__(self,
34-
ctx, # type: PyCapsuleType
35-
loop, # type: Optional[AbstractEventLoop]
44+
txns, # type: PyCapsuleType
3645
transcoder, # type: Transcoder
37-
):
38-
log.debug('creating new attempt context with context=%s, loop=%s, and transcoder=%s', ctx, loop, transcoder)
39-
self._ctx = ctx
46+
loop, # type: Optional[AbstractEventLoop]
47+
opts # type: Optional[PyCapsuleType]
48+
) -> None:
49+
if opts is None:
50+
opts = TransactionOptions()._base
51+
self._txnctx = create_transaction_context(txns=txns, transaction_options=opts)
4052
self._loop = loop
4153
self._transcoder = transcoder
4254

55+
def _handle_exception(self, ex: Any) -> None:
56+
if isinstance(ex, Exception):
57+
raise ex
58+
if isinstance(ex, CouchbaseBaseException):
59+
raise ErrorMapper.build_exception(ex)
60+
61+
def _new_attempt(self) -> None:
62+
new_attempt_ctx = create_new_attempt_context(ctx=self._txnctx)
63+
self._handle_exception(new_attempt_ctx)
64+
65+
def _new_attempt_async(self,
66+
**kwargs, # type: Dict[str, Any]
67+
) -> None:
68+
create_new_attempt_context(ctx=self._txnctx, **kwargs)
69+
70+
def _rollback(self) -> None:
71+
rollback_res = transaction_rollback(ctx=self._txnctx)
72+
self._handle_exception(rollback_res)
73+
74+
def _rollback_async(self,
75+
**kwargs, # type: Dict[str, Any]
76+
) -> None:
77+
transaction_rollback(ctx=self._txnctx, **kwargs)
78+
79+
def _commit(self) -> Optional[Dict[str, Any]]:
80+
commit_res = transaction_commit(ctx=self._txnctx)
81+
self._handle_exception(commit_res)
82+
return commit_res
83+
84+
def _commit_async(self,
85+
**kwargs, # type: Dict[str, Any]
86+
) -> Optional[Dict[str, Any]]:
87+
return transaction_commit(ctx=self._txnctx, **kwargs)
88+
4389
def get(self, coll, key, **kwargs):
4490
# make sure we don't pass the transcoder along
4591
kwargs.pop('transcoder', None)
4692
kwargs.update(coll._get_connection_args())
4793
kwargs.pop("conn")
48-
kwargs["key"] = key
49-
kwargs["ctx"] = self._ctx
50-
kwargs["op"] = transaction_operations.GET.value
94+
kwargs.update({
95+
'key': key,
96+
'ctx': self._txnctx,
97+
'op': transaction_operations.GET.value
98+
})
5199
log.debug('get calling transaction op with %s', kwargs)
52100
return transaction_op(**kwargs)
53101

@@ -57,7 +105,7 @@ def insert(self, coll, key, value, **kwargs):
57105
kwargs.pop("conn")
58106
kwargs.update({
59107
'key': key,
60-
'ctx': self._ctx,
108+
'ctx': self._txnctx,
61109
'op': transaction_operations.INSERT.value,
62110
'value': transcoder.encode_value(value)
63111
})
@@ -67,7 +115,7 @@ def insert(self, coll, key, value, **kwargs):
67115
def replace(self, txn_get_result, value, **kwargs):
68116
transcoder = kwargs.pop('transcoder', self._transcoder)
69117
kwargs.update({
70-
'ctx': self._ctx,
118+
'ctx': self._txnctx,
71119
'op': transaction_operations.REPLACE.value,
72120
'value': transcoder.encode_value(value),
73121
'txn_get_result': txn_get_result._res
@@ -76,13 +124,13 @@ def replace(self, txn_get_result, value, **kwargs):
76124
return transaction_op(**kwargs)
77125

78126
def remove(self, txn_get_result, **kwargs):
79-
kwargs.update({"ctx": self._ctx,
80-
"op": transaction_operations.REMOVE.value,
81-
"txn_get_result": txn_get_result._res})
127+
kwargs.update({'ctx': self._txnctx,
128+
'op': transaction_operations.REMOVE.value,
129+
'txn_get_result': txn_get_result._res})
82130
log.debug('remove calling transaction op with %s', kwargs)
83131
return transaction_op(**kwargs)
84132

85133
def query(self, query, options, **kwargs):
86-
kwargs.update({"ctx": self._ctx, "statement": query, "options": options._base})
134+
kwargs.update({'ctx': self._txnctx, 'statement': query, 'options': options._base})
87135
log.debug('query calling transaction_op with %s', kwargs)
88136
return transaction_query_op(**kwargs)

couchbase/transactions/logic/transactions_logic.py

Lines changed: 54 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,18 @@
1717
from typing import (TYPE_CHECKING,
1818
Any,
1919
Callable,
20-
Dict,
21-
Optional)
20+
Coroutine)
2221

23-
from couchbase.logic.supportability import Supportability
24-
from couchbase.pycbc_core import (create_transactions,
25-
destroy_transactions,
26-
run_transaction)
22+
from couchbase.exceptions import TransactionExpired, TransactionFailed
23+
from couchbase.pycbc_core import create_transactions, destroy_transactions
24+
from couchbase.transactions.transaction_result import TransactionResult
2725
from couchbase.transcoder import JSONTranscoder
2826

2927
if TYPE_CHECKING:
28+
from acouchbase.transactions import AttemptContext as AsyncAttemptContext
3029
from couchbase.logic.cluster import ClusterLogic
31-
from couchbase.options import TransactionConfig, TransactionOptions
32-
from couchbase.transactions.logic.attempt_context_logic import AttemptContextLogic
30+
from couchbase.options import TransactionConfig
31+
from couchbase.transactions import AttemptContext as BlockingAttemptContext
3332

3433
log = logging.getLogger(__name__)
3534

@@ -49,21 +48,54 @@ def __init__(self,
4948
log.info('created transactions object using config=%s, transcoder=%s', self._config, self._transcoder)
5049

5150
def run(self,
52-
logic, # type: Callable[[AttemptContextLogic], None]
53-
transaction_options=None, # type: Optional[TransactionOptions]
54-
**kwargs # type: Optional[Dict[str, Any]]
55-
):
56-
if transaction_options:
57-
kwargs['transaction_options'] = transaction_options._base
58-
if 'per_txn_config' in kwargs:
59-
Supportability.method_param_deprecated('per_txn_config', 'transaction_options')
60-
kwargs['transaction_options'] = kwargs.pop('per_txn_config', None)
51+
logic, # type: Callable[[BlockingAttemptContext], None]
52+
attempt_ctx # type: BlockingAttemptContext
53+
) -> TransactionResult:
6154

62-
try:
63-
return run_transaction(txns=self._txns, logic=logic, **kwargs)
64-
except Exception as e:
65-
log.debug('txn_logic.run() got %s:%s, re-raising it', e.__class__.__name__, e)
66-
raise e
55+
while True:
56+
attempt_ctx._new_attempt()
57+
try:
58+
logic(attempt_ctx)
59+
except Exception as ex:
60+
attempt_ctx._rollback()
61+
if isinstance(ex, TransactionExpired):
62+
raise ex from None
63+
raise TransactionFailed(exc_info={'inner_cause': ex}) from None
64+
65+
try:
66+
# calls finalize internally
67+
res = attempt_ctx._commit()
68+
if not res:
69+
continue
70+
return TransactionResult(**res)
71+
except Exception:
72+
# commit failed, retrying...
73+
pass # nosec
74+
75+
async def run_async(self,
76+
logic, # type: Callable[[AsyncAttemptContext], Coroutine[Any, Any, None]]
77+
attempt_ctx # type: AsyncAttemptContext
78+
) -> TransactionResult:
79+
80+
while True:
81+
await attempt_ctx._new_attempt()
82+
try:
83+
await logic(attempt_ctx)
84+
except Exception as ex:
85+
await attempt_ctx._rollback()
86+
if isinstance(ex, TransactionExpired):
87+
raise ex from None
88+
raise TransactionFailed(exc_info={'inner_cause': ex}) from None
89+
90+
try:
91+
# calls finalize internally
92+
res = await attempt_ctx._commit()
93+
if not res:
94+
continue
95+
return res
96+
except Exception:
97+
# commit failed, retrying...
98+
pass # nosec
6799

68100
def close(self, **kwargs):
69101
log.info('shutting down transactions...')

couchbase/transactions/transactions.py

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -116,22 +116,14 @@ def txn_logic(ctx):
116116
117117
""" # noqa: E501
118118

119-
def wrapped_txn_logic(c):
120-
try:
121-
ctx = AttemptContext(c, self._transcoder)
122-
return txn_logic(ctx)
123-
except Exception as e:
124-
log.debug('wrapped_txn_logic got %s:%s, re-raising it', e.__class__.__name__, e)
125-
raise e
126-
127119
opts = None
128120
if transaction_options:
129-
opts = transaction_options
121+
opts = transaction_options._base
130122
if 'per_txn_config' in kwargs:
131123
Supportability.method_param_deprecated('per_txn_config', 'transaction_options')
132124
opts = kwargs.pop('per_txn_config', None)
133125

134-
return TransactionResult(**super().run(wrapped_txn_logic, opts))
126+
return super().run(txn_logic, AttemptContext(self._txns, self._transcoder, opts))
135127

136128
def close(self):
137129
super().close()
@@ -141,10 +133,11 @@ def close(self):
141133
class AttemptContext(AttemptContextLogic):
142134

143135
def __init__(self,
144-
ctx, # type: PyCapsuleType
145-
transcoder # type: Transcoder
136+
txns, # type: PyCapsuleType
137+
transcoder, # type: Transcoder
138+
opts # type: Optional[PyCapsuleType]
146139
):
147-
super().__init__(ctx, None, transcoder)
140+
super().__init__(txns, transcoder, None, opts)
148141

149142
@BlockingWrapper.block(TransactionGetResult)
150143
def _get(self,

src/client.cxx

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -401,10 +401,14 @@ static struct PyMethodDef methods[] = {
401401
(PyCFunction)pycbc_txns::create_transactions,
402402
METH_VARARGS | METH_KEYWORDS,
403403
"Create a transactions object" },
404-
{ "run_transaction",
405-
(PyCFunction)pycbc_txns::run_transactions,
404+
{ "create_transaction_context",
405+
(PyCFunction)pycbc_txns::create_transaction_context,
406406
METH_VARARGS | METH_KEYWORDS,
407-
"Run a transaction" },
407+
"Create a transaction context object" },
408+
{ "create_new_attempt_context",
409+
(PyCFunction)pycbc_txns::create_new_attempt_context,
410+
METH_VARARGS | METH_KEYWORDS,
411+
"Create a new attempt context object" },
408412
{ "transaction_op",
409413
(PyCFunction)pycbc_txns::transaction_op,
410414
METH_VARARGS | METH_KEYWORDS,
@@ -413,6 +417,14 @@ static struct PyMethodDef methods[] = {
413417
(PyCFunction)pycbc_txns::transaction_query_op,
414418
METH_VARARGS | METH_KEYWORDS,
415419
"perform a transactional query" },
420+
{ "transaction_commit",
421+
(PyCFunction)pycbc_txns::transaction_commit,
422+
METH_VARARGS | METH_KEYWORDS,
423+
"Commit a transaction" },
424+
{ "transaction_rollback",
425+
(PyCFunction)pycbc_txns::transaction_rollback,
426+
METH_VARARGS | METH_KEYWORDS,
427+
"Rollback a transaction" },
416428
{ "destroy_transactions",
417429
(PyCFunction)pycbc_txns::destroy_transactions,
418430
METH_VARARGS | METH_KEYWORDS,

0 commit comments

Comments
 (0)