Skip to content

Commit

Permalink
Merge pull request #905 from minrk/client-thread-output
Browse files Browse the repository at this point in the history
detach io thread output from creation cell
  • Loading branch information
minrk authored Oct 30, 2024
2 parents d7713f6 + e534ef1 commit 204d777
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 6 deletions.
4 changes: 3 additions & 1 deletion ipyparallel/_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

from tornado.ioloop import IOLoop

from ipyparallel.util import _OutputProducingThread as Thread


def _asyncio_run(coro):
"""Like asyncio.run, but works when there's no event loop"""
Expand Down Expand Up @@ -41,7 +43,7 @@ def _in_thread(self, async_f, *args, **kwargs):
"""Run an async function in a background thread"""
if self._async_thread is None:
self._loop_started = threading.Event()
self._async_thread = threading.Thread(target=self._thread_main, daemon=True)
self._async_thread = Thread(target=self._thread_main, daemon=True)
self._async_thread.start()
self._loop_started.wait(timeout=5)

Expand Down
3 changes: 2 additions & 1 deletion ipyparallel/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from functools import partial
from getpass import getpass
from pprint import pprint
from threading import Event, Thread, current_thread
from threading import Event, current_thread

import jupyter_client.session
import zmq
Expand Down Expand Up @@ -49,6 +49,7 @@
import ipyparallel as ipp
from ipyparallel import error, serialize, util
from ipyparallel.serialize import PrePickled, Reference
from ipyparallel.util import _OutputProducingThread as Thread

from .asyncresult import AsyncHubResult, AsyncResult
from .futures import MessageFuture, multi_future
Expand Down
7 changes: 4 additions & 3 deletions ipyparallel/cluster/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from traitlets.config.configurable import LoggingConfigurable

from ..traitlets import entry_points
from ..util import _OutputProducingThread as Thread
from ..util import shlex_join
from ._winhpcjob import IPControllerJob, IPControllerTask, IPEngineSetJob, IPEngineTask
from .shellcmd import ShellCommandSend
Expand Down Expand Up @@ -524,7 +525,7 @@ def _start_waiting(self):
# ensure self.loop is accessed on the main thread before waiting
self.loop
self._stop_waiting = threading.Event()
self._wait_thread = threading.Thread(
self._wait_thread = Thread(
target=self._wait, daemon=True, name=f"wait(pid={self.pid})"
)
self._wait_thread.start()
Expand Down Expand Up @@ -583,7 +584,7 @@ def _stream_file(self, path):
time.sleep(0.1)

def _start_streaming(self):
self._stream_thread = t = threading.Thread(
self._stream_thread = t = Thread(
target=partial(self._stream_file, self.output_file),
name=f"Stream Output {self.identifier}",
daemon=True,
Expand Down Expand Up @@ -1352,7 +1353,7 @@ def _start_waiting(self):
# ensure self.loop is accessed on the main thread before waiting
self.loop
self._stop_waiting = threading.Event()
self._wait_thread = threading.Thread(
self._wait_thread = Thread(
target=self._wait,
daemon=True,
name=f"wait(host={self.location}, pid={self.pid})",
Expand Down
34 changes: 33 additions & 1 deletion ipyparallel/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
import sys
import warnings
from datetime import datetime, timezone
from functools import lru_cache
from functools import lru_cache, partial
from signal import SIGABRT, SIGINT, SIGTERM, signal
from threading import Thread, current_thread
from types import FunctionType

import traitlets
Expand Down Expand Up @@ -804,3 +805,34 @@ def connect(
socket.setsockopt(zmq.CURVE_SECRETKEY, curve_secretkey)
socket.setsockopt(zmq.CURVE_PUBLICKEY, curve_publickey)
return socket.connect(url)


def _detach_thread_output(ident=None):
"""undo thread-parent mapping in ipykernel#1186"""
# disable ipykernel's association of thread output with the cell that
# spawned the thread.
# there should be a public API for this...
if ident is None:
ident = current_thread().ident
for stream in (sys.stdout, sys.stderr):
for name in ("_thread_to_parent", "_thread_to_parent_header"):
mapping = getattr(stream, name, None)
if mapping:
mapping.pop(ident, None)


class _OutputProducingThread(Thread):
"""
Subclass Thread to workaround bug in ipykernel
associating thread output with wrong Cell
See https://github.com/ipython/ipykernel/issues/1289
"""

def __init__(self, target, **kwargs):
wrapped_target = partial(self._wrapped_target, target)
super().__init__(target=wrapped_target, **kwargs)

def _wrapped_target(self, target, *args, **kwargs):
_detach_thread_output(self.ident)
return target(*args, **kwargs)

0 comments on commit 204d777

Please sign in to comment.