-
Notifications
You must be signed in to change notification settings - Fork 225
/
Copy pathdummy_app.py
144 lines (109 loc) · 4.48 KB
/
dummy_app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import asyncio
import contextlib
import time
from contextlib import contextmanager
from datetime import datetime, timedelta
from multiprocessing.context import Process
import aiohttp
import pytest
import requests
from async_generator import yield_, async_generator
from flask import Flask, jsonify
from flask import request
from flask_api.status import HTTP_404_NOT_FOUND, HTTP_200_OK
@contextmanager
def setup_and_teardown_flask_app(app: Flask, host: str, port: int):
"""
Manages setup of provided flask app on given `host` and `port` and its teardown.
As for setup process following things are done:
* `/health` endpoint is added to provided flask app,
* app is launched in separate process,
* function waits for flask app to fully launch - to do this it repetitively checks `/health` endpoint if it will
return status code 200.
Example use of this function in fixture:
>>> with setup_and_teardown_flask_app(Flask(__name__), "localhost", 10000):
>>> yield
:param app: app to launch
:param host: host on which to launch app
:param port: port on which to launch app
"""
def wait_for_flask_app_to_be_accessible():
timeout = 1
end_time = datetime.now() + timedelta(seconds=timeout)
response = requests.Response()
response.status_code = HTTP_404_NOT_FOUND
while response.status_code != HTTP_200_OK and datetime.now() < end_time:
with contextlib.suppress(requests.exceptions.ConnectionError):
response = requests.request("POST", "http://{}:{}/health".format(host, port))
time.sleep(0.01)
fail_message = "Timeout expired: failed to start mock REST API in {} seconds".format(timeout)
assert response.status_code == HTTP_200_OK, fail_message
app.route("/health", methods=["POST"])(lambda: "OK")
process = Process(target=app.run, args=(host, port))
process.start()
wait_for_flask_app_to_be_accessible()
yield
process.terminate()
process.join()
def create_server():
app = Flask(__name__)
app.pre_computation_value = 0
app.post_computation_value = 0
@app.route("/pre-computation-value", methods=["PUT"])
def set_pre_computation_value():
app.pre_computation_value = request.json["value"]
return ""
@app.route("/pre-computation-value", methods=["GET"])
def get_pre_computation_value():
return jsonify(app.pre_computation_value)
@app.route("/post-computation-value", methods=["PUT"])
def set_post_computation_value():
app.post_computation_value = request.json["value"]
return ""
@app.route("/post-computation-value", methods=["GET"])
def get_post_computation_value():
return jsonify(app.post_computation_value)
return app
class DummyApp:
"""
This has to simulate real application that gets input from server, processes it and posts it.
"""
def __init__(self, host, port, tick_rate_s):
self.host = host
self.port = port
self.tick_rate_s = tick_rate_s
self.stored_value = 0
async def run(self):
await asyncio.gather(self.run_getter(), self.run_poster())
async def run_getter(self):
async with aiohttp.ClientSession() as session:
while True:
response = await session.get("http://{}:{}/pre-computation-value".format(self.host, self.port))
self.stored_value = int(await response.text())
await asyncio.sleep(self.tick_rate_s)
async def run_poster(self):
async with aiohttp.ClientSession() as session:
while True:
await session.put(
"http://{}:{}/post-computation-value".format(self.host, self.port),
json={"value": self.stored_value + 1},
)
await asyncio.sleep(self.tick_rate_s)
@pytest.fixture
def dummy_server_host():
return "localhost"
@pytest.fixture
def launch_dummy_server(dummy_server_host, unused_tcp_port):
with setup_and_teardown_flask_app(create_server(), dummy_server_host, unused_tcp_port):
yield
@pytest.fixture
def app_tick_interval():
return 0.01
@pytest.fixture
@async_generator
async def launch_dummy_app(event_loop, launch_dummy_server, dummy_server_host, unused_tcp_port, app_tick_interval):
app = DummyApp(dummy_server_host, unused_tcp_port, app_tick_interval)
task = event_loop.create_task(app.run())
await yield_(None)
task.cancel()
await asyncio.sleep(0)