Skip to content

Commit b116642

Browse files
committed
perf(storage): use zmq for storage write
Replace RPC calls with ZeroMQ messages for a more efficient transfer of image chunks.

Signed-off-by: Cedric Hombourger <[email protected]>
1 parent 6fb486f commit b116642

File tree

5 files changed

+110
-102
lines changed

5 files changed

+110
-102
lines changed

docs/config.rst

+4
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,10 @@ General settings
9898
Remote port to connect to in order to get console messages (defaults to
9999
``5557``).
100100

101+
* ``data``: integer [optional]
102+
Remote port for data transfers between the client and the agent (defaults to
103+
``0``, which requests a dynamic port assignment).
104+
101105
* ``host``: string [optional]
102106
Remote host name or ip to connect to as a client to interact with the
103107
MTDA agent (defaults to ``localhost``).

mtda/client.py

+18-20
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import subprocess
1818
import tempfile
1919
import time
20+
import zmq
2021
import zstandard as zstd
2122

2223
from mtda.main import MultiTenantDeviceAccess
@@ -44,6 +45,7 @@ def __init__(self, host=None, session=None, config_files=None,
4445
else:
4546
self._impl = agent
4647
self._agent = agent
48+
self._data = None
4749

4850
if session is None:
4951
HOST = socket.gethostname()
@@ -115,7 +117,12 @@ def storage_open(self):
115117
while tries > 0:
116118
tries = tries - 1
117119
try:
118-
self._impl.storage_open(self._session)
120+
host = self.remote()
121+
port = self._impl.storage_open(self._session)
122+
context = zmq.Context()
123+
socket = context.socket(zmq.PUSH)
124+
socket.connect(f'tcp://{host}:{port}')
125+
self._data = socket
119126
return
120127
except Exception:
121128
if tries > 0:
@@ -201,7 +208,7 @@ def storage_write_image(self, path, callback=None):
201208

202209
try:
203210
# Prepare for download/copy
204-
file.prepare(image_size)
211+
file.prepare(self._data, image_size)
205212

206213
# Copy image to shared storage
207214
file.copy()
@@ -298,6 +305,7 @@ def __init__(self, path, agent, session, blksz, callback=None):
298305
self._path = path
299306
self._session = session
300307
self._totalread = 0
308+
self._totalsent = 0
301309

302310
def bmap(self, path):
303311
return None
@@ -324,21 +332,25 @@ def flush(self):
324332
inputsize = self._inputsize
325333
totalread = self._totalread
326334
outputsize = self._outputsize
335+
336+
agent.storage_flush(self._totalsent)
327337
while True:
328-
status, writing, written = agent.storage_status(self._session)
338+
status, writing, written = agent.storage_status()
329339
if callback is not None:
330340
callback(imgname, totalread, inputsize, written, outputsize)
331341
if writing is False:
332342
break
333343
time.sleep(0.5)
344+
self._socket.close()
334345

335346
def path(self):
336347
return self._path
337348

338-
def prepare(self, output_size=None, compression=None):
349+
def prepare(self, socket, output_size=None, compression=None):
339350
compr = self.compression() if compression is None else compression
340351
self._inputsize = self.size()
341352
self._outputsize = output_size
353+
self._socket = socket
342354
# if image is uncompressed, we compress on the fly
343355
if compr == CONSTS.IMAGE.RAW.value:
344356
compr = CONSTS.IMAGE.ZST.value
@@ -362,22 +374,8 @@ def size(self):
362374
return None
363375

364376
def _write_to_storage(self, data):
365-
max_tries = int(CONSTS.STORAGE.TIMEOUT / CONSTS.STORAGE.RETRY_INTERVAL)
366-
367-
for _ in range(max_tries):
368-
result = self._agent.storage_write(data, self._session)
369-
if result != 0:
370-
break
371-
time.sleep(CONSTS.STORAGE.RETRY_INTERVAL)
372-
373-
if result > 0:
374-
return result
375-
elif result < 0:
376-
exc = 'write or decompression error from shared storage'
377-
raise IOError(exc)
378-
else:
379-
exc = 'timeout from shared storage'
380-
raise IOError(exc)
377+
self._socket.send(data)
378+
self._totalsent += len(data)
381379

382380

383381
class ImageLocal(ImageFile):

mtda/constants.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@ class STORAGE:
7777

7878

7979
class WRITER:
80-
QUEUE_SLOTS = 16
81-
QUEUE_TIMEOUT = 5
80+
RECV_TIMEOUT = 5
8281
READ_SIZE = 1*1024*1024
8382
WRITE_SIZE = 1*1024*1024

mtda/main.py

+22-40
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ def __init__(self):
8484
self.usb_switches = []
8585
self.ctrlport = 5556
8686
self.conport = 5557
87+
self.dataport = 0
8788
self.prefix_key = self._prefix_key_code(DEFAULT_PREFIX_KEY)
8889
self.is_remote = False
8990
self.is_server = False
@@ -661,7 +662,20 @@ def storage_close(self, session=None):
661662
if self.storage is not None:
662663
self.storage_locked()
663664

664-
self.mtda.debug(3, f"main.storage_close(): {str(result)}")
665+
self.mtda.debug(3, f"main.storage_close(): {result}")
666+
return result
667+
668+
@Pyro4.expose
669+
def storage_flush(self, size, session=None):
670+
self.mtda.debug(3, "main.storage_flush()")
671+
672+
self.session_ping(session)
673+
if self.storage is None:
674+
result = False
675+
else:
676+
result = self._writer.flush(size)
677+
678+
self.mtda.debug(3, f"main.storage_flush(): {result}")
665679
return result
666680

667681
@Pyro4.expose
@@ -790,6 +804,7 @@ def storage_open(self, session=None):
790804

791805
self.session_ping(session)
792806
owner = self._storage_owner
807+
result = None
793808
status, _, _ = self.storage_status()
794809

795810
if self.storage is None:
@@ -802,12 +817,13 @@ def storage_open(self, session=None):
802817
self.storage.open()
803818
self._storage_opened = True
804819
self._storage_owner = session
805-
self._writer.start()
820+
result = self._writer.start(session)
806821
self._storage_event(CONSTS.STORAGE.OPENED, session)
807822
if self.storage is not None:
808823
self.storage_locked()
809824

810-
self.mtda.debug(3, 'main.storage_open(): success')
825+
self.mtda.debug(3, f'main.storage_open(): {result}')
826+
return result
811827

812828
@Pyro4.expose
813829
def storage_status(self, session=None):
@@ -877,42 +893,6 @@ def storage_swap(self, session=None):
877893
self.mtda.debug(3, f"main.storage_swap(): {str(result)}")
878894
return result
879895

880-
@Pyro4.expose
881-
def storage_write(self, data, session=None):
882-
self.mtda.debug(3, "main.storage_write()")
883-
884-
self.session_ping(session)
885-
if self.storage is None:
886-
raise RuntimeError('no shared storage')
887-
elif self._storage_opened is False:
888-
raise RuntimeError('shared storage was not opened')
889-
elif self._writer.failed is True:
890-
raise RuntimeError('write or decompression error '
891-
'from shared storage')
892-
elif session != self._storage_owner:
893-
raise RuntimeError('shared storage in use')
894-
895-
import queue
896-
try:
897-
if len(data) == 0:
898-
self.mtda.debug(2, "main.storage_write(): "
899-
"using queued data")
900-
data = self._writer_data
901-
self._writer_data = data
902-
self._writer.put(data, timeout=10)
903-
result = self.blksz
904-
except queue.Full:
905-
self.mtda.debug(2, "main.storage_write(): "
906-
"queue is full")
907-
result = 0
908-
909-
if self._writer.failed is True:
910-
self.error('storage_write failed: write or decompression error')
911-
result = -1
912-
913-
self.mtda.debug(3, f"main.storage_write(): {str(result)}")
914-
return result
915-
916896
def systemd_configure(self):
917897
from filecmp import dircmp
918898

@@ -1467,7 +1447,7 @@ def post_configure_storage(self, storage, config, parser):
14671447
self.mtda.debug(3, "main.post_configure_storage()")
14681448

14691449
from mtda.storage.writer import AsyncImageWriter
1470-
self._writer = AsyncImageWriter(self, storage)
1450+
self._writer = AsyncImageWriter(self, storage, self.dataport)
14711451

14721452
import atexit
14731453
atexit.register(self.storage_close)
@@ -1479,6 +1459,8 @@ def load_remote_config(self, parser):
14791459
parser.get('remote', 'console', fallback=self.conport))
14801460
self.ctrlport = int(
14811461
parser.get('remote', 'control', fallback=self.ctrlport))
1462+
self.dataport = int(
1463+
parser.get('remote', 'data', fallback=self.dataport))
14821464
if self.is_server is False:
14831465
if self.remote is None:
14841466
# Load remote setting from the configuration

0 commit comments

Comments
 (0)