-
Notifications
You must be signed in to change notification settings - Fork 24
/
Copy pathsubscription.py
642 lines (561 loc) · 24 KB
/
subscription.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
# Copyright (c) 2020 6WIND S.A.
# SPDX-License-Identifier: BSD-3-Clause
import asyncio
import functools
import logging
from typing import Any, Callable
from libyang.data import DNode
from _sysrepo import ffi, lib
from .errors import SysrepoError, check_call
from .util import c2str, is_async_func
LOG = logging.getLogger(__name__)
# ------------------------------------------------------------------------------
class Subscription:
"""
Python representation of `sr_subscription_ctx_t *`.
.. attention::
Do not instantiate this class manually, use `SysrepoSession.subscribe_*`.
"""
def __init__(
self,
callback: Callable,
private_data: Any = None,
asyncio_register: bool = False,
strict: bool = False,
include_implicit_defaults: bool = True,
include_deleted_values: bool = False,
extra_info: bool = False,
unsafe: bool = False,
):
"""
:arg callback:
The python callback function or coroutine function used when subscribing.
:arg private_data:
Opaque data used when subscribing, will be passed to callback.
:arg asyncio_register:
Add this subscription event pipe into asyncio event loop monitored file
descriptors. When the event pipe becomes readable, call process_events().
:arg strict:
If True, reject data with no schema definition from rpc output parameters
and operational data callbacks. Otherwise, ignore unknown data and log a
warning message.
:arg include_implicit_defaults:
If True, include implicit default nodes into Change objects passed to module
change callbacks and into input parameters passed to RPC/action callbacks.
:arg include_deleted_values:
If True, include the complete deleted node values into Change objects passed
to module change callbacks.
:arg extra_info:
When True, the given callback is called with extra keyword arguments
containing extra information of the sysrepo session that gave origin to the
event
:arg unsafe:
When True, the given callback returns implicit session.
"""
if is_async_func(callback) and not asyncio_register:
raise ValueError(
"%s is an async function, asyncio_register is mandatory" % callback
)
self.callback = callback
self.private_data = private_data
self.asyncio_register = asyncio_register
self.strict = strict
self.include_implicit_defaults = include_implicit_defaults
self.include_deleted_values = include_deleted_values
self.extra_info = extra_info
if asyncio_register:
self.loop = asyncio.get_event_loop()
else:
self.loop = None
self.tasks = {}
self.cdata = None
self.fd = -1
self.handle = ffi.new_handle(self)
self.unsafe = unsafe
def init(self, cdata) -> None:
"""
Initialization of this object is not complete after calling __init__. The
sr_subscription_ctx_t object is allocated by sysrepo when calling one of
sr_*_subscribe functions and we need to pass self.handle to these functions so
that this subscription can be forwarded to C callbacks.
Subscription.init() is called just after sr_*_subscribe functions to complete
initialization. See SysrepoSession.subscribe_* functions for more details.
if self.asyncio_register is True, add this subscription event pipe to the
monitored file descriptors for reading in asyncio event loop.
:arg "sr_subscription_ctx_t *" cdata:
The subscription pointer allocated by sysrepo.
"""
if self.cdata is not None:
raise RuntimeError("init was already called once")
self.cdata = cdata
if self.asyncio_register:
self.fd = self.get_fd()
self.loop.add_reader(self.fd, self.process_events)
def get_fd(self) -> int:
"""
Get the event pipe of a subscription. Event pipe can be used in `select()`,
`poll()`, or similar functions to listen for new events. It will then be ready
for reading.
"""
fd_p = ffi.new("int *")
check_call(lib.sr_get_event_pipe, self.cdata, fd_p)
return fd_p[0]
def unsubscribe(self) -> None:
"""
Unsubscribes from a subscription acquired by any of sr_*_subscribe calls and
releases all subscription-related data.
Removes self.fd from asyncio event loop monitored file descriptors.
"""
if self.cdata is None:
return
if self.asyncio_register and self.fd != -1:
self.loop.remove_reader(self.fd)
try:
check_call(lib.sr_unsubscribe, self.cdata)
finally:
self.cdata = None
for t in list(self.tasks.values()):
t.cancel()
self.tasks.clear()
def process_events(self) -> None:
"""
Called when self.fd becomes readable.
"""
check_call(lib.sr_subscription_process_events, self.cdata, ffi.NULL, ffi.NULL)
def task_done(self, task_id: Any, event: str, task: asyncio.Task) -> None:
"""
Called when self.callback is an async function/method and it has finished. This
calls self.process_events() so that the C callback is invoked again with the
same arguments (request_id, event) and we can return the actual result.
"""
if task.cancelled():
self.tasks.pop(task_id, None)
return
try:
if event in ("update", "change", "rpc", "oper"):
# The task result will be evaluated in the C callback.
# It will return the result to sysrepo.
self.process_events()
else:
# Sysrepo does not care about the result of the callback.
# This will raise the exception here if any occured in the task
# and will be logged (i.e. not lost).
self.tasks.pop(task_id, None)
task.result()
except Exception:
LOG.exception("failure in task: %r", task)
# ------------------------------------------------------------------------------
EVENT_NAMES = {
lib.SR_EV_UPDATE: "update",
lib.SR_EV_CHANGE: "change",
lib.SR_EV_DONE: "done",
lib.SR_EV_ABORT: "abort",
lib.SR_EV_ENABLED: "enabled",
lib.SR_EV_RPC: "rpc",
}
NOTIF_TYPES = {
lib.SR_EV_NOTIF_REALTIME: "realtime",
lib.SR_EV_NOTIF_REPLAY: "replay",
lib.SR_EV_NOTIF_REPLAY_COMPLETE: "replay_complete",
lib.SR_EV_NOTIF_TERMINATED: "terminated",
lib.SR_EV_NOTIF_SUSPENDED: "suspended",
lib.SR_EV_NOTIF_RESUMED: "resumed",
}
# ------------------------------------------------------------------------------
@ffi.def_extern(name="srpy_module_change_cb")
def module_change_callback(session, sub_id, module, xpath, event, req_id, priv):
"""
Callback to be called on the event of changing datastore content of the specified
module.
This python function mapped to the C srpy_module_change_cb function. When the C
srpy_module_change_cb function is called by libsysrepo.so, this function is called
with the same arguments.
:arg "sr_session_ctx_t *" session:
Implicit session (do not stop) with information about the changed data.
:arg "uint32_t" sub_id:
Subscription ID.
:arg "const char *" module:
Name of the module where the change has occurred.
:arg "const char *" xpath:
XPath used when subscribing, NULL if the whole module was subscribed to.
:arg "sr_event_t" event:
Type of the callback event that has occurred.
:arg "uint32_t" req_id:
Request ID unique for the specific module_name. Connected events for one request
(SR_EV_CHANGE and SR_EV_DONE, for example) have the same request ID.
:arg "void *" priv:
Private context opaque to sysrepo. Contains a CFFI handle to the Subscription
python object.
:returns:
User error code (sr_error_t).
:raises:
IMPORTANT: This function *CANNOT* raise any exception. The C callstack does not
handle that well and when it happens the outcome is undetermined. Make sure to
catch all errors and log them so they are not lost.
"""
try:
# convert C arguments to python objects.
from .session import SysrepoSession # circular import
session = SysrepoSession(session, True)
module = c2str(module)
xpath = c2str(xpath)
root_xpath = ("/%s:*" % module) if xpath is None else xpath
subscription = ffi.from_handle(priv)
callback = subscription.callback
private_data = subscription.private_data
event_name = EVENT_NAMES[event]
if subscription.unsafe:
callback(session, root_xpath, event_name, req_id, private_data)
return lib.SR_ERR_OK
if subscription.extra_info:
try:
extra_info = {
"netconf_id": session.get_netconf_id(),
"user": session.get_user(),
}
except SysrepoError:
extra_info = {
"netconf_id": -1,
"user": "",
}
else:
extra_info = {}
if is_async_func(callback):
task_id = (event, req_id)
if task_id not in subscription.tasks:
# ATTENTION: the implicit session passed as argument will be
# freed when this function returns. The async callback must NOT
# keep a reference on it as it will be invalid. Changes must be
# gathered now.
changes = list(
session.get_changes(
root_xpath + "//.",
include_implicit_defaults=subscription.include_implicit_defaults,
include_deleted_values=subscription.include_deleted_values,
)
)
task = subscription.loop.create_task(
callback(event_name, req_id, changes, private_data, **extra_info)
)
task.add_done_callback(
functools.partial(subscription.task_done, task_id, event_name)
)
subscription.tasks[task_id] = task
if event not in (lib.SR_EV_UPDATE, lib.SR_EV_CHANGE):
# Return immediately, process_events will not be called in
# subscription.task_done. Sysrepo does not care about the
# result of the operation.
return lib.SR_ERR_OK
task = subscription.tasks[task_id]
if not task.done():
return lib.SR_ERR_CALLBACK_SHELVE
del subscription.tasks[task_id]
task.result() # raise error if any
else:
changes = list(
session.get_changes(
root_xpath + "//.",
include_implicit_defaults=subscription.include_implicit_defaults,
include_deleted_values=subscription.include_deleted_values,
)
)
callback(event_name, req_id, changes, private_data, **extra_info)
return lib.SR_ERR_OK
except SysrepoError as e:
if (
event in (lib.SR_EV_UPDATE, lib.SR_EV_CHANGE)
and e.msg
and isinstance(session, SysrepoSession)
and isinstance(xpath, str)
):
session.set_error(e.msg)
return e.rc
except BaseException as e:
# ATTENTION: catch all exceptions!
# including KeyboardInterrupt, CancelledError, etc.
# We are in a C callback, we cannot let any error pass
LOG.exception("%r callback failed", locals().get("callback", priv))
if (
event in (lib.SR_EV_UPDATE, lib.SR_EV_CHANGE)
and isinstance(session, SysrepoSession)
and isinstance(xpath, str)
):
session.set_error(str(e))
return lib.SR_ERR_CALLBACK_FAILED
# ------------------------------------------------------------------------------
@ffi.def_extern(name="srpy_oper_data_cb")
def oper_data_callback(session, sub_id, module, xpath, req_xpath, req_id, parent, priv):
"""
Callback to be called when operational data at the selected xpath are requested.
:arg "sr_session_ctx_t *" session:
Implicit session (do not stop).
:arg "uint32_t" sub_id:
Subscription ID.
:arg "const char *" module:
Name of the affected module.
:arg "const char *" xpath:
XPath identifying the subtree that is supposed to be provided, same as the one
used for the subscription.
:arg "const char *" req_xpath:
XPath as requested by a client. Can be NULL.
:arg "uint32_t" req_id:
Request ID unique for the specific module name.
:arg "struct lyd_node **" parent:
Pointer to an existing parent of the requested nodes. Is NULL for top-level
nodes. Callback is supposed to append the requested nodes to this data subtree
and return either the original parent or a top-level node.
:arg "void *" priv:
Private context opaque to sysrepo. Contains a CFFI handle to the Subscription
python object.
:returns:
User error code (sr_error_t).
:raises:
IMPORTANT: This function *CANNOT* raise any exception. The C callstack does not
handle that well and when it happens the outcome is undetermined. Make sure to
catch all errors and log them so they are not lost.
"""
try:
# convert C arguments to python objects.
from .session import SysrepoSession # circular import
session = SysrepoSession(session, True)
module = c2str(module)
xpath = c2str(xpath)
req_xpath = c2str(req_xpath)
subscription = ffi.from_handle(priv)
callback = subscription.callback
private_data = subscription.private_data
if subscription.extra_info:
extra_info = {
"netconf_id": session.get_netconf_id(),
"user": session.get_user(),
}
else:
extra_info = {}
if is_async_func(callback):
task_id = req_id
if task_id not in subscription.tasks:
task = subscription.loop.create_task(
callback(req_xpath, private_data, **extra_info)
)
task.add_done_callback(
functools.partial(subscription.task_done, task_id, "oper")
)
subscription.tasks[task_id] = task
task = subscription.tasks[task_id]
if not task.done():
return lib.SR_ERR_CALLBACK_SHELVE
del subscription.tasks[task_id]
oper_data = task.result()
else:
oper_data = callback(req_xpath, private_data, **extra_info)
if isinstance(oper_data, dict):
# convert oper_data to a libyang.DNode object
with session.get_ly_ctx() as ly_ctx:
dnode = ly_ctx.get_module(module).parse_data_dict(
oper_data, strict=subscription.strict, validate=False
)
if dnode is not None:
if parent[0]:
root = DNode.new(ly_ctx, parent[0]).root()
root.merge(dnode, destruct=True)
else:
# The FFI bindings of libyang and sysrepo are different.
# Casting is required.
parent[0] = ffi.cast("struct lyd_node *", dnode.cdata)
elif oper_data is not None:
raise TypeError(
"bad return type from %s (expected dict or None)" % callback
)
return lib.SR_ERR_OK
except SysrepoError as e:
if e.msg and isinstance(session, SysrepoSession) and isinstance(xpath, str):
session.set_error(e.msg)
return e.rc
except BaseException as e:
# ATTENTION: catch all exceptions!
# including KeyboardInterrupt, CancelledError, etc.
# We are in a C callback, we cannot let any error pass
LOG.exception("%r callback failed", locals().get("callback", priv))
if isinstance(session, SysrepoSession) and isinstance(xpath, str):
session.set_error(str(e))
return lib.SR_ERR_CALLBACK_FAILED
# ------------------------------------------------------------------------------
@ffi.def_extern(name="srpy_rpc_tree_cb")
def rpc_callback(session, sub_id, xpath, input_node, event, req_id, output_node, priv):
"""
Callback to be called for the delivery of an RPC/action.
:arg "sr_session_ctx_t *" session:
Implicit session (do not stop).
:arg "uint32_t" sub_id:
Subscription ID.
:arg "const char *" xpath:
Simple operation path identifying the RPC/action.
:arg "const struct lyd_node *" input_node:
Data tree of input parameters.
:arg "sr_event_t" event:
Type of the callback event that has occurred.
:arg "uint32_t" req_id:
Request ID unique for the specific xpath.
:arg "struct lyd_node *" output_node:
Data tree of output parameters. Should be allocated on heap, will be freed by
sysrepo after sending of the RPC response.
:arg "void *" priv:
Private context opaque to sysrepo. Contains a CFFI handle to the Subscription
python object.
:returns:
User error code (sr_error_t).
:raises:
IMPORTANT: This function *CANNOT* raise any exception. The C callstack does not
handle that well and when it happens the outcome is undetermined. Make sure to
catch all errors and log them so they are not lost.
"""
try:
# convert C arguments to python objects.
from .session import SysrepoSession # circular import
session = SysrepoSession(session, True)
subscription = ffi.from_handle(priv)
callback = subscription.callback
private_data = subscription.private_data
event_name = EVENT_NAMES[event]
with session.get_ly_ctx() as ly_ctx:
rpc_input = DNode.new(ly_ctx, input_node)
xpath = rpc_input.path()
# strip all parents, only preserve the input tree
input_dict = next(
iter(
rpc_input.print_dict(
include_implicit_defaults=subscription.include_implicit_defaults,
absolute=False,
).values()
)
)
if subscription.extra_info:
extra_info = {
"netconf_id": session.get_netconf_id(),
"user": session.get_user(),
}
else:
extra_info = {}
if is_async_func(callback):
task_id = (event, req_id)
if task_id not in subscription.tasks:
task = subscription.loop.create_task(
callback(xpath, input_dict, event_name, private_data, **extra_info)
)
task.add_done_callback(
functools.partial(subscription.task_done, task_id, event_name)
)
subscription.tasks[task_id] = task
task = subscription.tasks[task_id]
if not task.done():
return lib.SR_ERR_CALLBACK_SHELVE
del subscription.tasks[task_id]
output_dict = task.result()
else:
output_dict = callback(
xpath, input_dict, event_name, private_data, **extra_info
)
if event != lib.SR_EV_RPC:
# May happen when there are multiple callback registered for the
# same RPC. If one of the callbacks has failed, the other ones will
# be called with SR_EV_ABORT. In that case, abort early and do
# not return the RPC output data to sysrepo.
return lib.SR_ERR_OK
if isinstance(output_dict, dict):
# update output_node with contents of output_dict
with session.get_ly_ctx() as ly_ctx:
DNode.new(ly_ctx, output_node).merge_data_dict(
output_dict,
rpcreply=True,
strict=subscription.strict,
validate=False,
)
elif output_dict is not None:
raise TypeError(
"bad return type from %s (expected dict or None)" % callback
)
return lib.SR_ERR_OK
except SysrepoError as e:
if e.msg and isinstance(session, SysrepoSession) and isinstance(xpath, str):
session.set_error(e.msg)
return e.rc
except BaseException as e:
# ATTENTION: catch all exceptions!
# including KeyboardInterrupt, CancelledError, etc.
# We are in a C callback, we cannot let any error pass
LOG.exception("%r callback failed", locals().get("callback", priv))
if isinstance(session, SysrepoSession) and isinstance(xpath, str):
session.set_error(str(e))
return lib.SR_ERR_CALLBACK_FAILED
# ------------------------------------------------------------------------------
@ffi.def_extern(name="srpy_event_notif_tree_cb")
def event_notif_tree_callback(session, sub_id, notif_type, notif, timestamp, priv):
"""
Callback to be called when a notification is received.
:arg "sr_session_ctx_t *" session:
Implicit session (do not stop).
:arg "uint32_t" sub_id:
Subscription ID.
:arg "sr_ev_notif_type_t" notif_type:
Type of the notification event that has occurred.
:arg "const struct lyd_node *" notif:
Data tree of input parameters.
:arg "struct timespec*" timestamp:
Timestamp of the notification.
:arg "void *" priv:
Private context opaque to sysrepo. Contains a CFFI handle to the Subscription
python object.
:returns:
None
:raises:
IMPORTANT: This function *CANNOT* raise any exception. The C callstack does not
handle that well and when it happens the outcome is undetermined. Make sure to
catch all errors and log them so they are not lost.
"""
try:
notif_type = NOTIF_TYPES[notif_type]
if notif_type == "terminated" or notif == ffi.NULL:
return
# convert C arguments to python objects.
from .session import SysrepoSession # circular import
timestamp = timestamp.tv_sec
session = SysrepoSession(session, True)
subscription = ffi.from_handle(priv)
callback = subscription.callback
private_data = subscription.private_data
with session.get_ly_ctx() as ly_ctx:
notif_dnode = DNode.new(ly_ctx, notif)
xpath = notif_dnode.path()
notif_dict = next(
iter(
notif_dnode.print_dict(
include_implicit_defaults=subscription.include_implicit_defaults,
absolute=False,
).values()
)
)
if subscription.extra_info:
extra_info = {
"netconf_id": session.get_netconf_id(),
"user": session.get_user(),
}
else:
extra_info = {}
if is_async_func(callback):
task = subscription.loop.create_task(
callback(
xpath, notif_type, notif_dict, timestamp, private_data, **extra_info
)
)
task.add_done_callback(
functools.partial(subscription.task_done, None, "notif")
)
else:
callback(
xpath, notif_type, notif_dict, timestamp, private_data, **extra_info
)
except BaseException:
# ATTENTION: catch all exceptions!
# including KeyboardInterrupt, CancelledError, etc.
# We are in a C callback, we cannot let any error pass
LOG.exception("%r callback failed", locals().get("callback", priv))