Skip to content

Commit e6ab341

Browse files
authored
Fix: support very large number of open websockets (#422)
Problem: opening more than ~2000 message websockets hits the channel limit of RabbitMQ and resets the connection. Solution: create one channel per API process, as they are long-lived and should not be created for each operation according to the doc.
1 parent dd5557c commit e6ab341

File tree

5 files changed

+40
-30
lines changed

5 files changed

+40
-30
lines changed

src/aleph/api_entrypoint.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
APP_STATE_NODE_CACHE,
2222
APP_STATE_P2P_CLIENT,
2323
APP_STATE_SESSION_FACTORY,
24-
APP_STATE_STORAGE_SERVICE,
24+
APP_STATE_STORAGE_SERVICE, APP_STATE_MQ_CHANNEL,
2525
)
2626

2727

@@ -52,10 +52,16 @@ async def configure_aiohttp_app(
5252

5353
app = create_aiohttp_app()
5454

55+
# Reuse the connection of the P2P client to avoid opening two connections
56+
mq_conn = p2p_client.mq_client.connection
57+
# Channels are long-lived, so we create one at startup. Otherwise, we end up hitting
58+
# the channel limit if we create a channel for each operation.
59+
mq_channel = await mq_conn.channel()
60+
5561
app[APP_STATE_CONFIG] = config
5662
app[APP_STATE_P2P_CLIENT] = p2p_client
57-
# Reuse the connection of the P2P client to avoid opening two connections
58-
app[APP_STATE_MQ_CONN] = p2p_client.mq_client.connection
63+
app[APP_STATE_MQ_CONN] = mq_conn
64+
app[APP_STATE_MQ_CHANNEL] = mq_channel
5965
app[APP_STATE_NODE_CACHE] = node_cache
6066
app[APP_STATE_STORAGE_SERVICE] = storage_service
6167
app[APP_STATE_SESSION_FACTORY] = session_factory

src/aleph/web/controllers/app_state_getters.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
APP_STATE_CONFIG = "config"
2020
APP_STATE_MQ_CONN = "mq_conn"
21+
APP_STATE_MQ_CHANNEL = "mq_channel"
2122
APP_STATE_NODE_CACHE = "node_cache"
2223
APP_STATE_P2P_CLIENT = "p2p_client"
2324
APP_STATE_SESSION_FACTORY = "session_factory"
@@ -45,6 +46,10 @@ def get_mq_conn_from_request(request: web.Request) -> aio_pika.abc.AbstractConne
4546
return cast(aio_pika.abc.AbstractConnection, request.app[APP_STATE_MQ_CONN])
4647

4748

49+
def get_mq_channel_from_request(request: web.Request) -> aio_pika.abc.AbstractChannel:
50+
return cast(aio_pika.abc.AbstractChannel, request.app[APP_STATE_MQ_CHANNEL])
51+
52+
4853
def get_node_cache_from_request(request: web.Request) -> NodeCache:
4954
return cast(NodeCache, request.app[APP_STATE_NODE_CACHE])
5055

src/aleph/web/controllers/messages.py

Lines changed: 11 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,11 @@
3131
from aleph.types.db_session import DbSessionFactory, DbSession
3232
from aleph.types.message_status import MessageStatus
3333
from aleph.types.sort_order import SortOrder, SortBy
34-
from aleph.web.controllers.app_state_getters import get_session_factory_from_request
34+
from aleph.web.controllers.app_state_getters import (
35+
get_session_factory_from_request,
36+
get_mq_channel_from_request,
37+
get_config_from_request,
38+
)
3539
from aleph.web.controllers.utils import (
3640
DEFAULT_MESSAGES_PER_PAGE,
3741
DEFAULT_PAGE,
@@ -229,29 +233,16 @@ async def view_messages_list(request: web.Request) -> web.Response:
229233
)
230234

231235

232-
async def declare_mq_queue(
233-
mq_conn: aio_pika.abc.AbstractConnection, config: Config
234-
) -> aio_pika.abc.AbstractQueue:
235-
channel = await mq_conn.channel()
236-
mq_message_exchange = await channel.declare_exchange(
237-
name=config.rabbitmq.message_exchange.value,
238-
type=aio_pika.ExchangeType.TOPIC,
239-
auto_delete=False,
240-
)
241-
mq_queue = await channel.declare_queue(auto_delete=True)
242-
await mq_queue.bind(mq_message_exchange, routing_key="processed.*")
243-
return mq_queue
244-
245-
246-
async def messages_ws(request: web.Request):
236+
async def messages_ws(request: web.Request) -> web.WebSocketResponse:
247237
ws = web.WebSocketResponse()
248238
await ws.prepare(request)
249239

250-
mq_conn: aio_pika.abc.AbstractConnection = request.app["mq_conn"]
251-
session_factory: DbSessionFactory = request.app["session_factory"]
252-
config = request.app["config"]
240+
mq_channel: aio_pika.abc.AbstractChannel = get_mq_channel_from_request(request)
241+
session_factory: DbSessionFactory = get_session_factory_from_request(request)
242+
config = get_config_from_request(request)
243+
253244
mq_queue = await mq_make_aleph_message_topic_queue(
254-
mq_conn=mq_conn, config=config, routing_key="processed.*"
245+
channel=mq_channel, config=config, routing_key="processed.*"
255246
)
256247

257248
query_params = WsMessageQueryParams.parse_obj(request.query)

src/aleph/web/controllers/p2p.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
get_ipfs_service_from_request,
2525
get_p2p_client_from_request,
2626
get_mq_conn_from_request,
27+
get_mq_channel_from_request,
2728
)
2829
from aleph.web.controllers.utils import mq_make_aleph_message_topic_queue
2930

@@ -65,16 +66,22 @@ def _validate_request_data(config: Config, request_data: Dict) -> None:
6566
# Only accept publishing on the message topic.
6667
message_topic = config.aleph.queue_topic.value
6768
if topic != message_topic:
68-
raise web.HTTPForbidden(reason=f"Unauthorized P2P topic: {topic}. Use {message_topic}.")
69+
raise web.HTTPForbidden(
70+
reason=f"Unauthorized P2P topic: {topic}. Use {message_topic}."
71+
)
6972

7073
data = request_data.get("data")
7174
if not isinstance(data, str):
72-
raise web.HTTPUnprocessableEntity(reason="'data': expected a serialized JSON string.")
75+
raise web.HTTPUnprocessableEntity(
76+
reason="'data': expected a serialized JSON string."
77+
)
7378

7479
try:
7580
message_dict = json.loads(cast(str, request_data.get("data")))
7681
except ValueError:
77-
raise web.HTTPUnprocessableEntity(reason="'data': must be deserializable as JSON.")
82+
raise web.HTTPUnprocessableEntity(
83+
reason="'data': must be deserializable as JSON."
84+
)
7885

7986
_validate_message_dict(message_dict)
8087

@@ -187,9 +194,11 @@ async def pub_message(request: web.Request):
187194
config = get_config_from_request(request)
188195

189196
if request_data.sync:
190-
mq_conn = get_mq_conn_from_request(request)
197+
mq_channel = get_mq_channel_from_request(request)
191198
mq_queue = await mq_make_aleph_message_topic_queue(
192-
mq_conn=mq_conn, config=config, routing_key=f"*.{pending_message.item_hash}"
199+
channel=mq_channel,
200+
config=config,
201+
routing_key=f"*.{pending_message.item_hash}",
193202
)
194203
else:
195204
mq_queue = None

src/aleph/web/controllers/utils.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,11 +143,10 @@ def cond_output(request, context, template):
143143

144144

145145
async def mq_make_aleph_message_topic_queue(
146-
mq_conn: aio_pika.abc.AbstractConnection,
146+
channel: aio_pika.abc.AbstractChannel,
147147
config: Config,
148148
routing_key: Optional[str] = None,
149149
) -> aio_pika.abc.AbstractQueue:
150-
channel = await mq_conn.channel()
151150
mq_message_exchange = await channel.declare_exchange(
152151
name=config.rabbitmq.message_exchange.value,
153152
type=aio_pika.ExchangeType.TOPIC,

0 commit comments

Comments
 (0)