Skip to content

Commit a7b6d15

Browse files
avazquezrdrjarry
authored andcommitted
callback: create unsafe mode to retrieve implicit session
A new API for sync callbacks is created, where the implicit session is returned instead of the list of changes. This allows, in sync callbacks, accessing all data (including changed nodes). However, when callback returns to Sysrepo, the session is freed, so user callback should never keep a reference on this implicit session.
1 parent ab825a9 commit a7b6d15

File tree

4 files changed

+300
-1
lines changed

4 files changed

+300
-1
lines changed

examples/application.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,12 @@ def main():
4242
with conn.start_session() as sess:
4343
logging.info("subscribing to module changes: sysrepo-example")
4444
sess.subscribe_module_change("sysrepo-example", None, module_change_cb)
45+
sess.subscribe_module_change_unsafe(
46+
"sysrepo-example",
47+
None,
48+
module_change_unsafe_cb,
49+
priority=1,
50+
)
4551
logging.info(
4652
"subscribing to operational data requests: /sysrepo-example:state"
4753
)
@@ -75,6 +81,19 @@ def module_change_cb(event, req_id, changes, private_data):
7581
print()
7682

7783

84+
# ------------------------------------------------------------------------------
85+
def module_change_unsafe_cb(session, event, req_id, private_data):
86+
print()
87+
print("========================")
88+
print("(unsafe) Module changed event: %s (request ID %s)" % (event, req_id))
89+
print("----- changes -----")
90+
changes = list(session.get_changes("/sysrepo-example:conf//."))
91+
for c in changes:
92+
print(repr(c))
93+
print("----- end of changes -----")
94+
print()
95+
96+
7897
# ------------------------------------------------------------------------------
7998
def oper_data_cb(xpath, private_data):
8099
print()

sysrepo/session.py

Lines changed: 113 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
check_call,
1818
)
1919
from .subscription import Subscription
20-
from .util import c2str, str2c
20+
from .util import c2str, is_async_func, str2c
2121
from .value import Value
2222

2323

@@ -372,6 +372,118 @@ def subscribe_module_change(
372372

373373
self.subscriptions.append(sub)
374374

375+
UnsafeModuleChangeCallbackType = Callable[["SysrepoSession", str, int, Any], None]
376+
"""
377+
Callback to be called when the change in the datastore occurs. Provides implicit
378+
session object instead of list of changes. THE CALLBACK SHOULD NEVER KEEP A
379+
REFERENCE ON THE IMPLICIT SESSION OBJECT TO AVOID USER-AFTER-FREE BUGS.
380+
381+
:arg session:
382+
Implicit session (do not stop) with information about the changed data.
383+
:arg event:
384+
Type of the callback event that has occurred. Can be one of: "update", "change",
385+
"done", "abort", "enabled".
386+
:arg req_id:
387+
Request ID unique for the specific module name. Connected events for one request
388+
("change" and "done" for example) have the same request ID.
389+
:arg private_data:
390+
Private context opaque to sysrepo used when subscribing.
391+
392+
When event is one of ("update", "change"), if the callback raises an exception, the
393+
changes will be rejected and the error will be forwarded to the client that made the
394+
change. If the exception is a subclass of `SysrepoError`, the traceback will not be
395+
sent to the logging system. For consistency and to avoid confusion with unexpected
396+
errors, the callback should raise explicit `SysrepoValidationFailedError` exceptions
397+
to reject changes.
398+
"""
399+
400+
def subscribe_module_change_unsafe(
401+
self,
402+
module: str,
403+
xpath: Optional[str],
404+
callback: UnsafeModuleChangeCallbackType,
405+
*,
406+
priority: int = 0,
407+
no_thread: bool = False,
408+
passive: bool = False,
409+
done_only: bool = False,
410+
enabled: bool = False,
411+
private_data: Any = None,
412+
asyncio_register: bool = False,
413+
) -> None:
414+
"""
415+
Subscribe for changes made in the specified module. Implicit session object
416+
is returned to callback instead of list of changes. When callback returns to
417+
Sysrepo, the session is freed, so we cannot keep a reference on it and
418+
schedule async code to run later. For this reason, async callbacks are NOT
419+
allowed here.
420+
421+
:arg module:
422+
Name of the module of interest for change notifications.
423+
:arg xpath:
424+
Optional xpath further filtering the changes that will be handled
425+
by this subscription.
426+
:arg callback:
427+
Callback to be called when the change in the datastore occurs.
428+
:arg priority:
429+
Specifies the order in which the callbacks (**within module**) will
430+
be called.
431+
:arg no_thread:
432+
There will be no thread created for handling this subscription
433+
meaning no event will be processed! Default to `True` if
434+
asyncio_register is `True`.
435+
:arg passive:
436+
The subscriber is not the "owner" of the subscribed data tree, just
437+
a passive watcher for changes.
438+
:arg done_only:
439+
The subscriber does not support verification of the changes and
440+
wants to be notified only after the changes has been applied in the
441+
datastore, without the possibility to deny them.
442+
:arg enabled:
443+
The subscriber wants to be notified about the current configuration
444+
at the moment of subscribing.
445+
:arg private_data:
446+
Private context passed to the callback function, opaque to sysrepo.
447+
:arg asyncio_register:
448+
Add the created subscription event pipe into asyncio event loop
449+
monitored read file descriptors. Implies `no_thread=True`.
450+
"""
451+
if self.is_implicit:
452+
raise SysrepoUnsupportedError("cannot subscribe with implicit sessions")
453+
if is_async_func(callback):
454+
raise SysrepoUnsupportedError(
455+
"cannot use unsafe subscription with async callback"
456+
)
457+
_check_subscription_callback(callback, self.UnsafeModuleChangeCallbackType)
458+
459+
sub = Subscription(
460+
callback,
461+
private_data,
462+
asyncio_register=asyncio_register,
463+
unsafe=True,
464+
)
465+
sub_p = ffi.new("sr_subscription_ctx_t **")
466+
467+
if asyncio_register:
468+
no_thread = True # we manage our own event loop
469+
flags = _subscribe_flags(
470+
no_thread=no_thread, passive=passive, done_only=done_only, enabled=enabled
471+
)
472+
check_call(
473+
lib.sr_module_change_subscribe,
474+
self.cdata,
475+
str2c(module),
476+
str2c(xpath),
477+
lib.srpy_module_change_cb,
478+
sub.handle,
479+
priority,
480+
flags,
481+
sub_p,
482+
)
483+
sub.init(sub_p[0])
484+
485+
self.subscriptions.append(sub)
486+
375487
OperDataCallbackType = Callable[[str, Any], Optional[Dict]]
376488
"""
377489
Callback to be called when the operational data are requested.

sysrepo/subscription.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ def __init__(
3535
include_implicit_defaults: bool = True,
3636
include_deleted_values: bool = False,
3737
extra_info: bool = False,
38+
unsafe: bool = False,
3839
):
3940
"""
4041
:arg callback:
@@ -58,6 +59,8 @@ def __init__(
5859
When True, the given callback is called with extra keyword arguments
5960
containing extra information of the sysrepo session that gave origin to the
6061
event
62+
:arg unsafe:
63+
When True, the given callback returns implicit session.
6164
"""
6265
if is_async_func(callback) and not asyncio_register:
6366
raise ValueError(
@@ -78,6 +81,7 @@ def __init__(
7881
self.cdata = None
7982
self.fd = -1
8083
self.handle = ffi.new_handle(self)
84+
self.unsafe = unsafe
8185

8286
def init(self, cdata) -> None:
8387
"""
@@ -228,6 +232,9 @@ def module_change_callback(session, sub_id, module, xpath, event, req_id, priv):
228232
callback = subscription.callback
229233
private_data = subscription.private_data
230234
event_name = EVENT_NAMES[event]
235+
if subscription.unsafe:
236+
callback(session, event_name, req_id, private_data)
237+
return lib.SR_ERR_OK
231238
if subscription.extra_info:
232239
try:
233240
extra_info = {

tests/test_subs_module_change.py

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import unittest
88

99
import sysrepo
10+
from sysrepo.session import SysrepoSession
1011

1112

1213
YANG_FILE = os.path.join(
@@ -307,3 +308,163 @@ def module_change_cb(event, req_id, changes, private_data, **kwargs):
307308
# * once with event "change"
308309
# * once with event "done"
309310
self.assertEqual(2, len(calls))
311+
312+
def test_module_change_sub_unsafe(self):
313+
priv = object()
314+
current_config = {}
315+
expected_changes = []
316+
317+
def module_change_cb(session, event, req_id, private_data):
318+
self.assertIsInstance(session, SysrepoSession)
319+
self.assertIn(event, ("change", "done", "abort"))
320+
self.assertIsInstance(req_id, int)
321+
self.assertIs(private_data, priv)
322+
changes = list(session.get_changes("/sysrepo-example:conf//."))
323+
for c in changes:
324+
if c.xpath == "/sysrepo-example:conf/system/hostname":
325+
if event == "change" and c.value == "INVALID":
326+
raise sysrepo.SysrepoValidationFailedError("invalid hostname")
327+
if event in ("change", "done"):
328+
self.assertEqual(changes, expected_changes)
329+
if event == "done":
330+
sysrepo.update_config_cache(current_config, changes)
331+
332+
self.sess.subscribe_module_change_unsafe(
333+
"sysrepo-example",
334+
"/sysrepo-example:conf",
335+
module_change_cb,
336+
private_data=priv,
337+
)
338+
339+
with self.conn.start_session("running") as ch_sess:
340+
# 1.
341+
sent_config = {"conf": {"system": {"hostname": "bar"}}}
342+
expected_changes = [
343+
sysrepo.ChangeCreated("/sysrepo-example:conf/system/hostname", "bar")
344+
]
345+
ch_sess.replace_config(sent_config, "sysrepo-example", strict=True)
346+
self.assertEqual(current_config, sent_config)
347+
# 2.
348+
sent_config = {"conf": {"system": {"hostname": "INVALID"}}}
349+
expected_changes = []
350+
with self.assertRaises(sysrepo.SysrepoCallbackFailedError):
351+
ch_sess.replace_config(sent_config, "sysrepo-example", strict=True)
352+
# 3.
353+
sent_config = {
354+
"conf": {
355+
"system": {"hostname": "bar"},
356+
"network": {"interface": [{"name": "eth0", "up": True}]},
357+
}
358+
}
359+
expected_changes = [
360+
sysrepo.ChangeCreated(
361+
"/sysrepo-example:conf/network/interface[name='eth0']",
362+
{"name": "eth0", "up": True},
363+
after="",
364+
),
365+
sysrepo.ChangeCreated(
366+
"/sysrepo-example:conf/network/interface[name='eth0']/name", "eth0"
367+
),
368+
sysrepo.ChangeCreated(
369+
"/sysrepo-example:conf/network/interface[name='eth0']/up", True
370+
),
371+
]
372+
ch_sess.replace_config(sent_config, "sysrepo-example", strict=True)
373+
self.assertEqual(current_config, sent_config)
374+
# 4.
375+
sent_config = {
376+
"conf": {
377+
"system": {"hostname": "bar"},
378+
"network": {"interface": [{"name": "eth2", "up": False}]},
379+
}
380+
}
381+
expected_changes = [
382+
sysrepo.ChangeDeleted(
383+
"/sysrepo-example:conf/network/interface[name='eth0']",
384+
None,
385+
),
386+
sysrepo.ChangeDeleted(
387+
"/sysrepo-example:conf/network/interface[name='eth0']/name",
388+
None,
389+
),
390+
sysrepo.ChangeDeleted(
391+
"/sysrepo-example:conf/network/interface[name='eth0']/up",
392+
None,
393+
),
394+
sysrepo.ChangeCreated(
395+
"/sysrepo-example:conf/network/interface[name='eth2']",
396+
{"name": "eth2", "up": False},
397+
after="",
398+
),
399+
sysrepo.ChangeCreated(
400+
"/sysrepo-example:conf/network/interface[name='eth2']/name", "eth2"
401+
),
402+
sysrepo.ChangeCreated(
403+
"/sysrepo-example:conf/network/interface[name='eth2']/up", False
404+
),
405+
]
406+
ch_sess.replace_config(sent_config, "sysrepo-example", strict=True)
407+
self.assertEqual(current_config, sent_config)
408+
# 5.
409+
sent_config = {
410+
"conf": {
411+
"system": {"hostname": "bar"},
412+
"network": {"interface": [{"name": "eth2", "up": True}]},
413+
}
414+
}
415+
expected_changes = [
416+
sysrepo.ChangeModified(
417+
"/sysrepo-example:conf/network/interface[name='eth2']/up",
418+
True,
419+
"false",
420+
False,
421+
),
422+
]
423+
ch_sess.replace_config(sent_config, "sysrepo-example", strict=True)
424+
self.assertEqual(current_config, sent_config)
425+
# 6.
426+
sent_config = {
427+
"conf": {
428+
"system": {"hostname": "bar"},
429+
"network": {
430+
"interface": [
431+
{"name": "eth2", "up": True},
432+
{"name": "eth0", "up": False},
433+
]
434+
},
435+
}
436+
}
437+
expected_changes = [
438+
sysrepo.ChangeCreated(
439+
"/sysrepo-example:conf/network/interface[name='eth0']",
440+
{"name": "eth0", "up": False},
441+
after="[name='eth2']",
442+
),
443+
sysrepo.ChangeCreated(
444+
"/sysrepo-example:conf/network/interface[name='eth0']/name", "eth0"
445+
),
446+
sysrepo.ChangeCreated(
447+
"/sysrepo-example:conf/network/interface[name='eth0']/up", False
448+
),
449+
]
450+
ch_sess.replace_config(sent_config, "sysrepo-example", strict=True)
451+
self.assertEqual(current_config, sent_config)
452+
# 7.
453+
sent_config = {
454+
"conf": {
455+
"system": {"hostname": "bar"},
456+
"network": {
457+
"interface": [
458+
{"name": "eth0", "up": False},
459+
{"name": "eth2", "up": True},
460+
]
461+
},
462+
}
463+
}
464+
expected_changes = [
465+
sysrepo.ChangeMoved(
466+
"/sysrepo-example:conf/network/interface[name='eth0']", after=""
467+
),
468+
]
469+
ch_sess.replace_config(sent_config, "sysrepo-example", strict=True)
470+
self.assertEqual(current_config, sent_config)

0 commit comments

Comments
 (0)