Non blocking read of input from the terminal #11448
Replies: 4 comments 5 replies
-
If I run
This suggests that read(1) does in fact reads 1 utf-8 character, which can consist of arbitrarily many bytes (I think up to 4 or 6 atm), so is a wrong thing to use here.
Proposed fix: use |
Beta Was this translation helpful? Give feedback.
-
Much more complicated version of the example above, that should buffer incomplete utf-8 reads instead of blocking on those, and avoid reads after poll hup/err events: import sys, select, time
# NOTE: This code does not work under Thonny because
# in Thonny input is sent to program only after [Enter] is clicked.
# NOTE: Do not run this via "mpremote run ..." either, as it does not connect stdin
class TermReader:
def __init__(self, byte_stream, buffer_bytes=100):
self.stream, self.poller = byte_stream, select.poll()
self.rb, self.rb_n, self.rb_len = bytearray(buffer_bytes), 0, buffer_bytes
self.poller.register(self.stream, select.POLLIN)
def rb_decode(self, a, b, max_char_len=6):
'Returns decoded ring-buffer contents from a to b, and a-offset for next call'
buff = self.rb[a:b] if a < b else self.rb[a:] + self.rb[:b]
for n in range(max_char_len):
try: result = buff[:-n or len(buff)].decode()
except UnicodeError: pass
else: return result, (self.rb_len + (b - n)) % self.rb_len
else:
if len(buff) > max_char_len: raise UnicodeError('Non-UTF-8 stream data')
return '', a
def read(self):
n0, text = self.rb_n, list()
poll_err = select.POLLHUP | select.POLLERR
while ev := self.poller.poll(0):
if ev[0][1] & poll_err or not (byte := self.stream.read(1)): break
self.rb[self.rb_n] = byte[0]
self.rb_n = (self.rb_n + 1) % self.rb_len
if self.rb_n == n0:
chunk, n0 = self.rb_decode(n0, self.rb_n)
text.append(chunk)
if self.rb_n != n0:
chunk, self.rb_n = self.rb_decode(n0, self.rb_n)
text.append(chunk)
return ''.join(text)
term_reader = TermReader(sys.stdin.buffer)
while True:
time.sleep(3)
text = term_reader.read()
print(f'Input during last 3s: {repr(text)}') It uses ring-buffer to store input bytes until it is able to decode them without UnicodeError. |
Beta Was this translation helpful? Give feedback.
-
If I might be so bold, this one decodes UTF-8 and returns either ›None‹ or the UTF-8 character… #!/usr/bin/python3
# -*- coding: UTF-8 -*-
# vim:fileencoding=UTF-8:ts=4
import sys
import select
class StdioReader:
def __init__(self):
self._selpoll = select.poll()
self._selpoll.register(sys.stdin, select.POLLIN)
self._bytes, self._index, self._expecting = bytearray(4), 0, 0
def __enter__(self):
return self
def __exit__(self, _type, value, traceback):
return _type, value, traceback
def getchar(self):
if not len(self._selpoll.poll(0)):
return None
charval = sys.stdin.buffer.read(1) # get a single byte (of a possible UTF-8 sequence)
charval = charval[0] # turn bytes array into an integer
if charval & 0x80 == 0: # is a single byte
character = chr(charval)
self._index = 0
self._expecting = 0
return character
else:
if charval & 0xe0 == 0xc0: # is first of two bytes
self._bytes[0], self._index, self._expecting = charval, 1, 1
elif charval & 0xf0 == 0xe0: # is first of three bytes
self._bytes[0], self._index, self._expecting = charval, 1, 2
elif charval & 0xf8 == 0xf0: # is first of four bytes
self._bytes[0], self._index, self._expecting = charval, 1, 3
elif charval & 0xc0 == 0x80: # is a sequence byte
self._bytes[self._index] = charval
self._index += 1
if self._index > self._expecting:
character = self._bytes[0:self._expecting+1].decode()
self._index, self._expecting = 0, 0
return character
else:
raise UnicodeError
return None
if __name__ == '__main__':
from time import sleep_ms
def classtest1():
rdr = StdioReader()
while True:
char = rdr.getchar()
if char:
print(f'[{char}]', end=' ')
else:
sleep_ms(20)
def classtest2():
with StdioReader() as rdr:
while True:
char = rdr.getchar()
if char:
print(f'[{char}]', end=' ')
else:
sleep_ms(20)
# classtest1()
classtest2() |
Beta Was this translation helpful? Give feedback.
-
Not sure if still relevant, import sys
import select
import time
from machine import Pin, RTC
def handle_exit():
led.low()
print("bye, bye")
global stop
stop = True
print("USAGE:")
print("time - to get the time on the MCU")
print("quit, exit or Ctrl+C to stop execution")
led = Pin("LED", Pin.OUT)
rtc = RTC()
selpoll = select.poll()
selpoll.register(sys.stdin, select.POLLIN)
stop = False
command = ""
try:
while not stop:
available = len(selpoll.poll(0))
if available > 0:
stravailable = sys.stdin.read(available)
command += stravailable
#hex_array = ' '.join(f'{ord(c):02x}' for c in command)
#print(f"command: {command} hex: {hex_array}")
print(stravailable, end="")
if len(command) > 0 and command.endswith("\n"):
command = command.rstrip()
if command == "time":
led.toggle()
print(f"The time on this MCU is: {rtc.datetime()}")
led.toggle()
elif command == "quit" or command == "exit":
handle_exit()
command = ""
time.sleep_ms(100)
led.toggle()
time.sleep_ms(100)
led.toggle()
except KeyboardInterrupt:
handle_exit() |
Beta Was this translation helpful? Give feedback.
-
This is a knowledge sharing post, because I did not find it documented anywhere, and had to dig into other questions and answers in order to get an idea for how to do it. Please be aware that if using Thonny, then characters are sent to the application only after ENTER is pressed.
I also created a gist which demonstrates how to listen to both wifi connections and serial commands in parallel: gist
Beta Was this translation helpful? Give feedback.
All reactions