Skip to content

Commit d6a1aef

Browse files
authored
Merge pull request #11 from gselzer/proxy-wrapper
feat: Client-side Proxy wrapper
2 parents 9d6a7aa + a9355d7 commit d6a1aef

File tree

9 files changed

+222
-79
lines changed

9 files changed

+222
-79
lines changed

.pre-commit-config.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,5 @@ repos:
3333
files: "^src/"
3434
additional_dependencies:
3535
- pymmcore_plus
36-
- Pyro5
36+
- Pyro5
37+
- types-cachetools

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,13 @@ mmcore-remote
4646
Run `mmcore-remote --help` for more options.
4747

4848
Then, on the client side (or in a separate process), connect to the server using
49-
using `pymmcore_remote.MMCoreProxy`. `MMCorePlusProxy` accepts `host` and `port`
49+
using `pymmcore_remote.ClientCMMCorePlus`. `ClientCMMCorePlus` accepts `host` and `port`
5050
arguments that must match the server (if you override the defaults).
5151

5252
```python
53-
from pymmcore_remote import MMCorePlusProxy
53+
from pymmcore_remote import ClientCMMCorePlus
5454

55-
with MMCorePlusProxy() as core:
55+
with ClientCMMCorePlus() as core:
5656
core.loadSystemConfiguration("path/to/config.cfg")
5757
# continue using core as you would with pymmcore_plus.CMMCorePlus
5858
```

examples/remote_mda.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
import numpy as np
22
from useq import MDAEvent, MDASequence, TIntervalLoops
33

4-
from pymmcore_remote import MMCorePlusProxy, server_process
4+
from pymmcore_remote import ClientCMMCorePlus, server_process
55

66
PORT = 55999
77

88
# this context manager ensures a server is running, or creates a new one if not.
99
with server_process(port=PORT):
1010
# create a proxy object that communicates with the MMCore object on the server
11-
with MMCorePlusProxy(port=PORT) as core:
11+
with ClientCMMCorePlus(port=PORT) as core:
1212
# continue using core as usual:
1313
core.loadSystemConfiguration()
1414

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ classifiers = [
3333
"Programming Language :: Python :: 3.13",
3434
"Typing :: Typed",
3535
]
36-
dependencies = ["Pyro5", "pymmcore-plus[cli]>=0.14.0", "msgpack", "msgpack-numpy"]
36+
dependencies = ["cachetools", "Pyro5", "pymmcore-plus[cli]>=0.14.0", "msgpack", "msgpack-numpy"]
3737

3838
# https://peps.python.org/pep-0621/#dependencies-optional-dependencies
3939
[dependency-groups]

src/pymmcore_remote/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
__author__ = "Talley Lambert"
1010
__email__ = "[email protected]"
1111

12-
from .client import MMCorePlusProxy
12+
from .client import ClientCMMCorePlus
1313
from .server import serve, server_process
1414

15-
__all__ = ["MMCorePlusProxy", "serve", "server_process"]
15+
__all__ = ["ClientCMMCorePlus", "serve", "server_process"]

src/pymmcore_remote/_serialize.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -128,11 +128,11 @@ def to_dict(self, obj: DeviceProperty) -> dict:
128128
}
129129

130130
def from_dict(self, classname: str, d: dict) -> DeviceProperty:
131-
from pymmcore_remote.client import MMCorePlusProxy
131+
from pymmcore_remote.client import ClientCMMCorePlus
132132

133133
# TODO: not sure if this is the best way to get the remote core object
134134
core_uri = d.pop("core_uri")
135-
core = MMCorePlusProxy.instance(core_uri)
135+
core = ClientCMMCorePlus.instance(core_uri)
136136
return DeviceProperty(**d, mmcore=core)
137137

138138

@@ -146,10 +146,10 @@ def to_dict(self, obj: DeviceAdapter) -> dict:
146146
}
147147

148148
def from_dict(self, classname: str, d: dict) -> DeviceAdapter:
149-
from pymmcore_remote.client import MMCorePlusProxy
149+
from pymmcore_remote.client import ClientCMMCorePlus
150150

151151
core_uri = d.pop("core_uri")
152-
core = MMCorePlusProxy.instance(core_uri)
152+
core = ClientCMMCorePlus.instance(core_uri)
153153
return DeviceAdapter(**d, mmcore=core)
154154

155155

@@ -163,10 +163,10 @@ def to_dict(self, obj: Device) -> dict:
163163
}
164164

165165
def from_dict(self, classname: str, d: dict) -> Device:
166-
from pymmcore_remote.client import MMCorePlusProxy
166+
from pymmcore_remote.client import ClientCMMCorePlus
167167

168168
core_uri = d.pop("core_uri")
169-
core = MMCorePlusProxy.instance(core_uri)
169+
core = ClientCMMCorePlus.instance(core_uri)
170170
return Device.create(d["device_label"], mmcore=core)
171171

172172

@@ -188,10 +188,10 @@ def to_dict(self, obj: ConfigGroup) -> dict:
188188
}
189189

190190
def from_dict(self, classname: str, d: dict) -> ConfigGroup:
191-
from pymmcore_remote.client import MMCorePlusProxy
191+
from pymmcore_remote.client import ClientCMMCorePlus
192192

193193
core_uri = d.pop("core_uri")
194-
core = MMCorePlusProxy.instance(core_uri)
194+
core = ClientCMMCorePlus.instance(core_uri)
195195
return ConfigGroup(**d, mmcore=core)
196196

197197

src/pymmcore_remote/client.py

Lines changed: 168 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
from __future__ import annotations
22

33
import threading
4-
from typing import TYPE_CHECKING, Any, ClassVar, cast, overload
4+
from abc import ABC, abstractmethod
5+
from typing import TYPE_CHECKING, Any, ClassVar, Generic, TypeVar, cast, overload
56

67
import Pyro5.api
78
import Pyro5.errors
9+
from cachetools import LRUCache
810
from pymmcore_plus.core.events import CMMCoreSignaler
911
from pymmcore_plus.mda.events import MDASignaler
12+
from typing_extensions import override
1013

1114
from . import server
1215
from ._serialize import register_serializers
@@ -17,14 +20,14 @@
1720
from pymmcore_plus.mda import MDARunner
1821

1922

20-
class MDARunnerProxy(Pyro5.api.Proxy):
23+
class _MDARunnerProxy(Pyro5.api.Proxy):
2124
"""Proxy for MDARunner object on server."""
2225

23-
def __init__(self, mda_runner_uri: Any, cb_thread: _DaemonThread) -> None:
24-
super().__init__(mda_runner_uri)
26+
def __init__(self, uri: Pyro5.api.URI | str, connected_socket: Any = None) -> None:
27+
super().__init__(uri, connected_socket)
2528
events = ClientSideMDASignaler()
2629
object.__setattr__(self, "events", events)
27-
cb_thread.api_daemon.register(events)
30+
_DaemonThread.instance("CallbackDaemon").api_daemon.register(events)
2831
self.connect_client_side_callback(events) # must come after register()
2932

3033
# this is a lie... but it's more useful than -> Self
@@ -33,50 +36,19 @@ def __enter__(self) -> MDARunner:
3336
return super().__enter__() # type: ignore [no-any-return]
3437

3538

36-
class MMCorePlusProxy(Pyro5.api.Proxy):
39+
class _MMCorePlusProxy(Pyro5.api.Proxy):
3740
"""Proxy for CMMCorePlus object on server."""
3841

39-
_mda_runner: MDARunnerProxy
40-
_instances: ClassVar[dict[str, MMCorePlusProxy]] = {}
42+
_instances: ClassVar[dict[str, _MMCorePlusProxy]] = {}
4143

4244
@classmethod
43-
def instance(cls, uri: Pyro5.api.URI | str) -> MMCorePlusProxy:
45+
def instance(cls, uri: Pyro5.api.URI | str) -> _MMCorePlusProxy:
4446
"""Return the instance for the given URI, creating it if necessary."""
4547
if str(uri) not in cls._instances:
4648
cls._instances[str(uri)] = cls(uri)
4749
return cls._instances[str(uri)]
4850

49-
@overload
50-
def __init__(
51-
self,
52-
*,
53-
port: int,
54-
object_id: str | None = None,
55-
host: str | None = None,
56-
connected_socket: Any = None,
57-
) -> None: ...
58-
@overload
59-
def __init__(
60-
self,
61-
uri: Pyro5.api.URI | str,
62-
*,
63-
connected_socket: Any = None,
64-
) -> None: ...
65-
66-
def __init__(
67-
self,
68-
uri: Pyro5.api.URI | str | None = None,
69-
*,
70-
object_id: str | None = None,
71-
host: str | None = None,
72-
port: int | None = None,
73-
connected_socket: Any = None,
74-
) -> None:
75-
if uri is None:
76-
object_id = server.CORE_NAME if object_id is None else object_id
77-
host = server.DEFAULT_HOST if host is None else host
78-
port = server.DEFAULT_PORT if port is None else port
79-
uri = f"PYRO:{object_id}@{host}:{port}"
51+
def __init__(self, uri: Pyro5.api.URI | str, connected_socket: Any = None) -> None:
8052
register_serializers()
8153
super().__init__(uri, connected_socket=connected_socket)
8254
self._instances[str(self._pyroUri)] = self
@@ -95,26 +67,12 @@ def __init__(
9567
# here on the client side
9668
events = ClientSideCMMCoreSignaler()
9769
object.__setattr__(self, "events", events)
98-
# create daemon thread to listen for callbacks/signals coming from the server
70+
# listen for callbacks/signals coming from the server
9971
# and register the callback handler
100-
cb_thread = _DaemonThread(name="CallbackDaemon")
101-
cb_thread.api_daemon.register(events)
72+
_DaemonThread.instance("CallbackDaemon").api_daemon.register(events)
10273
# connect our local callback handler to the server's signaler
10374
self.connect_client_side_callback(events) # must come after register()
10475

105-
# Create a proxy object for the mda_runner as well, passing in the daemon thread
106-
# so it too can receive signals from the server
107-
object.__setattr__(
108-
self, "_mda_runner", MDARunnerProxy(self.get_mda_runner_uri(), cb_thread)
109-
)
110-
# start the callback-handling thread
111-
cb_thread.start()
112-
113-
@property
114-
def mda(self) -> MDARunner:
115-
"""Return the MDARunner proxy."""
116-
return self._mda_runner
117-
11876
# this is a lie... but it's more useful than -> Self
11977
def __enter__(self) -> CMMCorePlus:
12078
"""Use as a context manager."""
@@ -141,7 +99,160 @@ class ClientSideMDASignaler(MDASignaler):
14199

142100

143101
class _DaemonThread(threading.Thread):
144-
def __init__(self, name: str = "DaemonThread"):
102+
_instances: ClassVar[dict[str, _DaemonThread]] = {}
103+
104+
@classmethod
105+
def instance(cls, name: str = "DaemonThread") -> _DaemonThread:
106+
if name not in cls._instances:
107+
cls._instances[name] = cls(name)
108+
return cls._instances[name]
109+
110+
def __init__(self, name: str = "DaemonThread") -> None:
145111
self.api_daemon = Pyro5.api.Daemon()
146112
self._stop_event = threading.Event()
147113
super().__init__(target=self.api_daemon.requestLoop, name=name, daemon=True)
114+
self.start()
115+
116+
117+
PT = TypeVar("PT", bound=Pyro5.api.Proxy)
118+
119+
120+
class ProxyHandler(ABC, Generic[PT]):
121+
"""A wrapper around multiple Pyro proxies.
122+
123+
PyMMCore objects are often used in their own event callbacks. This presents a
124+
problem for Pyro objects, as these callbacks are executed by Pyro worker threads,
125+
which will need ownership over their own proxy. Thus handling PyMMCore object
126+
callbacks requires organized transfer of multiple proxy objects - that is the goal
127+
of this class.
128+
"""
129+
130+
_instances: ClassVar[dict[str, ProxyHandler]] = {}
131+
132+
@property
133+
@abstractmethod
134+
def _proxy_type(self) -> type[PT]:
135+
"""Return the proxy type handled by this class."""
136+
...
137+
138+
@classmethod
139+
def instance(cls, uri: Pyro5.api.URI | str | None = None) -> Any:
140+
"""Return the instance for the given URI, creating it if necessary."""
141+
key = str(uri)
142+
if key not in cls._instances:
143+
cls._instances[key] = cls(uri)
144+
return cls._instances[key]
145+
146+
def __init__(self, uri: Pyro5.api.URI | str, connected_socket: Any = None) -> None:
147+
self._connected_socket = connected_socket
148+
self._uri = uri
149+
# FIXME: There are many reasons why a cache with maximum capacity is a bad idea.
150+
# First, there seems no reasonable maximum size. (Currently it's just a magic
151+
# number). Second, there seems no reasonable eviction policy. LRU could be
152+
# problematic if there are (maxsize) event callbacks. Suppose maxsize=2 - if you
153+
# call snapImage on a CMMCorePlus proxy, and there are two imageSnapped
154+
# callbacks, the second callback would then try to evict the original proxy held
155+
# by the snapImage caller (assuming different Pyro worker threads for each
156+
# callback). MRU might actually be most reasonable in this case...
157+
self._proxy_cache: LRUCache[threading.Thread, PT] = LRUCache(maxsize=4)
158+
self._proxy_lock = threading.Lock()
159+
self._instances[str(self._uri)] = self
160+
161+
def _proxy_attr(self, name: str) -> Any:
162+
"""Retrieves an attribute on the appropriate proxy object for this thread."""
163+
cache = self._proxy_cache
164+
thread = threading.current_thread()
165+
if thread not in cache:
166+
with self._proxy_lock:
167+
if len(cache) < cache.maxsize:
168+
# Cache not full - we can just add a new one
169+
proxy = self._proxy_type(
170+
uri=self._uri, connected_socket=self._connected_socket
171+
)
172+
else:
173+
# Cache full - repurpose lru proxy for the current thread
174+
_lru_thread, proxy = cache.popitem()
175+
proxy._pyroClaimOwnership()
176+
# Insert the new thread-proxy mapping
177+
cache[thread] = proxy
178+
179+
# Delegate the call this thread's proxy
180+
attr = getattr(cache[thread], name)
181+
return attr
182+
183+
# Note this method must exist explicitly to enable context manager behavior
184+
def __enter__(self) -> Any:
185+
"""Use as a context manager."""
186+
return self._proxy_attr("__enter__")()
187+
188+
# Note this method must exist explicitly to enable context manager behavior
189+
def __exit__(
190+
self, exc_type: type | None, exc_value: Exception | None, traceback: str | None
191+
) -> None:
192+
"""Use as a context manager."""
193+
self._proxy_attr("__exit__")(
194+
exc_type=exc_type, exc_value=exc_value, traceback=traceback
195+
)
196+
197+
def __getattr__(self, name: str) -> Any:
198+
"""Delegate to an appropriate MMCorePlusProxy."""
199+
return self._proxy_attr(name)
200+
201+
202+
class ClientMDARunner(ProxyHandler[_MDARunnerProxy]):
203+
"""A handle on a CMMCorePlus instance running outside of this process."""
204+
205+
@property
206+
def _proxy_type(self) -> type[_MDARunnerProxy]:
207+
return _MDARunnerProxy
208+
209+
210+
# TODO: Consider adding CMMCorePlus as supertype
211+
class ClientCMMCorePlus(ProxyHandler[_MMCorePlusProxy]):
212+
"""A handle on a CMMCorePlus instance running outside of this process."""
213+
214+
@overload
215+
def __init__(
216+
self,
217+
*,
218+
port: int,
219+
object_id: str | None = None,
220+
host: str | None = None,
221+
connected_socket: Any = None,
222+
) -> None: ...
223+
@overload
224+
def __init__(
225+
self,
226+
uri: Pyro5.api.URI | str,
227+
*,
228+
connected_socket: Any = None,
229+
) -> None: ...
230+
def __init__(
231+
self,
232+
uri: Pyro5.api.URI | str | None = None,
233+
*,
234+
object_id: str | None = None,
235+
host: str | None = None,
236+
port: int | None = None,
237+
connected_socket: Any = None,
238+
) -> None:
239+
if uri is None:
240+
object_id = server.CORE_NAME if object_id is None else object_id
241+
host = server.DEFAULT_HOST if host is None else host
242+
port = server.DEFAULT_PORT if port is None else port
243+
uri = f"PYRO:{object_id}@{host}:{port}"
244+
super().__init__(uri=uri, connected_socket=connected_socket)
245+
246+
# Create a proxy handler for the mda runner so it too can receive server signals
247+
self.mda = ClientMDARunner(uri=self.get_mda_runner_uri())
248+
249+
@property
250+
def _proxy_type(self) -> type[_MMCorePlusProxy]:
251+
return _MMCorePlusProxy
252+
253+
# Overridden to provide a nice (although tehcnically wrong) type hint :)
254+
@override
255+
def __enter__(self) -> CMMCorePlus:
256+
"""Use as a context manager."""
257+
super().__enter__()
258+
return cast("CMMCorePlus", self)

0 commit comments

Comments
 (0)