Open
Description
I was using an `event_loop` fixture to supply a custom event loop class, and noticed the warning from pytest telling me not to do that and to use an event loop policy instead. However, no matter how I define my policy fixture, pytest warns that I didn't close the event loop. The policy usually isn't responsible for closing the event loop, though, so I assume this is a pytest bug? Any tips on what I should be doing would be appreciated.
class CustomEventLoop(asyncio.SelectorEventLoop):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
class CustomEventLoopPolicy(asyncio.AbstractEventLoopPolicy):
def __init__(self) -> None:
self._loop: asyncio.AbstractEventLoop | None = None
# Use the default policy for child watcher methods
self._default_policy = asyncio.get_event_loop_policy()
def get_event_loop(self) -> asyncio.AbstractEventLoop:
if self._loop is None or self._loop.is_closed():
self._loop = self.new_event_loop()
return self._loop
def new_event_loop(self) -> asyncio.AbstractEventLoop:
return CustomEventLoop()
def set_event_loop(self, loop: asyncio.AbstractEventLoop | None) -> None:
assert isinstance(loop, TaskTrackingEventLoop)
self._loop = loop
def get_child_watcher(self) -> asyncio.AbstractChildWatcher:
# Delegate to the default policy's child watcher
return self._default_policy.get_child_watcher()
def set_child_watcher(self, watcher: asyncio.AbstractChildWatcher) -> None:
# Delegate to the default policy's method
self._default_policy.set_child_watcher(watcher)
@pytest.fixture(scope="module")
def event_loop_policy() -> Generator[CustomEventLoopPolicy, None, None]:
    """Provide a module-scoped ``CustomEventLoopPolicy`` instance.

    Yields:
        A newly constructed policy. The fixture always yields a policy
        object, never ``None``.
    """
    policy = CustomEventLoopPolicy()
    yield policy
    # Teardown: close whatever loop the policy currently reports.
    policy.get_event_loop().close()  # makes no difference