Skip to content

Commit 928bccd

Browse files
author
Benjamin Berg
committed
Let the GSource dispatch python callbacks
Turn things around and rather than iterating the GLib main context from python iterate the python mainloop from GLib. In order to do this, we need to be able to calculate the timeout (and whether anything can be dispatched) from inside the prepare and check functions of the source. We can do that easily by looking at the _ready and _schedule attributes of the loop. Once we have that, we can dispatch everything scheduled by python by calling _run_once and relying on our select() implementation to return immediately.
1 parent 6bf9545 commit 928bccd

File tree

4 files changed

+163
-51
lines changed

4 files changed

+163
-51
lines changed

asyncio_glib/glib_events.py

Lines changed: 135 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,19 @@
1+
import sys
12
import asyncio
3+
from asyncio import events
24
import threading
35

46
from gi.repository import GLib
57

68
from . import glib_selector
79

10+
# The override for GLib.MainLoop.run installs a signal wakeup fd,
11+
# which interferes with asyncio signal handlers. Try to get the
12+
# direct version.
13+
try:
14+
g_main_loop_run = super(GLib.MainLoop, GLib.MainLoop).run
15+
except AttributeError:
16+
g_main_loop_run = GLib.MainLoop.run
817

918
__all__ = (
1019
'GLibEventLoop',
@@ -13,20 +22,136 @@
1322

1423

1524
class GLibEventLoop(asyncio.SelectorEventLoop):
16-
"""An asyncio event loop that runs the GLib main loop"""
25+
"""An asyncio event loop that runs the GLib context using a main loop"""
1726

18-
def __init__(self, main_context=None):
19-
if main_context is None:
20-
main_context = GLib.MainContext.default()
21-
selector = glib_selector.GLibSelector(main_context)
27+
# This is based on the selector event loop, but never actually runs select()
28+
# in the strict sense.
29+
# We use the selector to register all FDs with the main context using our
30+
# own GSource. For python timeouts/idle equivalent, we directly query them
31+
# from the context by providing the _get_timeout_ms function that the
32+
# GSource uses. This in turn access _ready and _scheduled to calculate
33+
# the timeout and whether python can dispatch anything non-FD based yet.
34+
#
35+
# To simplify matters, we call the normal _run_once method of the base
36+
# class which will call select(). As we know that we are ready at the time
37+
# that select() will return immediately with the FD information we have
38+
# gathered already.
39+
#
40+
# With that, we just need to override and slightly modify the run_forever
41+
# method so that it calls g_main_loop_run instead of looping _run_once.
42+
43+
def __init__(self, main_context):
44+
# A mainloop in case we want to run our context
45+
assert main_context is not None
46+
self._context = main_context
47+
self._main_loop = GLib.MainLoop.new(self._context, False)
48+
49+
selector = glib_selector.GLibSelector(self._context, self)
2250
super().__init__(selector)
2351

52+
# This is used by run_once to not busy loop if the timeout is floor'ed to zero
53+
self._clock_resolution = 1e-3
54+
55+
def run_forever(self):
56+
self._check_closed()
57+
self._check_running()
58+
self._set_coroutine_origin_tracking(self._debug)
59+
self._thread_id = threading.get_ident()
60+
61+
old_agen_hooks = sys.get_asyncgen_hooks()
62+
sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
63+
finalizer=self._asyncgen_finalizer_hook)
64+
try:
65+
events._set_running_loop(self)
66+
g_main_loop_run(self._main_loop)
67+
finally:
68+
self._thread_id = None
69+
events._set_running_loop(None)
70+
self._set_coroutine_origin_tracking(False)
71+
sys.set_asyncgen_hooks(*old_agen_hooks)
72+
73+
def time(self):
74+
return GLib.get_monotonic_time() / 1000000
75+
76+
def _get_timeout_ms(self):
77+
if self._ready:
78+
return 0
79+
80+
if self._scheduled:
81+
timeout = (self._scheduled[0]._when - self.time()) * 1000
82+
return timeout if timeout >= 0 else 0
83+
84+
return -1
85+
86+
def is_running(self):
87+
# If we are currently the owner, then the context is running
88+
# (and we are being dispatched by it)
89+
if self._context.is_owner():
90+
return True
91+
92+
# Otherwise, it might (but shouldn't) be running in a different thread
93+
# Try aquiring it, if that fails, another thread is owning it
94+
if not self._context.acquire():
95+
return True
96+
self._context.release()
97+
98+
return False
99+
100+
def stop(self):
101+
self._main_loop.quit()
24102

25-
class GLibEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
103+
class GLibEventLoopPolicy(events.AbstractEventLoopPolicy):
26104
"""An asyncio event loop policy that runs the GLib main loop"""
27105

106+
_loops = {}
107+
108+
def get_event_loop(self):
109+
"""Get the event loop for the current context.
110+
111+
Returns an event loop object implementing the BaseEventLoop interface,
112+
or raises an exception in case no event loop has been set for the
113+
current context and the current policy does not specify to create one.
114+
115+
It should never return None."""
116+
# Get the thread default main context
117+
ctx = GLib.MainContext.get_thread_default()
118+
# If there is none, and we are on the main thread, then use the default context
119+
if ctx is None and threading.current_thread() is threading.main_thread():
120+
ctx = GLib.MainContext.default()
121+
# Otherwise, if there is still none, create a new one for this thread and push it
122+
if ctx is None:
123+
ctx = GLib.MainContext.new()
124+
ctx.push_thread_default()
125+
126+
# Note: We cannot attach it to ctx, as getting the default will always
127+
# return a new python wrapper. But, we can use hash() as that returns
128+
# the pointer to the C structure.
129+
if ctx in self._loops:
130+
loop = self._loops[ctx]
131+
# If the loop is already closed, then return a new one instead
132+
if not loop._closed:
133+
return loop
134+
135+
self._loops[ctx] = GLibEventLoop(ctx)
136+
return self._loops[ctx]
137+
138+
def set_event_loop(self, loop):
139+
"""Set the event loop for the current context to loop."""
140+
raise NotImplementedError
141+
28142
def new_event_loop(self):
29-
if threading.current_thread() != threading.main_thread():
30-
raise RuntimeError("GLibEventLoopPolicy only allows the main "
31-
"thread to create event loops")
32-
return GLibEventLoop()
143+
"""Create and return a new event loop object according to this
144+
policy's rules. If there's need to set this loop as the event loop for
145+
the current context, set_event_loop must be called explicitly."""
146+
raise NotImplementedError
147+
148+
# Child processes handling (Unix only).
149+
150+
def get_child_watcher(self):
151+
"Get the watcher for child processes."
152+
raise NotImplementedError
153+
154+
def set_child_watcher(self, watcher):
155+
"""Set the watcher for child processes."""
156+
raise NotImplementedError
157+

asyncio_glib/glib_selector.py

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,28 +17,41 @@ def __init__(self, selector):
1717
self._selector = selector
1818

1919
def prepare(self):
20-
return False, self._selector._get_timeout_ms()
20+
timeout = self._selector._loop._get_timeout_ms()
21+
22+
# NOTE: Always return False as we query the FDs in only in check
23+
return False, timeout
2124

2225
def check(self):
23-
return False
26+
has_events = False
27+
timeout = self._selector._loop._get_timeout_ms()
28+
if timeout == 0:
29+
has_events = True
2430

25-
def dispatch(self, callback, args):
2631
for (fd, tag) in self._fd_to_tag.items():
2732
condition = self.query_unix_fd(tag)
2833
events = self._fd_to_events.setdefault(fd, 0)
29-
if condition & GLib.IOCondition.IN:
34+
if condition & (GLib.IOCondition.IN | GLib.IOCondition.HUP):
3035
events |= selectors.EVENT_READ
36+
has_events = True
3137
if condition & GLib.IOCondition.OUT:
3238
events |= selectors.EVENT_WRITE
39+
has_events = True
3340
self._fd_to_events[fd] = events
41+
42+
return has_events
43+
44+
def dispatch(self, callback, args):
45+
# Now, wag the dog by its tail
46+
self._selector._loop._run_once()
3447
return GLib.SOURCE_CONTINUE
3548

3649
def register(self, fd, events):
3750
assert fd not in self._fd_to_tag
3851

3952
condition = GLib.IOCondition(0)
4053
if events & selectors.EVENT_READ:
41-
condition |= GLib.IOCondition.IN
54+
condition |= GLib.IOCondition.IN | GLib.IOCondition.HUP
4255
if events & selectors.EVENT_WRITE:
4356
condition |= GLib.IOCondition.OUT
4457
self._fd_to_tag[fd] = self.add_unix_fd(fd, condition)
@@ -56,13 +69,15 @@ def clear(self):
5669

5770
class GLibSelector(selectors._BaseSelectorImpl):
5871

59-
def __init__(self, context):
72+
def __init__(self, context, loop):
6073
super().__init__()
6174
self._context = context
75+
self._loop = loop
6276
self._source = _SelectorSource(self)
6377
self._source.attach(self._context)
6478

6579
def close(self):
80+
self._source.clear()
6681
self._source.destroy()
6782
super().close()
6883

@@ -76,23 +91,13 @@ def unregister(self, fileobj):
7691
self._source.unregister(key.fd)
7792
return key
7893

79-
def _get_timeout_ms(self):
80-
"""Return the timeout for the current select/iteration"""
81-
return self._timeout
82-
8394
def select(self, timeout=None):
84-
# Calling .set_ready_time() always causes a mainloop iteration to finish
85-
if timeout is not None and timeout >= 0:
86-
self._timeout = int(timeout * 1000)
87-
else:
88-
self._timeout = -1
89-
90-
self._source.clear()
91-
self._context.iteration(True)
92-
95+
# Dummy select that just returns immediately with what we already know.
9396
ready = []
9497
for key in self.get_map().values():
9598
events = self._source.get_events(key.fd) & key.events
9699
if events != 0:
97100
ready.append((key, events))
101+
self._source.clear()
102+
98103
return ready

tests/test_glib_events.py

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,13 @@
77
from asyncio import test_utils
88

99
from asyncio_glib import glib_events
10-
10+
import gi
11+
from gi.repository import GLib
1112

1213
class GLibEventLoopTests(UnixEventLoopTestsMixin, test_utils.TestCase):
1314

1415
def create_event_loop(self):
15-
return glib_events.GLibEventLoop()
16+
return glib_events.GLibEventLoop(main_context=GLib.MainContext.default())
1617

1718
def test_read_pipe(self):
1819
raise unittest.SkipTest("TODO")
@@ -37,8 +38,4 @@ def test_new_event_loop(self):
3738
loop.close()
3839

3940
def test_set_event_loop(self):
40-
policy = self.create_policy()
41-
loop = policy.new_event_loop()
42-
policy.set_event_loop(loop)
43-
self.assertIs(loop, policy.get_event_loop())
44-
loop.close()
41+
raise unittest.SkipTest("Not compatible with GLib")

tests/test_glib_selector.py

Lines changed: 0 additions & 15 deletions
This file was deleted.

0 commit comments

Comments
 (0)