bluetooth : After timeout, reconnection is successful. (Peripherals are always powered off) #16172
-
The other devise is turned off. However, after a timeout, when you retry, it appears to be connected. In reality, it has failed. The solution is described below.I would like the fix code to be put in the correct place. Where can I report this?
from micropython import const
import asyncio
import aioble
# import test_aioble as aioble
import sys
_TARGET_DEVICE_ADDR = const("d7:de:4c:f4:56:e5")
async def gather(ai):
ret=[]
async for x in ai: ret.append(x)
return ret
async def device_details(connection):
if connection is None:
return
try:
services = await gather(connection.services())
except asyncio.TimeoutError as e:
print("Timeout. (discovering services)")
sys.print_exception(e)
except Exception as e:
print("Error (discovering services): {}".format(e))
sys.print_exception(e)
else:
if services is None or len(services) <= 0:
print("\n\"Service\" not found.")
return
for s in services:
print("\t", s)
async def print_details(device):
connection = None
# connecting to device
try:
print("\nConnecting to {} ... ".format(device), end="")
connection = await device.connect(timeout_ms=2000)
except asyncio.TimeoutError as e:
print("Timeout.")
sys.print_exception(e)
except Exception as e:
print("Error: {}".format(e))
sys.print_exception(e)
else:
print("Connected.")
await device_details(connection)
await connection.disconnect()
async def main():
device = aioble.Device(aioble.ADDR_RANDOM, _TARGET_DEVICE_ADDR)
while True:
await print_details(device)
r = input("\nenter return (to retry) / q(uit). > ").strip()
if r.upper() == "Q":
sys.exit(0)
asyncio.run(main())
# https://github.com/micropython/micropython-lib/blob/master/micropython/bluetooth/aioble/aioble/central.py#L107
async def _connect(
connection, timeout_ms, scan_duration_ms, min_conn_interval_us, max_conn_interval_us
):
<< Omitted >>
try:
with DeviceTimeout(None, timeout_ms):
ble.gap_connect(
device.addr_type,
device.addr,
scan_duration_ms,
min_conn_interval_us,
max_conn_interval_us,
)
# Wait for the connected IRQ.
await connection._event.wait()
assert connection._conn_handle is not None
# Register connection handle -> device.
DeviceConnection._connected[connection._conn_handle] = connection
except:
device._connection = None
ble.gap_connect(None)
raise
finally:
# After timeout, don't hold a reference and ignore future events.
_connecting.remove(device) |
Beta Was this translation helpful? Give feedback.
Replies: 2 comments 2 replies
-
I think it will be better if you create an 'issue' in the repo. |
Beta Was this translation helpful? Give feedback.
-
While I might be not quite sure of the intension of your code, I would suggest you to test the state by using async def print_details(device):
connection = None
# connecting to device
try:
print("\nConnecting to {} ... ".format(device), end="")
connection = await device.connect(timeout_ms=2000)
except asyncio.TimeoutError as e:
print("Timeout.")
sys.print_exception(e)
except Exception as e:
print("Error: {}".format(e))
sys.print_exception(e)
else:
if connection.is_connected():
print("Connected.")
await device_details(connection)
else:
print("Disconnected.")
await connection.disconnect() |
Beta Was this translation helpful? Give feedback.
I think it will be better if you create an 'issue' in the repo.