Skip to content

Commit ee987f5

Browse files
committed
first commit
0 parents  commit ee987f5

File tree

6 files changed

+379
-0
lines changed

6 files changed

+379
-0
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
/Pipfile.lock

Pipfile

+14
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
[[source]]
2+
url = "https://pypi.org/simple"
3+
verify_ssl = true
4+
name = "pypi"
5+
6+
[packages]
7+
eventlet = "*"
8+
9+
[dev-packages]
10+
ipython = "*"
11+
12+
[requires]
13+
python_version = "3.11"
14+
python_full_version = "3.11.6"

client.py

+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import eventlet
2+
from duplex import Duplex
3+
from tools import *
4+
eventlet_routine()
5+
6+
7+
def main():
8+
duplex = Duplex.connect_to()
9+
duplex.send(b"Just connected to you client !")
10+
i = 0
11+
12+
while True:
13+
try:
14+
msg = duplex.receive(1)
15+
if msg:
16+
print(f"[CLIENT] Received: {msg.decode()}")
17+
except eventlet.queue.Empty as e:
18+
pass
19+
20+
msg = f"Hello {i} from client."
21+
duplex.send(msg.encode())
22+
print("[CLIENT] Sent: ", msg)
23+
24+
if i > 6:
25+
print("[CLIENT] Closing...")
26+
duplex.close()
27+
eventlet.sleep()
28+
print("[CLIENT] Closed.")
29+
break
30+
31+
i += 1
32+
eventlet.sleep()
33+
34+
print("[CLIENT] Done.")
35+
36+
37+
if __name__ == "__main__":
38+
main()

duplex.py

+245
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,245 @@
1+
import re
2+
import socket
3+
import logging
4+
import eventlet
5+
6+
from tools import *
7+
8+
log = configure_logging('Duplex')
9+
10+
11+
class DuplexAPI:
12+
def __new__(cls):
13+
eventlet_routine()
14+
return object.__new__(cls)
15+
16+
def __init__(self):
17+
self._job = None
18+
self._sock = None
19+
self._regex = None
20+
self._running = None
21+
self._last_exc = None
22+
self._buffer = bytearray()
23+
24+
self._pool = eventlet.GreenPool()
25+
self._recv_queue = eventlet.Queue()
26+
self._send_queue = eventlet.Queue()
27+
28+
if type(self) is DuplexAPI:
29+
raise NotImplementedError("DuplexAPI is not meant to be instanciated, use the Duplex class.")
30+
31+
def _pack_msg(self, msg: bytes):
32+
return b'%START%' + msg + b'%STOP%'
33+
34+
def _unpack_msg(self, data: bytes):
35+
if self._regex is None:
36+
self._regex = re.compile(b'%START%(.*?)%STOP%', flags=re.DOTALL)
37+
38+
messages = self._regex.findall(data)
39+
remainder = self._regex.sub(b'', data)
40+
return messages, remainder
41+
42+
def _reset_buffer(self, data: bytes):
43+
self._buffer.clear()
44+
self._buffer.extend(data)
45+
46+
def _recv_to_buffer(self):
47+
while True:
48+
try:
49+
chunk = self._sock.recv(4096)
50+
except socket.error:
51+
# Non-blocking socket, no data available.
52+
break
53+
54+
if chunk:
55+
self._buffer.extend(chunk)
56+
else:
57+
self._notify_disconnected()
58+
break
59+
60+
def _recv_msg(self):
61+
self._recv_to_buffer()
62+
messages, remainder = self._unpack_msg(self._buffer)
63+
self._reset_buffer(remainder)
64+
return messages
65+
66+
def _send_msg(self, msg: bytes):
67+
packet = self._pack_msg(msg)
68+
self._sock.sendall(packet)
69+
70+
def _send_items_in_queue(self):
71+
if not self._send_queue.empty():
72+
try:
73+
packet = self._send_queue.get(timeout=2)
74+
except eventlet.queue.Empty:
75+
log.warning("Couldn't fetch item from queue.")
76+
else:
77+
self._send_msg(packet)
78+
79+
def _receive_and_put_in_queue(self):
80+
for msg in self._recv_msg():
81+
try:
82+
self._recv_queue.put(msg, timeout=2)
83+
except eventlet.queue.Full:
84+
log.warning(f"Couldn't put item in queue.")
85+
86+
def _loop_running(self):
87+
return self._running is True
88+
89+
def _stop_loop(self):
90+
self._running = False
91+
92+
def _exchange_messages(self):
93+
while self._loop_running():
94+
self._send_items_in_queue()
95+
self._receive_and_put_in_queue()
96+
eventlet.sleep()
97+
98+
def _notify_disconnected(self):
99+
log.debug("Peer disconnected.")
100+
self.stop(exception=socket.error("Other side disconnected."))
101+
102+
def _kill_job(self):
103+
if self._job is not None:
104+
self._job.kill()
105+
if self._sock is not None:
106+
self._sock.close()
107+
108+
def _handle_exception(self, raise_exc: bool = True):
109+
if self._last_exc is None:
110+
return
111+
112+
last_exc = self._last_exc
113+
114+
if raise_exc:
115+
self._last_exc = None
116+
raise last_exc
117+
else:
118+
log.exception("Duplex stopped due to an exception.", exc_info=last_exc)
119+
120+
def _get(self, block: bool = True, timeout: int | None = None):
121+
msg = self._recv_queue.get(block=block, timeout=timeout)
122+
return msg
123+
124+
def _get_safe(self):
125+
while True:
126+
try:
127+
msg = self._get(block=False, timeout=1)
128+
return msg
129+
except eventlet.queue.Empty:
130+
pass
131+
132+
if not self._loop_running():
133+
log.error("Can't receive message, duplex interrupted.")
134+
return b''
135+
136+
eventlet.sleep()
137+
138+
def _put(self, *args, **kwargs):
139+
self._send_queue.put(*args, **kwargs)
140+
141+
def __del__(self):
142+
self.close(raise_exc=False)
143+
144+
145+
class Duplex(DuplexAPI):
146+
147+
def listen(self, port: int, *, host: str):
148+
"""Wait for a connection on the given port and host."""
149+
server = socket.create_server((host, port))
150+
server.listen(0)
151+
log.info(f"Listening on %s:%d.", host, port)
152+
153+
self._sock, peer = server.accept()
154+
self._sock.setblocking(False)
155+
log.info(f"Connection from %s accepted.", peer[0])
156+
157+
def connect(self, host: str, port: int):
158+
"""Connect to the given host and port."""
159+
self._sock = socket.create_connection((host, port))
160+
self._sock.setblocking(False)
161+
log.info(f"Connected to %s:%d.", host, port)
162+
163+
def start(self):
164+
"""Start the message exchange loop."""
165+
self._running = True
166+
self._job = self._pool.spawn(self._exchange_messages)
167+
log.debug("Message exchange job started.")
168+
169+
def stop(self, exception: Exception | None = None):
170+
"""Stop the message exchange loop."""
171+
if self._loop_running():
172+
self._stop_loop()
173+
log.debug("Message exchange loop stopped.")
174+
175+
if exception is not None:
176+
self._last_exc = exception
177+
178+
def close(self, raise_exc: bool = True):
179+
"""Wait for everything to be closed, raise an exception if there was an error by default."""
180+
if self._loop_running():
181+
self.stop()
182+
183+
self._kill_job()
184+
self._pool.waitall()
185+
self._handle_exception(raise_exc)
186+
187+
def receive(self, timeout: int | None = None):
188+
"""
189+
Wait for an incoming message.
190+
If `timeout` is 0, return directly with a message if available or raise `eventlet.queue.Empty`.
191+
If `timeout` is a positive integer, wait that many seconds for a message, before raising `eventlet.queue.Empty`.
192+
If `timeout` is None or not specified, block until a message is received. This method *will* return early in case of error.
193+
"""
194+
if not self._loop_running():
195+
self.close()
196+
197+
if timeout is None:
198+
return self._get_safe()
199+
return self._get(block=True, timeout=timeout)
200+
201+
def send(self, msg: bytes, notify_cb: callable = None):
202+
"""Put a message in the send queue."""
203+
if not self._loop_running():
204+
self.close()
205+
206+
self._put(msg)
207+
208+
def set_logging_level(self, level: int):
209+
log.setLevel(level)
210+
211+
@classmethod
212+
def connect_to(cls, host: str = '127.0.0.1', port: int = 8765, verbose: bool = False):
213+
"""Create a Duplex client connected to the given host and port."""
214+
duplex = cls()
215+
if verbose: duplex.set_logging_level(logging.DEBUG)
216+
duplex.connect(host, port)
217+
duplex.start()
218+
return duplex
219+
220+
@classmethod
221+
def listen_on(cls, host: str = '0.0.0.0', port: int = 8765, verbose: bool = False):
222+
"""Create a Duplex server listening on the given host and port."""
223+
duplex = cls()
224+
if verbose: duplex.set_logging_level(logging.DEBUG)
225+
duplex.listen(port, host=host)
226+
duplex.start()
227+
return duplex
228+
229+
230+
@classmethod
231+
@make_awaitable
232+
def listen_wait(cls, host: str = '0.0.0.0', port: int = 8765, verbose: bool = False):
233+
"""Create a Duplex server and wait for a connection on given host and port."""
234+
return cls.listen_on(host, port, verbose)
235+
236+
@make_awaitable
237+
def wait_for_message(self, timeout: int | None = None):
238+
"""
239+
Wait for an incoming message.
240+
If `timeout` is 0, return directly with a message if available or raise `eventlet.queue.Empty`.
241+
If `timeout` is a positive integer, wait that many seconds for a message, before raising `eventlet.queue.Empty`.
242+
If `timeout` is None or not specified, block until a message is received. This method *will* still return early in case of error.
243+
"""
244+
return self.receive(timeout=timeout)
245+

server.py

+40
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
from tools import *
2+
eventlet_routine()
3+
4+
import asyncio
5+
import eventlet
6+
from duplex import Duplex
7+
8+
9+
def print_message(msg):
10+
print(f">>> {msg}")
11+
12+
async def wait_for_messages(duplex: Duplex, callback: callable):
13+
while True:
14+
print("[SERVER] Waiting for message...")
15+
msg = await duplex.wait_for_message()
16+
17+
if msg:
18+
callback(msg)
19+
elif not msg:
20+
print("[SERVER] No message received.")
21+
break
22+
elif msg == b"STOP":
23+
print("[SERVER] Received stop message.")
24+
break
25+
26+
await asyncio.sleep(0.2)
27+
28+
print(f"[SERVER] Closing.")
29+
30+
async def main():
31+
print("[SERVER] Starting server...")
32+
duplex = await Duplex.listen_wait()
33+
34+
job = asyncio.create_task(wait_for_messages(duplex, print_message))
35+
res = await asyncio.gather(job, return_exceptions=True)
36+
print(f'------\n[SERVER] Result: {res}\n-----')
37+
38+
39+
if __name__ == "__main__":
40+
asyncio.run(main())

tools.py

+41
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import socket
2+
import logging
3+
import eventlet
4+
import asyncio
5+
from functools import wraps, partial
6+
from eventlet import debug as eventlet_debug
7+
8+
log = logging.getLogger('Duplex.misc')
9+
10+
11+
def eventlet_routine():
12+
"""Patch imports and disable multiple readers warnings."""
13+
log.debug("> Entering eventlet routine.")
14+
15+
if not eventlet.patcher.is_monkey_patched(socket):
16+
log.debug("Patching imports...")
17+
eventlet.monkey_patch()
18+
eventlet_debug.hub_prevent_multiple_readers(False)
19+
else:
20+
log.debug("All modules patched.")
21+
22+
23+
def configure_logging(name: str, level: int = logging.INFO):
24+
"""Create a logger instance with provided name and configure it."""
25+
logger = logging.getLogger(name)
26+
logger.setLevel(level)
27+
formatter = logging.Formatter('[%(levelname)s] %(name)s - %(message)s')
28+
handler = logging.StreamHandler()
29+
handler.setFormatter(formatter)
30+
logger.addHandler(handler)
31+
return logger
32+
33+
34+
def make_awaitable(func: callable):
35+
@wraps(func)
36+
async def run(*args, loop=None, executor=None, **kwargs):
37+
if loop is None:
38+
loop = asyncio.get_event_loop()
39+
pfunc = partial(func, *args, **kwargs)
40+
return await loop.run_in_executor(executor, pfunc)
41+
return run

0 commit comments

Comments
 (0)