-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
91 lines (81 loc) · 3.26 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
#!/usr/bin/env python3
import uvicorn
import multiprocessing as mp
from fastapi import FastAPI, Request, HTTPException, Path, UploadFile, File
from fastapi.responses import PlainTextResponse, Response
import zipfile
import os
import shlex
import subprocess
import uuid
import traceback
from io import BytesIO
import shutil
import src.handler.wasm as wasm
import src.handler.arduino as arduino
BASE_DIR = "TEMP_COMPILE_FILES"
app = FastAPI()
# TODO we could make this better with classes sometime
def createShellCallAndPreprocess(tmpdir, device_name):
if device_name == "WASM" or \
device_name == "WASM-single-file":
return wasm.create_emscripten_call_and_preprocess(tmpdir, device_name)
elif device_name == "nicla" or \
device_name == "nano" or \
device_name == "xiao":
return arduino.createArduinoCliCall(tmpdir, device_name)
else:
raise Exception(f'Unknown device: {device_name}')
def post_call_read_binary(tmpdir, device_name):
if device_name == "WASM" or \
device_name == "WASM-single-file":
return wasm.read_output(tmpdir, device_name)
elif device_name == "nicla" or \
device_name == "nano" or \
device_name == "xiao":
return arduino.read_ino(tmpdir)
else:
raise Exception(f'Unknown device: {device_name}')
@app.post("/compileFirmware/{device_name}")
async def postCompileFirmware(request: Request, device_name: str = Path(...)):
data = await request.json()
binaryFile = compileFirmware(data["main"], data["header"], device_name)
if binaryFile == None:
raise HTTPException(status_code=500, detail="Server could not compile binary from supplied files")
return Response(binaryFile)
@app.post("/compile/{device_name}")
async def compileFirmware(device_name: str, file: UploadFile = File(...)):
compile_id = str(uuid.uuid4())
folder = os.path.join(BASE_DIR, compile_id, "main")
deleteFolder = os.path.join(BASE_DIR, compile_id)
try:
content = await file.read()
with zipfile.ZipFile(BytesIO(content)) as zip_file:
if not os.path.exists(folder):
os.makedirs(folder)
zip_file.extractall(folder)
shell_cmd = shlex.split(createShellCallAndPreprocess(folder, device_name))
process = subprocess.Popen(shell_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
while True:
output = process.stdout.readline().decode().strip()
if output == '' and process.poll() is not None:
break
if output:
print(output)
process.wait()
if process.returncode != 0:
raise Exception("Compilation failed")
firmware = post_call_read_binary(folder, device_name)
response = Response(content=firmware, media_type="application/octet-stream")
if os.path.exists(folder):
shutil.rmtree(deleteFolder)
return response
except Exception as e:
if os.path.exists(folder):
shutil.rmtree(deleteFolder)
print(e)
print(traceback.format_exc())
raise e
mp.set_start_method("forkserver", force=True)
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=3005)