Skip to content

Commit 536b1cb

Browse files
authored
PYTHON-5406 - AsyncPeriodicExecutor must reset CSOT contextvars befor… (#2360)
1 parent 6d33d4f commit 536b1cb

File tree

4 files changed

+53
-0
lines changed

4 files changed

+53
-0
lines changed

pymongo/_csot.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,12 @@
3232
DEADLINE: ContextVar[float] = ContextVar("DEADLINE", default=float("inf"))
3333

3434

35+
def reset_all() -> None:
36+
TIMEOUT.set(None)
37+
RTT.set(0.0)
38+
DEADLINE.set(float("inf"))
39+
40+
3541
def get_timeout() -> Optional[float]:
3642
return TIMEOUT.get(None)
3743

pymongo/periodic_executor.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import weakref
2424
from typing import Any, Optional
2525

26+
from pymongo import _csot
2627
from pymongo._asyncio_task import create_task
2728
from pymongo.lock import _create_lock
2829

@@ -93,6 +94,8 @@ def skip_sleep(self) -> None:
9394
self._skip_sleep = True
9495

9596
async def _run(self) -> None:
97+
# The CSOT contextvars must be cleared inside the executor task before execution begins
98+
_csot.reset_all()
9699
while not self._stopped:
97100
if self._task and self._task.cancelling(): # type: ignore[unused-ignore, attr-defined]
98101
raise asyncio.CancelledError
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
# Copyright 2025-present MongoDB, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Test that AsyncPeriodicExecutors do not copy ContextVars from their parents."""
16+
from __future__ import annotations
17+
18+
import asyncio
19+
import sys
20+
from test.asynchronous.utils import async_get_pool
21+
from test.utils_shared import delay, one
22+
23+
sys.path[0:0] = [""]
24+
25+
from test.asynchronous import AsyncIntegrationTest
26+
27+
28+
class TestAsyncContextVarsReset(AsyncIntegrationTest):
29+
async def test_context_vars_are_reset_in_executor(self):
30+
if sys.version_info < (3, 11):
31+
self.skipTest("Test requires asyncio.Task.get_context (added in Python 3.11)")
32+
33+
client = self.simple_client()
34+
35+
await client.db.test.insert_one({"x": 1})
36+
for server in client._topology._servers.values():
37+
for context in [
38+
c
39+
for c in server._monitor._executor._task.get_context()
40+
if c.name in ["TIMEOUT", "RTT", "DEADLINE"]
41+
]:
42+
self.assertIn(context.get(), [None, float("inf"), 0.0])
43+
await client.db.test.delete_many({})

tools/synchro.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ def async_only_test(f: str) -> bool:
185185
"test_concurrency.py",
186186
"test_async_cancellation.py",
187187
"test_async_loop_safety.py",
188+
"test_async_contextvars_reset.py",
188189
]
189190

190191

0 commit comments

Comments
 (0)