1
1
import os
2
2
import shutil
3
+ from functools import wraps
3
4
from pathlib import Path
4
- from typing import Any , List , Optional
5
+ from typing import Any , Callable , List , Optional , Tuple
5
6
6
7
from commonwealth .mavlink_comm .exceptions import (
7
8
FetchUpdatedMessageFail ,
17
18
from loguru import logger
18
19
19
20
from autopilot_manager import AutoPilotManager
20
- from exceptions import InvalidFirmwareFile
21
+ from exceptions import InvalidFirmwareFile , NoDefaultFirmwareAvailable
21
22
from typedefs import Firmware , FlightController , Parameters , Serial , SITLFrame , Vehicle
22
23
23
24
index_router_v1 = APIRouter (
29
30
autopilot = AutoPilotManager ()
30
31
31
32
33
+ def index_to_http_exception (endpoint : Callable [..., Any ]) -> Callable [..., Any ]:
34
+ @wraps (endpoint )
35
+ async def wrapper (* args : Tuple [Any ], ** kwargs : dict [str , Any ]) -> Any :
36
+ try :
37
+ return await endpoint (* args , ** kwargs )
38
+ except HTTPException as error :
39
+ raise error
40
+ except Exception as error :
41
+ raise HTTPException (status_code = status .HTTP_500_INTERNAL_SERVER_ERROR , detail = str (error )) from error
42
+
43
+ return wrapper
44
+
45
+
32
46
# By default, all REST resources should have its own router, but as some of them does not implement
33
47
# all the CRUD operations, we gonna keep ones that have less than 2 endpoints in the index router.
34
48
@@ -63,16 +77,19 @@ def raise_lock(*raise_args: str, **kwargs: int) -> None:
63
77
64
78
65
79
@index_router_v1 .put ("/serials" , status_code = status .HTTP_200_OK )
80
+ @index_to_http_exception
66
81
def update_serials (serials : List [Serial ] = Body (...)) -> Any :
67
82
autopilot .update_serials (serials )
68
83
69
84
70
85
@index_router_v1 .get ("/serials" , response_model = List [Serial ])
86
+ @index_to_http_exception
71
87
def get_serials () -> Any :
72
88
return autopilot .get_serials ()
73
89
74
90
75
91
@index_router_v1 .get ("/firmware_info" , response_model = FirmwareInfo , summary = "Get version and type of current firmware." )
92
+ @index_to_http_exception
76
93
async def get_firmware_info () -> Any :
77
94
if not autopilot .current_board :
78
95
message = "No board running, firmware information is unavailable"
@@ -87,6 +104,7 @@ async def get_firmware_info() -> Any:
87
104
88
105
89
106
@index_router_v1 .get ("/vehicle_type" , response_model = MavlinkVehicleType , summary = "Get mavlink vehicle type." )
107
+ @index_to_http_exception
90
108
async def get_vehicle_type () -> Any :
91
109
if not autopilot .current_board :
92
110
message = "No board running, vehicle type is unavailable"
@@ -101,11 +119,13 @@ async def get_vehicle_type() -> Any:
101
119
102
120
103
121
@index_router_v1 .post ("/sitl_frame" , summary = "Set SITL Frame type." )
122
+ @index_to_http_exception
104
123
async def set_sitl_frame (frame : SITLFrame ) -> Any :
105
124
return autopilot .set_sitl_frame (frame )
106
125
107
126
108
127
@index_router_v1 .get ("/firmware_vehicle_type" , response_model = str , summary = "Get firmware vehicle type." )
128
+ @index_to_http_exception
109
129
async def get_firmware_vehicle_type () -> Any :
110
130
if not autopilot .current_board :
111
131
raise RuntimeError ("Cannot fetch vehicle type info as there's no board running." )
@@ -117,11 +137,13 @@ async def get_firmware_vehicle_type() -> Any:
117
137
response_model = List [Firmware ],
118
138
summary = "Retrieve dictionary of available firmwares versions with their respective URL." ,
119
139
)
140
+ @index_to_http_exception
120
141
async def get_available_firmwares (vehicle : Vehicle , board_name : Optional [str ] = None ) -> Any :
121
142
return autopilot .get_available_firmwares (vehicle , (await target_board (board_name )).platform )
122
143
123
144
124
145
@index_router_v1 .post ("/install_firmware_from_url" , summary = "Install firmware for given URL." )
146
+ @index_to_http_exception
125
147
@single_threaded (callback = raise_lock )
126
148
async def install_firmware_from_url (
127
149
url : str ,
@@ -145,6 +167,7 @@ async def install_firmware_from_url(
145
167
146
168
147
169
@index_router_v1 .post ("/install_firmware_from_file" , summary = "Install firmware from user file." )
170
+ @index_to_http_exception
148
171
@single_threaded (callback = raise_lock )
149
172
async def install_firmware_from_file (
150
173
binary : UploadFile = File (...),
@@ -171,65 +194,77 @@ async def install_firmware_from_file(
171
194
@index_router_v1 .get (
172
195
"/board" , response_model = Optional [FlightController ], summary = "Check what is the current running board."
173
196
)
197
+ @index_to_http_exception
174
198
def get_board () -> Any :
175
199
return autopilot .current_board
176
200
177
201
178
202
@index_router_v1 .post ("/board" , summary = "Set board to be used." )
203
+ @index_to_http_exception
179
204
async def set_board (board : FlightController , sitl_frame : SITLFrame = SITLFrame .VECTORED ) -> Any :
180
205
autopilot .current_sitl_frame = sitl_frame
181
206
await autopilot .change_board (board )
182
207
183
208
184
209
@index_router_v1 .post ("/restart" , summary = "Restart the autopilot with current set options." )
210
+ @index_to_http_exception
185
211
async def restart () -> Any :
186
212
logger .debug ("Restarting ardupilot..." )
187
213
await autopilot .restart_ardupilot ()
188
214
logger .debug ("Ardupilot successfully restarted." )
189
215
190
216
191
217
@index_router_v1 .post ("/start" , summary = "Start the autopilot." )
218
+ @index_to_http_exception
192
219
async def start () -> Any :
193
220
logger .debug ("Starting ardupilot..." )
194
221
await autopilot .start_ardupilot ()
195
222
logger .debug ("Ardupilot successfully started." )
196
223
197
224
198
225
@index_router_v1 .post ("/preferred_router" , summary = "Set the preferred MAVLink router." )
226
+ @index_to_http_exception
199
227
async def set_preferred_router (router : str ) -> Any :
200
228
logger .debug ("Setting preferred Router" )
201
229
await autopilot .set_preferred_router (router )
202
230
logger .debug (f"Preferred Router successfully set to { router } " )
203
231
204
232
205
233
@index_router_v1 .get ("/preferred_router" , summary = "Retrieve preferred router" )
234
+ @index_to_http_exception
206
235
def preferred_router () -> Any :
207
236
return autopilot .load_preferred_router ()
208
237
209
238
210
239
@index_router_v1 .get ("/available_routers" , summary = "Retrieve preferred router" )
240
+ @index_to_http_exception
211
241
def available_routers () -> Any :
212
242
return autopilot .get_available_routers ()
213
243
214
244
215
245
@index_router_v1 .post ("/stop" , summary = "Stop the autopilot." )
246
+ @index_to_http_exception
216
247
async def stop () -> Any :
217
248
logger .debug ("Stopping ardupilot..." )
218
249
await autopilot .kill_ardupilot ()
219
250
logger .debug ("Ardupilot successfully stopped." )
220
251
221
252
222
253
@index_router_v1 .post ("/restore_default_firmware" , summary = "Restore default firmware." )
254
+ @index_to_http_exception
223
255
async def restore_default_firmware (board_name : Optional [str ] = None ) -> Any :
224
256
try :
225
257
await autopilot .kill_ardupilot ()
226
258
autopilot .restore_default_firmware (await target_board (board_name ))
259
+ except (NoDefaultFirmwareAvailable , ValueError ) as error :
260
+ raise HTTPException (status_code = status .HTTP_404_NOT_FOUND , detail = str (error )) from error
227
261
finally :
228
262
await autopilot .start_ardupilot ()
229
263
230
264
231
265
@index_router_v1 .get (
232
266
"/available_boards" , response_model = List [FlightController ], summary = "Retrieve list of connected boards."
233
267
)
268
+ @index_to_http_exception
234
269
async def available_boards () -> Any :
235
270
return await autopilot .available_boards (True )
0 commit comments