Skip to content

Commit 4f69add

Browse files
committed
Add main_thread_only execmodel
In order to prevent tasks from running in a non-main thread, wait for the previous task inside _try_send_to_primary_thread, then schedule the next task. Add a main_thread_only execmodel to distinguish this new behavior from the existing thread execmodel, since users of the thread execmodel expect that tasks can run in multiple threads concurrently. Closes: pytest-dev#96
1 parent 372168e commit 4f69add

File tree

5 files changed

+46
-16
lines changed

5 files changed

+46
-16
lines changed

doc/basics.rst

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -138,14 +138,14 @@ processes then you often want to call ``group.terminate()``
138138
yourself and specify a larger or not timeout.
139139

140140

141-
threading models: gevent, eventlet, thread
142-
===========================================
141+
threading models: gevent, eventlet, thread, main_thread_only
142+
====================================================================
143143

144144
.. versionadded:: 1.2 (status: experimental!)
145145

146-
execnet supports "thread", "eventlet" and "gevent" as thread models
147-
on each of the two sides. You need to decide which model to use
148-
before you create any gateways::
146+
execnet supports "main_thread_only", "thread", "eventlet" and "gevent"
147+
as thread models on each of the two sides. You need to decide which
148+
model to use before you create any gateways::
149149

150150
# content of threadmodel.py
151151
import execnet

src/execnet/gateway_base.py

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ def Event(self):
252252
def get_execmodel(backend):
253253
if hasattr(backend, "backend"):
254254
return backend
255-
if backend == "thread":
255+
if backend in ("thread", "main_thread_only"):
256256
return ThreadExecModel()
257257
elif backend == "eventlet":
258258
return EventletExecModel()
@@ -322,7 +322,7 @@ def __init__(self, execmodel, hasprimary=False):
322322
self._shuttingdown = False
323323
self._waitall_events = []
324324
if hasprimary:
325-
if self.execmodel.backend != "thread":
325+
if self.execmodel.backend not in ("thread", "main_thread_only"):
326326
raise ValueError("hasprimary=True requires thread model")
327327
self._primary_thread_task_ready = self.execmodel.Event()
328328
else:
@@ -332,7 +332,7 @@ def integrate_as_primary_thread(self):
332332
"""integrate the thread with which we are called as a primary
333333
thread for executing functions triggered with spawn().
334334
"""
335-
assert self.execmodel.backend == "thread", self.execmodel
335+
assert self.execmodel.backend in ("thread", "main_thread_only"), self.execmodel
336336
primary_thread_task_ready = self._primary_thread_task_ready
337337
# interacts with code at REF1
338338
while 1:
@@ -345,7 +345,11 @@ def integrate_as_primary_thread(self):
345345
with self._running_lock:
346346
if self._shuttingdown:
347347
break
348-
primary_thread_task_ready.clear()
348+
# Only clear if _try_send_to_primary_thread has not
349+
# yet set the next self._primary_thread_task reply
350+
# after waiting for this one to complete.
351+
if reply is self._primary_thread_task:
352+
primary_thread_task_ready.clear()
349353

350354
def trigger_shutdown(self):
351355
with self._running_lock:
@@ -376,6 +380,19 @@ def _try_send_to_primary_thread(self, reply):
376380
# wake up primary thread
377381
primary_thread_task_ready.set()
378382
return True
383+
elif (
384+
self.execmodel.backend == "main_thread_only"
385+
and self._primary_thread_task is not None
386+
):
387+
self._primary_thread_task.waitfinish()
388+
self._primary_thread_task = reply
389+
# wake up primary thread (it's okay if this is already set
390+
# because we waited for the previous task to finish above
391+
# and integrate_as_primary_thread will not clear it when
392+
# it enters self._running_lock if it detects that a new
393+
# task is available)
394+
primary_thread_task_ready.set()
395+
return True
379396
return False
380397

381398
def spawn(self, func, *args, **kwargs):
@@ -1106,7 +1123,18 @@ def join(self, timeout=None):
11061123
class WorkerGateway(BaseGateway):
11071124
def _local_schedulexec(self, channel, sourcetask):
11081125
sourcetask = loads_internal(sourcetask)
1109-
self._execpool.spawn(self.executetask, (channel, sourcetask))
1126+
if self.execmodel.backend == "main_thread_only":
1127+
# TODO: Maybe use something like queue.Queue to queue an asynchronous
1128+
# spawn here in order to avoid using another thread.
1129+
import threading
1130+
1131+
t = threading.Thread(
1132+
target=self._execpool.spawn,
1133+
args=(self.executetask, (channel, sourcetask)),
1134+
)
1135+
t.start()
1136+
else:
1137+
self._execpool.spawn(self.executetask, (channel, sourcetask))
11101138

11111139
def _terminate_execution(self):
11121140
# called from receiverthread
@@ -1132,7 +1160,7 @@ def serve(self):
11321160
def trace(msg):
11331161
self._trace("[serve] " + msg)
11341162

1135-
hasprimary = self.execmodel.backend == "thread"
1163+
hasprimary = self.execmodel.backend in ("thread", "main_thread_only")
11361164
self._execpool = WorkerPool(self.execmodel, hasprimary=hasprimary)
11371165
trace("spawning receiver thread")
11381166
self._initreceive()

testing/conftest.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ def anypython(request):
124124
pytest.skip(f"no {name} found")
125125
if "execmodel" in request.fixturenames and name != "sys.executable":
126126
backend = request.getfixturevalue("execmodel").backend
127-
if backend != "thread":
127+
if backend not in ("thread", "main_thread_only"):
128128
pytest.xfail(f"cannot run {backend!r} execmodel with bare {name}")
129129
return executable
130130

@@ -173,7 +173,9 @@ def gw(request, execmodel, group):
173173
return gw
174174

175175

176-
@pytest.fixture(params=["thread", "eventlet", "gevent"], scope="session")
176+
@pytest.fixture(
177+
params=["thread", "main_thread_only", "eventlet", "gevent"], scope="session"
178+
)
177179
def execmodel(request):
178180
if request.param != "thread":
179181
pytest.importorskip(request.param)

testing/test_termination.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ def doit():
3636

3737

3838
def test_endmarker_delivery_on_remote_killterm(makegateway, execmodel):
39-
if execmodel.backend != "thread":
39+
if execmodel.backend not in ("thread", "main_thread_only"):
4040
pytest.xfail("test and execnet not compatible to greenlets yet")
4141
gw = makegateway("popen")
4242
q = execmodel.queue.Queue()

testing/test_threadpool.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ def wait_then_put():
164164

165165

166166
def test_primary_thread_integration(execmodel):
167-
if execmodel.backend != "thread":
167+
if execmodel.backend not in ("thread", "main_thread_only"):
168168
with pytest.raises(ValueError):
169169
WorkerPool(execmodel=execmodel, hasprimary=True)
170170
return
@@ -188,7 +188,7 @@ def func():
188188

189189

190190
def test_primary_thread_integration_shutdown(execmodel):
191-
if execmodel.backend != "thread":
191+
if execmodel.backend not in ("thread", "main_thread_only"):
192192
pytest.skip("can only run with threading")
193193
pool = WorkerPool(execmodel=execmodel, hasprimary=True)
194194
queue = execmodel.queue.Queue()

0 commit comments

Comments
 (0)