Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/admin console #11

Merged
merged 2 commits into from
Feb 13, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 145 additions & 1 deletion main.py
Original file line number Diff line number Diff line change
Expand Up @@ -1387,13 +1387,157 @@ def get_admin(req):
if not user.is_admin:
return RedirectResponse("/dashboard", status_code=303)

success = req.query_params.get('success')

admin_page = Container(
H2("Admin Dashboard"),
P("Welcome to the admin dashboard. More features coming soon.")
P("Welcome to the admin dashboard."),
Div(P("Database uploaded successfully!", cls="success"), cls="alert") if success else None,
Article(
H2("Database Management"),
A(Button("Download Database"), href="/admin/download-db", cls="primary"),
Form(
Input(type="file", name="dbfile", accept=".db", required=True),
Button("Upload Database", type="submit"),
action="/admin/upload-db",
method="post",
enctype="multipart/form-data"
)
)
)

return generate_themed_page(admin_page, auth=auth, page_title="Admin Dashboard")

@app.get("/admin/download-db")
def download_db(req):
auth = req.scope.get("auth")
if not auth:
return RedirectResponse("/login", status_code=303)

user = users("id=?", (auth,))[0]
if not user.is_admin:
return RedirectResponse("/dashboard", status_code=303)

try:
import sqlite3
import os
from datetime import datetime

# Create a backup file with timestamp
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_path = f"data/feedback_backup_{timestamp}.db"

# Create a proper SQLite backup
source = sqlite3.connect("data/feedback.db")
dest = sqlite3.connect(backup_path)
source.backup(dest)
source.close()
dest.close()

# Serve the backup file
response = FileResponse(backup_path, filename="feedback-backup.db", media_type="application/x-sqlite3")

# Delete the backup file after sending
def cleanup(response):
if os.path.exists(backup_path):
os.remove(backup_path)
return response

response.background = cleanup
return response

except Exception as e:
logger.error(f"Database backup failed: {str(e)}")
return Titled("Error", P("Failed to create database backup."))

@app.post("/admin/upload-db")
async def upload_db(req):
auth = req.scope.get("auth")
if not auth:
return RedirectResponse("/login", status_code=303)

user = users("id=?", (auth,))[0]
if not user.is_admin:
return RedirectResponse("/dashboard", status_code=303)

try:
import sqlite3
import os
from datetime import datetime

form = await req.form()
file = form["dbfile"]
if not file.filename.endswith('.db'):
return Titled("Error", P("Invalid file type. Please upload a .db file."))

# Create a temporary file for the uploaded content
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
temp_path = f"data/upload_temp_{timestamp}.db"

try:
# Save uploaded content to temporary file
contents = await file.read()
with open(temp_path, "wb") as f:
f.write(contents)

# Verify it's a valid SQLite database
try:
temp_conn = sqlite3.connect(temp_path)
temp_conn.cursor().execute("SELECT name FROM sqlite_master WHERE type='table'")
temp_conn.close()
except sqlite3.Error:
os.remove(temp_path)
return Titled("Error", P("Invalid SQLite database file."))

# Create backup of current database using SQLite backup API
backup_path = f"data/feedback_backup_{timestamp}.db"
source = sqlite3.connect("data/feedback.db")
backup = sqlite3.connect(backup_path)
source.backup(backup)
source.close()
backup.close()

try:
# Replace current database with uploaded one using SQLite backup API
dest = sqlite3.connect("data/feedback.db")
source = sqlite3.connect(temp_path)
source.backup(dest)
source.close()
dest.close()

# Clean up temporary files
os.remove(temp_path)
os.remove(backup_path)

return RedirectResponse("/admin?success=true", status_code=303)

except Exception as e:
# Restore from backup if something went wrong
if os.path.exists(backup_path):
dest = sqlite3.connect("data/feedback.db")
backup = sqlite3.connect(backup_path)
backup.backup(dest)
backup.close()
dest.close()
os.remove(backup_path)
if os.path.exists(temp_path):
os.remove(temp_path)
logger.error(f"Database upload failed: {str(e)}")
return Titled("Error", P("Failed to upload database. The previous database has been restored."))

except Exception as e:
# Clean up temporary files
if os.path.exists(temp_path):
os.remove(temp_path)
if os.path.exists(backup_path):
os.remove(backup_path)
logger.error(f"Database upload failed: {str(e)}")
return Titled("Error", P("Failed to process uploaded file."))

except Exception as e:
logger.error(f"Database upload failed: {str(e)}")
return Titled("Error", P("Failed to process upload request."))

@app.post("/feedback-process/{process_id}/send_email")
def send_feedback_email_route(process_id: str, token: str, recipient_first_name: str = "", recipient_company: str = ""):
try:
Expand Down