-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapp.py
149 lines (117 loc) · 4.62 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
from flask import Flask, jsonify
import os, json, requests, time
from werkzeug.serving import is_running_from_reloader
from multiprocessing import Process, Manager
# Flask application object; the route handlers below register on it.
app = Flask(__name__)
# Intended shape of `credits` (kept as a comment because the value is a
# manager proxy rather than a plain dict):
#   dict[str, list[dict[str, str | int]]]
# i.e. role name -> list of credit entries.
manager = Manager()
# Dict proxy created by the manager. It is handed to the background Process
# further down, which fills it in; the /credits route reads from it.
credits = manager.dict()
def env_flag(name, default=False):
    """Read environment variable *name* as a boolean.

    Environment values are always strings, so a plain truthiness test treats
    ``"0"`` and ``"false"`` as enabled. Here an unset variable yields
    *default*; the values ``""``, ``"0"``, ``"false"``, ``"no"`` and ``"off"``
    (case-insensitive, surrounding whitespace ignored) yield ``False``; any
    other value yields ``True``.
    """
    raw = os.getenv(name)
    if raw is None:
        return default
    return raw.strip().lower() not in {"", "0", "false", "no", "off"}


# Unix timestamp of the last cache refresh; 0 means "never", so the first
# pass of the watcher loop sees the cache as older than a day.
last_refreshed: float = 0
# Port for the Flask server, always an int whether or not PORT is set.
PORT = int(os.getenv("PORT") or 8000)
# Debug switch; passed to app.run() and used to decide which process of the
# debug reloader starts the background watcher.
DEBUG = env_flag("DEBUG")
GJ_USER_INFO_URL = "http://www.boomlings.com/database/getGJUserInfo20.php"
# originally written by Prevter in JS, translated to Python by ~~me~~ chatgpt
def parse_key_map(key_map: str) -> dict[str, str]:
    """Parse a colon-separated ``key:value:key:value`` string into a dict.

    Tokens at even positions are keys and the token following each one is
    its value. If a key occurs more than once, the last value wins. A
    trailing key with no value (odd token count, which includes the empty
    string and single-token bodies such as ``"-1"``) is dropped instead of
    raising ``IndexError``.
    """
    tokens = key_map.split(":")
    # zip stops at the shorter slice, so a dangling final key is ignored.
    return dict(zip(tokens[::2], tokens[1::2]))
# Minimum spacing between two requests to the user-info endpoint.
_REQUEST_INTERVAL_SECONDS = 2
# Upper bound on a single request so a stalled connection cannot block the
# refresh forever.
_REQUEST_TIMEOUT_SECONDS = 10


def _extract_icon_fields(response):
    """Build the extra credit fields from a parsed user-info response.

    Raises ``KeyError`` if an expected key is absent and ``ValueError`` if a
    numeric field does not parse as an integer.
    """
    color3 = int(response["51"])
    # NOTE(review): key "28" presumably is the glow toggle - confirm against
    # the endpoint's documentation. When it is 0, color3 is reported as -1.
    if int(response["28"]) == 0:
        color3 = -1
    return {
        "color1": int(response["10"]),
        "color2": int(response["11"]),
        "color3": color3,
        "iconID": int(response["21"]),
        "gameName": response["1"],
    }


def retrieve_credits(credits):
    """Rebuild the credits cache from ``credits.json`` plus live user info.

    Every entry of every role in ``credits.json`` is enriched with
    ``color1``, ``color2``, ``color3``, ``iconID`` and ``gameName`` fetched
    from ``GJ_USER_INFO_URL``. An account that appears more than once is
    fetched once and the same enriched entry is reused. When every entry has
    been processed the result is written into *credits* role by role, and
    roles that are no longer present in the file are removed from it.

    The update is abandoned (leaving *credits* untouched) when the request
    fails, when the server answers with error code 1015 or 1006, or when the
    response lacks the expected fields.

    Args:
        credits: mutable mapping (the shared manager dict) to publish into.
    """
    if not is_running_from_reloader() and DEBUG:
        # flask's default debug functionality runs the script twice.
        # Disabling it would lose live reloading in debug, so the copy that
        # is not running from the reloader simply does nothing.
        # source: https://stackoverflow.com/a/25504196/20616402
        print("not running from reloader! returning...")
        return

    with open("credits.json", "r", encoding="utf-8") as f:
        old_credits: dict[str, list[dict[str, str | int]]] = json.load(f)

    new_credits: dict[str, list[dict[str, str | int]]] = {}
    # accountID -> already enriched entry, so each account is fetched once
    temp_cache: dict[int, dict[str, str | int]] = {}

    for role, entries in old_credits.items():
        refreshed = []
        for entry in entries:
            account_id = entry["accountID"]
            if account_id in temp_cache:
                print(f"user {entry['name']} already in cache, using cached value instead")
                refreshed.append(temp_cache[account_id])
                continue

            req_start_time = time.time()
            try:
                response_text = requests.post(
                    GJ_USER_INFO_URL,
                    data={
                        "secret": "Wmfd2893gb7",
                        "targetAccountID": account_id,
                    },
                    headers={"User-Agent": ""},
                    timeout=_REQUEST_TIMEOUT_SECONDS,
                ).text
            except requests.RequestException as err:
                print(f"request for account {account_id} failed ({err!r}), ending cache update")
                return
            req_duration = time.time() - req_start_time

            # Error bodies carry the code as the second colon-separated
            # token; a body without any colon has no such token.
            tokens = response_text.split(":")
            error_code = tokens[1] if len(tokens) > 1 else None
            if error_code == " 1015":  # rate limiting!! (unlikely because we sleep in between requests but whatever)
                print("somehow we're getting rate limited! ending cache update")
                return
            if error_code == " 1006":  # ip blocked!! please setup a proxy
                print("error code 1006 recieved from boomlings. i'm sure you know what to do now")
                return

            try:
                entry.update(_extract_icon_fields(parse_key_map(response_text)))
            except (KeyError, ValueError, IndexError):
                print(f"unexpected response for account {account_id}: {response_text!r}, ending cache update")
                return

            print(entry)
            temp_cache[account_id] = entry
            refreshed.append(entry)
            # time.sleep() rejects negative values, so clamp at zero when the
            # request itself already took longer than the interval.
            time.sleep(max(0.0, _REQUEST_INTERVAL_SECONDS - req_duration))
        new_credits[role] = refreshed

    for role, entries in new_credits.items():
        credits[role] = entries
    # Drop roles that were removed from credits.json since the last refresh.
    for stale_role in [r for r in credits.keys() if r not in new_credits]:
        del credits[stale_role]
@app.get("/")
def index():
    """Root endpoint: respond with a fixed status string encoded as JSON."""
    message = "Credit server running!"
    return jsonify(message)
@app.get("/credits")
def send_credits():
    """Serve the current contents of the shared credits mapping as JSON."""
    # copy() is called on the shared mapping first and the copy is serialised
    snapshot = credits.copy()
    return jsonify(snapshot)
# mtime of credits.json as last seen by the watcher; 0 means "not seen yet".
last_modified = 0


def _refresh_credits(credits):
    """Run one cache refresh and record when it happened.

    A failure inside ``retrieve_credits`` is reported and swallowed here so
    that it cannot end the watcher loop. ``last_refreshed`` is stamped either
    way, so a failed refresh is retried on the next file change or the next
    daily reload, not on every one-second tick.
    """
    global last_refreshed
    try:
        retrieve_credits(credits)
    except Exception as err:  # worker-loop boundary: report, keep watching
        print(f"credit refresh failed: {err!r}")
    last_refreshed = time.time()


def check_credits(credits):
    """Watch ``credits.json`` forever and keep *credits* up to date.

    Once per second the file's modification time is compared with the last
    one seen; a newer mtime triggers a refresh. Independently, a refresh is
    triggered whenever the previous one is at least 24 hours old, which is
    also what performs the initial load while ``last_refreshed`` is still 0.
    This function never returns.

    Args:
        credits: shared mapping handed through to ``retrieve_credits``.
    """
    global last_modified
    global last_refreshed
    while True:
        try:
            modified_time = os.path.getmtime("credits.json")
        except OSError:
            # File missing or unreadable right now (for example while it is
            # being replaced): treat it as unchanged for this tick.
            modified_time = last_modified

        # The first successful stat only establishes the baseline.
        if last_modified == 0:
            last_modified = modified_time

        if modified_time > last_modified:
            last_modified = modified_time
            print("file changed! retriving credits...")
            _refresh_credits(credits)

        if time.time() - last_refreshed >= 3600 * 24:  # 3600 * 24 = one day (24hrs) in seconds
            print("reloading cache...")
            _refresh_credits(credits)

        time.sleep(1)
# Background watcher process. It is created when this is the reloader's
# child process, or when debug mode is off; otherwise `process` stays None.
process = (
    Process(target=check_credits, args=(credits,))
    if is_running_from_reloader() or not DEBUG
    else None
)
if process is not None:
    process.start()

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=PORT, debug=DEBUG)
    # Reached once app.run() returns: in debug mode, wait on the watcher.
    if DEBUG and process is not None:
        process.join()