-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathroutes.py
144 lines (120 loc) · 5.22 KB
/
routes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
from flask import Blueprint, render_template, request, redirect, url_for, flash
from flask_login import login_user, logout_user, current_user, login_required
from .models import API, User
from .forms import LoginForm, APIForm, APIRequestForm
import requests
from .utils import login_manager
main = Blueprint('main', __name__)
@login_manager.unauthorized_handler
def unauthorized_callback():
return redirect('/login?next=' + request.path)
@main.route('/')
@login_required
def index():
user_apis = API.select().where(API.user == current_user.id)
return render_template('index.html', apis=user_apis)
@main.route('/api/<int:api_id>', methods=['GET', 'POST'])
@login_required
def api_detail(api_id):
api = API.get_or_none(API.ID == api_id)
if not api or api.user.username != current_user.id:
flash("API not found or you don't have permission to access it", "error")
print(f'API not found or you dont have permission to access it {current_user.id}')
return redirect(url_for('main.index'))
form = APIRequestForm()
response = None
if form.validate_on_submit():
method = form.request_method.data
body_type = form.body_type.data
headers = {}
payload = None
# Process headers
if form.headers.data:
headers = dict(line.split('=', 1) for line in form.headers.data.splitlines())
try:
if body_type == 'json':
headers['Content-Type'] = 'application/json'
payload = form.request_payload.data
response = requests.request(method, api.endpoint, headers=headers, json=payload)
elif body_type == 'form':
headers['Content-Type'] = 'application/x-www-form-urlencoded'
form_data = dict(line.split('=', 1) for line in form.form_data.data.splitlines())
response = requests.request(method, api.endpoint, headers=headers, data=form_data)
elif body_type == 'file':
file = form.file_upload.data
files = {'file': (file.filename, file.stream, file.mimetype)}
response = requests.request(method, api.endpoint, headers=headers, files=files)
else: # 'none'
response = requests.request(method, api.endpoint, headers=headers)
except Exception as e:
flash(f"Error sending request: {e}", "error")
return render_template('api_detail.html', api=api, response=response, form=form)
@main.route('/api/new', methods=['GET', 'POST'])
@login_required
def new_api():
form = APIForm()
if form.validate_on_submit():
API.create(
user=current_user.id,
name=form.name.data,
endpoint=form.endpoint.data,
method=form.method.data,
description=form.description.data
)
#API.save()
flash("API created successfully!", "success")
return redirect(url_for('main.index'))
return render_template('create_api.html', form=form)
@main.route('/api/<int:api_id>/delete', methods=['POST'])
@login_required
def delete_api(api_id):
api = API.get_or_none(API.ID == api_id)
if api and api.user == current_user.id:
api.delete_instance()
flash("API deleted successfully!", "success")
else:
flash("API not found or you don't have permission to delete it", "error")
return redirect(url_for('main.index'))
@main.route('/api/<int:api_id>/request', methods=['POST'])
@login_required
def api_request(api_id):
api = API.get_or_none(API.ID == api_id)
if not api or api.user != current_user.id:
flash("API not found or you don't have permission to access it", "error")
return redirect(url_for('main.index'))
form = APIRequestForm()
if form.validate_on_submit():
method = form.request_method.data
payload = form.request_payload.data
response = None
try:
headers = {'Content-Type': 'application/json'}
if method == 'GET':
response = requests.get(api.endpoint, headers=headers)
elif method == 'POST':
response = requests.post(api.endpoint, headers=headers, data=payload)
elif method == 'PUT':
response = requests.put(api.endpoint, headers=headers, data=payload)
elif method == 'DELETE':
response = requests.delete(api.endpoint, headers=headers)
except Exception as e:
flash(f"Error sending request: {e}", "error")
return render_template('api_detail.html', api=api, response=response, form=form)
@main.route('/login', methods=['GET', 'POST'])
def login():
form = LoginForm()
if form.validate_on_submit():
user = User.get_or_none(User.username == form.username.data)
if user and user.check_password(form.password.data):
login_user(user)
flash("Logged in successfully!", "success")
return redirect(url_for('main.index'))
else:
flash("Invalid username or password", "error")
return render_template('login.html', form=form)
@main.route('/logout')
@login_required
def logout():
logout_user()
flash("Logged out successfully!", "success")
return redirect(url_for('main.login'))