|
1 | 1 | import firebase_admin
|
2 |
| -from firebase_admin import credentials , db |
3 |
| - |
| 2 | +from firebase_admin import credentials, db |
| 3 | +import os |
| 4 | +import json |
4 | 5 | from datetime import datetime
|
5 | 6 |
|
6 |
| -current_datetime = datetime.now() |
7 |
| -formatted_datetime = current_datetime.strftime("%Y-%m-%d %H:%M:%S") |
8 |
| - |
9 |
| -cred = credentials.Certificate("serviceAccountKey.json") |
| 7 | +# Read Firebase credentials from an environment variable |
| 8 | +firebase_credentials = os.getenv("FIREBASE_CREDENTIALS") |
10 | 9 |
|
11 |
| -firebase_admin.initialize_app(cred, { |
12 |
| - 'databaseURL': "https://speed-typing-test-shashi-singh-default-rtdb.firebaseio.com/" # For Realtime Database |
13 |
| -}) |
| 10 | +if firebase_credentials: |
| 11 | + firebase_credentials = json.loads(firebase_credentials) |
| 12 | + cred = credentials.Certificate(firebase_credentials) |
| 13 | + firebase_admin.initialize_app(cred, { |
| 14 | + 'databaseURL': "https://speed-typing-test-shashi-singh-default-rtdb.firebaseio.com/" |
| 15 | + }) |
| 16 | +else: |
| 17 | + raise ValueError("Firebase credentials not found. Set the FIREBASE_CREDENTIALS environment variable.") |
14 | 18 |
|
| 19 | +# Get reference to the Firebase Realtime Database |
15 | 20 | ref = db.reference("test")
|
16 |
| -data = ref.get() |
17 | 21 |
|
18 |
| -def checkUniqueUser(username): |
19 |
| - for key,value in data.items(): |
20 |
| - for names,entry in value.items(): |
21 |
| - if entry==username: |
22 |
| - # print("found") |
23 |
| - return True |
| 22 | +def checkUniqueUser(username): |
| 23 | + """Check if a user exists in the database.""" |
| 24 | + data = ref.get() # Fetch fresh data every time |
| 25 | + if data: |
| 26 | + for key, value in data.items(): |
| 27 | + if isinstance(value, dict): # Ensure it's a dictionary before iterating |
| 28 | + for names, entry in value.items(): |
| 29 | + if entry == username: |
| 30 | + return True |
24 | 31 | return False
|
25 | 32 |
|
26 |
| -def initialiseNewUser(username,password): |
27 |
| - newValue = { |
28 |
| - "name":username, |
29 |
| - "password":password |
| 33 | +def initialiseNewUser(username, password): |
| 34 | + """Add a new user to the database.""" |
| 35 | + new_value = { |
| 36 | + "name": username, |
| 37 | + "password": password |
30 | 38 | }
|
31 |
| - ref.child(username).update(newValue) |
| 39 | + ref.child(username).set(new_value) # Use `set()` instead of `update()` to avoid partial overwrites |
32 | 40 |
|
33 | 41 | def uploadCurrentData(username, wpm, accuracy):
|
34 |
| - date = formatted_datetime |
35 |
| - newValue = { |
36 |
| - date: { |
37 |
| - "wpm": wpm, |
38 |
| - "accuracy": accuracy, |
39 |
| - }, |
| 42 | + """Upload user WPM and accuracy data with a timestamp.""" |
| 43 | + current_datetime = datetime.now() |
| 44 | + formatted_datetime = current_datetime.strftime("%Y-%m-%d %H:%M:%S") |
| 45 | + |
| 46 | + new_value = { |
| 47 | + "wpm": wpm, |
| 48 | + "accuracy": accuracy, |
40 | 49 | }
|
| 50 | + |
41 | 51 | try:
|
42 |
| - if username != "Login": |
43 |
| - ref.child(username).child("history").push(newValue) |
44 |
| - except firebase_admin.exceptions.UnavailableError as e: |
| 52 | + if username and username != "Login": |
| 53 | + ref.child(username).child("history").child(formatted_datetime).set(new_value) |
| 54 | + except firebase_admin.exceptions.FirebaseError as e: |
45 | 55 | print(f"Failed to upload data: {e}")
|
46 | 56 | except Exception as e:
|
47 | 57 | print(f"Unexpected error: {e}")
|
48 | 58 |
|
49 |
| - |
50 | 59 | def getHistory(username):
|
51 |
| - newData = ref.child(username).child("history").get() |
52 |
| - output_list = [] |
53 |
| - final_list = [] |
54 |
| - temporary = [] |
| 60 | + """Retrieve the user's typing history.""" |
| 61 | + new_data = ref.child(username).child("history").get() |
| 62 | + if not new_data: |
| 63 | + return [] |
55 | 64 |
|
56 |
| - for a,b in newData.items(): |
57 |
| - for date,pairValue in b.items(): |
58 |
| - timestamp = date |
| 65 | + output_list = [] |
| 66 | + |
| 67 | + for timestamp, values in new_data.items(): |
| 68 | + try: |
59 | 69 | datetime_obj = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S")
|
60 |
| - |
61 | 70 | real_date = datetime_obj.date()
|
62 | 71 | real_time = datetime_obj.time()
|
63 | 72 |
|
64 |
| - # print("Date: ",date) |
65 |
| - |
66 |
| - final_list.append(real_date) |
67 |
| - final_list.append(real_time) |
68 |
| - |
69 |
| - # print("finallist",final_list) |
70 |
| - |
71 |
| - for key,value in pairValue.items(): |
72 |
| - # print(key,value) |
73 |
| - temporary.append(key) |
74 |
| - temporary.append(value) |
75 |
| - |
76 |
| - # print("pairValue",pairValue) |
77 |
| - # print("temporary", temporary) |
78 |
| - final_list.append(temporary) |
79 |
| - |
80 |
| - temporary = [] |
81 |
| - |
82 |
| - output_list.append(final_list) |
83 |
| - final_list = [] |
84 |
| - |
85 |
| - # count = len(output_list) |
86 |
| - # print(count) |
87 |
| - # for i in output_list: |
88 |
| - # print(i) |
| 73 | + entry = { |
| 74 | + "date": real_date, |
| 75 | + "time": real_time, |
| 76 | + "wpm": values.get("wpm", 0), |
| 77 | + "accuracy": values.get("accuracy", 0), |
| 78 | + } |
| 79 | + output_list.append(entry) |
| 80 | + except ValueError: |
| 81 | + print(f"Skipping invalid timestamp: {timestamp}") |
89 | 82 |
|
90 | 83 | return output_list
|
91 | 84 |
|
|
0 commit comments