Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions lgl_android_install.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,19 +606,23 @@ def configure_server(conn: ADBConnect,
profile_dir: The desired output directory path for timeline. Existing
files in the directory may be overwritten.
'''
verbose = False

# Create a server instance
instance = server.CommsServer(0)
instance = server.CommsServer(0, verbose)

if timeline_file:
# Import late to avoid pulling in transitive deps when unused
from lglpy.comms import service_gpu_timeline
service_tl = service_gpu_timeline.GPUTimelineService(timeline_file)
service_tl = service_gpu_timeline.GPUTimelineService(
timeline_file, verbose)
instance.register_endpoint(service_tl)

if profile_dir:
# Import late to avoid pulling in transitive deps when unused
from lglpy.comms import service_gpu_profile
service_prof = service_gpu_profile.GPUProfileService(profile_dir)
service_prof = service_gpu_profile.GPUProfileService(
profile_dir, verbose)
instance.register_endpoint(service_prof)

# Start it running
Expand Down
115 changes: 63 additions & 52 deletions lglpy/comms/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,23 @@
# -----------------------------------------------------------------------------

'''
This module implements the server-side communications module that can accept
client connections from a layer driver, and dispatch messages to registered
service handler in the server.

This module currently only accepts a single connection at a time and message
handling is synchronous inside the server. It is therefore not possible to
implement pseudo-host-driven event loops if the layer is using multiple
services concurrently - this needs threads per service.
This module implements the server-side of the communications module that can
accept connections from client layer drivers running on the device. The
protocol is service-based, and the server will dispatch messages to the
registered service handler for each message channel.

The server is multi-threaded, allowing multiple layers to concurrently access
networked services provided by host-side implementations. However, within each
client connection messages are handled synchronously by a single worker thread.
It is therefore not possible to implement pseudo-host-driven event loops if a
layer is using multiple services concurrently - this needs threads per service
endpoint which is not yet implemented.
'''

import enum
import socket
import struct
import threading
from typing import Any, Optional


Expand Down Expand Up @@ -143,7 +147,7 @@ class CommsServer:
Class listening for client connection from a layer and handling messages.

This implementation is designed to run in a thread, so has a run method
that will setup and listen on the server socket.q
that will setup and listen on the server socket.

This implementation only handles a single layer connection at a time, but
can handle multiple connections serially without restarting.
Expand Down Expand Up @@ -173,7 +177,6 @@ def __init__(self, port: int, verbose: bool = False):
self.register_endpoint(self)

self.shutdown = False
self.sockd = None # type: Optional[socket.socket]

self.sockl = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sockl.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
Expand All @@ -185,6 +188,9 @@ def __init__(self, port: int, verbose: bool = False):
# Work out which port was assigned if not user-defined
self.port = self.sockl.getsockname()[1]

# Pool of worker threads
self.workers: list[threading.Thread] = []

def register_endpoint(self, endpoint: Any) -> int:
'''
Register a new service endpoint with the server.
Expand Down Expand Up @@ -235,55 +241,60 @@ def run(self) -> None:
if self.verbose:
print('Waiting for client connection')
try:
self.sockd, _ = self.sockl.accept()
# Wait for a new client connection
sockd, _ = self.sockl.accept()
if self.verbose:
print(' + Client connected')

self.run_client()
# Spawn a worker thread for this client
thread = threading.Thread(target=self.run_client, args=(sockd,))
self.workers.append(thread)
thread.start()

if self.verbose:
print(' + Client disconnected')
self.sockd.close()
self.sockd = None

except ClientDropped:
if self.verbose:
print(' + Client disconnected')
if self.sockd:
self.sockd.close()
self.sockd = None
# Release old worker resources if they have completed
self.workers = [x for x in self.workers if x.is_alive()]

except OSError:
continue

self.sockl.close()

def run_client(self) -> None:
def run_client(self, sockd: socket.socket) -> None:
'''
Enter client message handler run loop.

Raises:
ClientDropped: The client disconnected from the socket.
'''
while not self.shutdown:
# Read the header
data = self.receive_data(Message.HEADER_LEN)
message = Message(data)

# Read the payload if there is one
if message.payload_size:
data = self.receive_data(message.payload_size)
message.add_payload(data)

# Dispatch to a service handler
endpoint = self.endpoints[message.endpoint_id]
response = endpoint.handle_message(message)
try:
while not self.shutdown:
# Read the header
data = self.receive_data(sockd, Message.HEADER_LEN)
message = Message(data)

# Read the payload if there is one
if message.payload_size:
data = self.receive_data(sockd, message.payload_size)
message.add_payload(data)

# Dispatch to a service handler
endpoint = self.endpoints[message.endpoint_id]
response = endpoint.handle_message(message)

# Send a response for all TX_RX messages
if message.message_type == MessageType.TX_RX:
header = Response(message, response)
self.send_data(sockd, header.get_header())
self.send_data(sockd, response)

except ClientDropped:
pass

finally:
if self.verbose:
print(' + Client disconnected')

# Send a response for all TX_RX messages
if message.message_type == MessageType.TX_RX:
header = Response(message, response)
self.send_data(header.get_header())
self.send_data(response)
sockd.close()

def stop(self) -> None:
'''
Expand All @@ -294,14 +305,16 @@ def stop(self) -> None:
if self.sockl is not None:
self.sockl.close()

if self.sockd is not None:
self.sockd.shutdown(socket.SHUT_RDWR)
for worker in self.workers:
worker.join()

def receive_data(self, size: int) -> bytes:
@staticmethod
def receive_data(sockd: socket.socket, size: int) -> bytes:
'''
Fetch a fixed size packet from the socket.

Args:
sockd: The data socket.
size: The length of the packet in bytes.

Returns:
Expand All @@ -310,31 +323,29 @@ def receive_data(self, size: int) -> bytes:
Raises:
ClientDropped: The client disconnected from the socket.
'''
assert self.sockd is not None

data = b''
while len(data) < size:
new_data = self.sockd.recv(size - len(data))
new_data = sockd.recv(size - len(data))
if not new_data:
raise ClientDropped()
data = data + new_data

return data

def send_data(self, data: bytes) -> None:
@staticmethod
def send_data(sockd: socket.socket, data: bytes) -> None:
'''
Send a fixed size packet to the socket.

Args:
sockd: The data socket.
data: The binary data to send.

Raises:
ClientDropped: The client disconnected from the socket.
'''
assert self.sockd is not None

while len(data):
sent_bytes = self.sockd.send(data)
sent_bytes = sockd.send(data)
if not sent_bytes:
raise ClientDropped()
data = data[sent_bytes:]
9 changes: 5 additions & 4 deletions lglpy/comms/service_gpu_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import enum
import json
import os
from typing import Any, Optional, TypedDict, Union
from typing import Optional, TypedDict, Union

from lglpy.comms.server import Message

Expand Down Expand Up @@ -89,6 +89,7 @@ def __init__(self, dir_path: str, verbose: bool = False):
dir_path: Directory to write on the filesystem
verbose: Should this use verbose logging?
'''
del verbose
self.base_dir = dir_path

# Sample mode is detected on the fly when we get our first data
Expand Down Expand Up @@ -137,7 +138,7 @@ def handle_end_frame(self, message: EndFrameMessage):
# Emit the CSV file
print(f'Generating CSV for frame {self.frame_id}')
path = os.path.join(self.base_dir, f'frame_{self.frame_id:05d}.csv')
with open(path, 'w', newline='') as handle:
with open(path, 'w', newline='', encoding='utf-8') as handle:
writer = csv.writer(handle)
writer.writerow(self.table_header)
writer.writerows(self.table_data)
Expand Down Expand Up @@ -249,8 +250,8 @@ def handle_frame_sample(self, message: FrameMessage):
self.create_frame_data(message)

print(f'Updating CSV for frame {self.frame_id}')
path = os.path.join(self.base_dir, f'capture.csv')
with open(path, 'w', newline='') as handle:
path = os.path.join(self.base_dir, 'capture.csv')
with open(path, 'w', newline='', encoding='utf-8') as handle:
writer = csv.writer(handle)
writer.writerow(self.table_header)
writer.writerows(self.table_data)
Expand Down
Loading