forked from micropython/micropython
-
Notifications
You must be signed in to change notification settings - Fork 1.3k
/
Copy pathasyncio_task_exception.py
64 lines (53 loc) · 1.62 KB
/
asyncio_task_exception.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# Test the Task.exception() method
try:
import asyncio
except ImportError:
print("SKIP")
raise SystemExit
async def task(t, exc=None):
if t >= 0:
await asyncio.sleep(t)
if exc:
raise exc
async def main():
# Task that is not done yet raises an InvalidStateError
print("=" * 10)
t = asyncio.create_task(task(1))
await asyncio.sleep(0)
try:
t.exception()
assert False, "Should not get here"
except Exception as e:
print("Tasks that aren't done yet raise an InvalidStateError:", repr(e))
# Task that is cancelled raises CancelledError
print("=" * 10)
t = asyncio.create_task(task(1))
t.cancel()
await asyncio.sleep(0)
try:
print(repr(t.exception()))
print(t.cancelled())
assert False, "Should not get here"
except asyncio.CancelledError as e:
print("Cancelled tasks cannot retrieve exception:", repr(e))
# Task that starts, runs and finishes without an exception should return None
print("=" * 10)
t = asyncio.create_task(task(0.01))
await t
print("None when no exception:", t.exception())
# Task that raises immediately should return that exception
print("=" * 10)
t = asyncio.create_task(task(-1, ValueError))
try:
await t
assert False, "Should not get here"
except ValueError as e:
pass
print("Returned Exception:", repr(t.exception()))
# Task returns `none` when somehow an exception isn't in data
print("=" * 10)
t = asyncio.create_task(task(-1))
await t
t.data = "Example"
print(t.exception())
asyncio.run(main())