-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_benchmark.py
More file actions
executable file
·147 lines (122 loc) · 4.93 KB
/
test_benchmark.py
File metadata and controls
executable file
·147 lines (122 loc) · 4.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
#!/usr/bin/env python
import datetime
import time
from urllib import request
import json
import sys
RECHECK_RESULT_TIME = 10
STATUS_FINISHED = 'FINISHED'
GET_RESULT_ENDPOINT = '/api/v1.0/get_result/'
SET_RESULT_ENDPOINT = '/api/v1.0/set_result/'
RUN_BENCHMARK_ENDPOINT = '/api/v1.0/run_benchmark'
TOTAL_EXECUTION_TIMEOUT_LIMIT = 60 * 5
def build_url(base, endpoint):
return base + endpoint
def assert_evaluations_ok(result_id, result):
evaluations = result.get('evaluations', {})
eval_passed = evaluations.get('passed', False)
assert eval_passed, f"Evaluations didn't pass. Evaluations: {evaluations}"
return result
def force_result_by_timeout(base_url, result_id):
endpoint = SET_RESULT_ENDPOINT + result_id
url = build_url(base_url, endpoint)
params = {
'error': f'timeout_limit_exceded:{TOTAL_EXECUTION_TIMEOUT_LIMIT}'
}
res = make_request_post(url, params)
if res.status == 200:
print('Sucessfully forced result')
else:
content = res.read().decode('utf8')
print('Problem forcing timeout result')
print(content)
def check_has_timed_out(start_time):
total_duration = datetime.datetime.now() - start_time
total_duration = total_duration.total_seconds()
print(f'Current execution time:{total_duration}/{TOTAL_EXECUTION_TIMEOUT_LIMIT}')
has_timed_out = total_duration > TOTAL_EXECUTION_TIMEOUT_LIMIT
return has_timed_out
def check_results(base_url, result_id, start_time):
endpoint = GET_RESULT_ENDPOINT + result_id
url = build_url(base_url, endpoint)
print(f'Checking results for "{result_id}" on url: {url}')
res = make_request_get(url)
if res.status == 200:
content = res.read().decode('utf8')
data = json.loads(content)
result = data.get('result')
status = data['status']
if result is not None or status == STATUS_FINISHED:
return assert_evaluations_ok(result_id, result)
else:
if check_has_timed_out(start_time):
force_result_by_timeout(base_url, result_id)
assert False, f"Benchmark timed out, execution took more than {TOTAL_EXECUTION_TIMEOUT_LIMIT} seconds"
print((
f'Resuts not ready yet for "{result_id}". '
f'Waiting {RECHECK_RESULT_TIME} seconds before checking results...'
))
time.sleep(RECHECK_RESULT_TIME)
return check_results(base_url, result_id, start_time)
def run(base_url, service_name, image_name, tag, start_time):
run_benchmark_url = build_url(base_url, RUN_BENCHMARK_ENDPOINT)
tag_to_use = tag
params = {
"override_services": {
service_name: {
'image': f'{image_name}:{tag_to_use}'
}
}
}
# params = {
# "override_services": {
# 'object-detection': {
# 'image': f'registry.insight-centre.org/sit/mps/content-extraction-service:{tag_to_use}'
# },
# 'forwarder': {
# 'image': f'registry.insight-centre.org/sit/mps/forwarder:{tag_to_use}'
# },
# 'matcher': {
# 'image': f'registry.insight-centre.org/sit/mps/matcher:{tag_to_use}'
# },
# 'event-dispatcher': {
# 'image': f'registry.insight-centre.org/sit/mps/event-dispatcher:{tag_to_use}'
# },
# 'preprocessor': {
# 'image': f'registry.insight-centre.org/sit/mps/preprocessing-service:{tag_to_use}'
# },
# }
# }
print(f'Sending requests for benchmark on url "{run_benchmark_url}" with params: {params}')
res = make_request_post(run_benchmark_url, params)
if res.status == 200:
content = res.read().decode('utf8')
data = json.loads(content)
if 'wait' in data:
wait_time = int(data['wait'])
print(f'Service is busy, waiting for {wait_time} seconds before next try...')
time.sleep(wait_time)
return run(base_url, service_name, image_name, tag, start_time)
else:
result_id = data['result_id']
print(f'Waiting {RECHECK_RESULT_TIME} seconds before checking results...')
time.sleep(RECHECK_RESULT_TIME)
return check_results(base_url, result_id, start_time)
def make_request_get(url):
response = request.urlopen(url)
return response
def make_request_post(url, data):
params = json.dumps(data).encode('utf8')
req = request.Request(url,
data=params, headers={'content-type': 'application/json'})
response = request.urlopen(req)
return response
if __name__ == '__main__':
benchmark_platform_controller_url = sys.argv[1]
service_name = sys.argv[2]
image_name = sys.argv[3]
tag = sys.argv[4]
start_time = datetime.datetime.now()
result = run(benchmark_platform_controller_url, service_name, image_name, tag, start_time)
print(result)
exit(0)