|
4 | 4 | from functools import partial |
5 | 5 | from inspect import isawaitable |
6 | 6 | from logging import Logger, getLogger |
7 | | -from typing import Awaitable, Callable |
| 7 | +from typing import Awaitable, Callable, Tuple |
8 | 8 |
|
9 | 9 | from anyio import ( |
10 | 10 | TASK_STATUS_IGNORED, |
@@ -37,12 +37,11 @@ class YRoom: |
37 | 37 | _on_message: Callable[[bytes], Awaitable[bool] | bool] | None |
38 | 38 | _update_send_stream: MemoryObjectSendStream |
39 | 39 | _update_receive_stream: MemoryObjectReceiveStream |
40 | | - _task_group: TaskGroup | None = None |
41 | | - _started: Event | None = None |
| 40 | + _task_group: TaskGroup | None |
| 41 | + _started: Event | None |
42 | 42 | _stopped: Event |
43 | | - __start_lock: Lock | None = None |
44 | | - _subscription: Subscription | None = None |
45 | | - |
| 43 | + __start_lock: Lock | None |
| 44 | + _subscription: Subscription | None |
46 | 45 | def __init__( |
47 | 46 | self, |
48 | 47 | ready: bool = True, |
@@ -82,6 +81,13 @@ def __init__( |
82 | 81 | self._on_message = None |
83 | 82 | self.exception_handler = exception_handler |
84 | 83 | self._stopped = Event() |
| 84 | + self._update_send_stream, self._update_receive_stream = create_memory_object_stream( |
| 85 | + max_buffer_size=65536 |
| 86 | + ) |
| 87 | + self._task_group = None |
| 88 | + self._started = None |
| 89 | + self.__start_lock = None |
| 90 | + self._subscription= None |
85 | 91 |
|
86 | 92 | @property |
87 | 93 | def _start_lock(self) -> Lock: |
@@ -230,6 +236,7 @@ async def stop(self) -> None: |
230 | 236 | self._stopped.set() |
231 | 237 | self._task_group.cancel_scope.cancel() |
232 | 238 | self._task_group = None |
| 239 | + await self.ystore.stop() |
233 | 240 | if self._subscription is not None: |
234 | 241 | self.ydoc.unobserve(self._subscription) |
235 | 242 |
|
|
0 commit comments