Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(storage): make update also use zmq #458

Merged
merged 1 commit into from
Mar 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 16 additions & 19 deletions mtda/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,16 @@ def storage_network(self, remote):
subprocess.check_call(cmd)

def storage_open(self, size=0, **kwargs):
session = kwargs.get('session', None)
session = kwargs.get('session', self._session)
port = self._impl.storage_open(size, session=session)
return self._storage_socket(port)

def _storage_socket(self, port):
tries = 60
while tries > 0:
tries = tries - 1
try:
host = self.remote()
port = self._impl.storage_open(size, session=session)
context = zmq.Context()
socket = context.socket(zmq.PUSH)
hwm = int(
Expand All @@ -149,30 +152,25 @@ def storage_open(self, size=0, **kwargs):
else:
raise

def storage_update(self, dest, src=None, callback=None):
def storage_update(self, dest, src=None, callback=None, **kwargs):
session = kwargs.get('session', self._session)

path = dest if src is None else src
try:
st = os.stat(path)
image_size = st.st_size
except FileNotFoundError:
return False

status = self._impl.storage_update(dest, 0, self._session)
if status is False:
return False
st = os.stat(path)
size = st.st_size

port = self._impl.storage_update(dest, size, session=session)
self._storage_socket(port)

blksz = self._agent.blksz
impl = self._impl
session = self._session

# Get file handler from specified path
file = ImageFile.new(path, impl, session, blksz, callback)

# Open the shared storage device so we own it
self.storage_open(image_size)

try:
# Prepare for download/copy
file.prepare(image_size, CONSTS.IMAGE.RAW.value)
file.prepare(self._data, size)

# Copy image to shared storage
file.copy()
Expand All @@ -181,11 +179,10 @@ def storage_update(self, dest, src=None, callback=None):
file.flush()

except Exception:
return False
raise
finally:
# Storage may be closed now
self.storage_close()
return True

def storage_write_image(self, path, callback=None):
blksz = self._agent.blksz
Expand Down
25 changes: 16 additions & 9 deletions mtda/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -796,21 +796,26 @@ def storage_mount(self, part=None, **kwargs):
return result

@Pyro4.expose
def storage_update(self, dst, offset, **kwargs):
def storage_update(self, dst, size, stream=None, **kwargs):
self.mtda.debug(3, "main.storage_update()")

result = False
session = kwargs.get("session", None)
self.session_ping(session)
if self.storage is None:
self.mtda.debug(4, "storage_update(): no shared storage device")
raise RuntimeError('no shared storage device')
elif self.storage_locked(session) is True:
raise RuntimeError('shared storage in use')
elif self.storage.is_storage_mounted is False:
raise RuntimeError("not mounted")
else:
try:
result = self.storage.update(dst, offset)
self.storage.update(dst)
result = self._storage_socket(session, size, stream)
except (FileNotFoundError, IOError) as e:
self.mtda.debug(1, f"main.storage_update(): {str(e.args[0])}")

self.mtda.debug(3, f"main.storage_update(): {str(result)}")
self.mtda.debug(3, f"main.storage_update(): {result}")
return result

@Pyro4.expose
Expand Down Expand Up @@ -880,15 +885,17 @@ def storage_open(self, size=0, stream=None, **kwargs):
self._storage_event(CONSTS.STORAGE.OPENED, session)
except Exception:
raise RuntimeError('shared storage could not be opened!')

if stream is None:
from mtda.storage.datastream import NetworkDataStream
stream = NetworkDataStream(self.dataport)
result = self._writer.start(session, size, stream)
result = self._storage_socket(session, size, stream)

self.mtda.debug(3, f'main.storage_open(): {result}')
return result

def _storage_socket(self, session, size, stream=None):
if stream is None:
from mtda.storage.datastream import NetworkDataStream
stream = NetworkDataStream(self.dataport)
return self._writer.start(session, size, stream)

@Pyro4.expose
def storage_status(self, **kwargs):
self.mtda.debug(3, "main.storage_status()")
Expand Down
2 changes: 1 addition & 1 deletion mtda/storage/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def status(self):
return self._mode

def update(self, dst, offset):
return False
raise RuntimeError('update not supported for docker')

def write(self, data):
self.mtda.debug(3, "storage.docker.write()")
Expand Down
21 changes: 4 additions & 17 deletions mtda/storage/helpers/image.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,29 +342,16 @@ def _locate(self, dst):
self.mtda.debug(3, f"storage.helpers.image._locate(): {str(result)}")
return result

def update(self, dst, offset):
def update(self, dst):
self.mtda.debug(3, "storage.helpers.image.update()")

with self.lock:
result = False
if self.handle is None:
path = self._locate(dst)
if path is not None:
mode = "ab" if offset > 0 else "wb"
self.handle = open(path, mode)
self.handle.seek(offset)
result = True
else:
self.mtda.debug(1, "storage.helpers.image.update(): "
"%s could not be found!" % str(dst))
raise FileNotFoundError(dst + " could not be found!")
self.handle = open(path, "wb")
else:
self.mtda.debug(1, "storage.helpers.image.update(): "
"shared storage already opened!")

self.mtda.debug(3, "storage.helpers.image."
f"update(): {str(result)}")
return result
raise RuntimeError("already opened")
self.mtda.debug(3, "storage.helpers.image.update(): exit")

def write(self, data):
self.mtda.debug(3, "storage.helpers.image.write()")
Expand Down