CharlesFarhat opened this issue on Jan 22, 2025 · 1 comment

Labels: bug (Something that is supposed to be working, but isn't), core (Issues that should be addressed in Ray Core), triage (Needs triage, e.g. priority, bug/not-bug, and owning component)
What happened + What you expected to happen

Hello to all!

I am deploying my FastAPI app and the Ray cluster head inside Docker. My lifespan app is something like:
```python
from contextlib import asynccontextmanager

import logfire
from fastapi import FastAPI


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup event
    await init_db()
    await init_ray()

    # Create the worker actors and kick off their consume loops
    workers = [TestWorker.remote(f"worker_{i}") for i in range(1)]
    for worker in workers:
        worker.run.remote()  # type: ignore

    async def cleanup():
        logfire.info("Cleanup triggered for FastAPI")
        for worker in workers:
            await worker.__ray_terminate__.remote()  # type: ignore
        await cleanup_db()
        shutdown_ray()

    try:
        yield
    finally:
        await cleanup()  # start cleanup
```
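For context, a simplified sketch of the `init_ray` / `shutdown_ray` helpers, assuming they are thin wrappers around `ray.init()` / `ray.shutdown()` (the real versions pass more project-specific config):

```python
# Simplified sketch only; assumes the helpers are thin ray.init()/ray.shutdown()
# wrappers. The real helpers may pass project-specific options.
import logfire
import ray


async def init_ray():
    if not ray.is_initialized():
        ray.init()  # the head node runs in the same container


def shutdown_ray():
    logfire.info("Shutting down Ray")  # the "Shutting down Ray" line in the logs
    if ray.is_initialized():
        ray.shutdown()
```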
and my actor (the worker) is:
```python
import asyncio
import atexit


class AbstractWorkerDB:
    def __init__(self, worker_name: str):
        self.name = worker_name
        self.running = True
        atexit.register(self.__del__)

    def __del__(self):
        """Handle SIGTERM signal by running destroy_worker synchronously."""
        self.logfire.info(
            f"Received SIGTERM signal, initiating graceful shutdown for worker {self.name}"
        )
        asyncio.run(self.destroy_worker())

    async def _async_init(self):
        # init db connection
        ...

    async def destroy_worker(self):
        """Destroy the worker and close DB."""
        ...

    async def run(self):
        """Main worker loop."""
        try:
            # Always wait for async initialization to complete
            await self._async_init()
            async with self.queue.iterator() as queue_iter:
                async for message in queue_iter:
                    if not self.running:
                        break
                    async with message.process():
                        try:
                            task = self._task_adapter.validate_json(
                                message.body.decode()
                            )
                            asyncio.create_task(self.process_task(task))
                        except Exception as e:
                            self.logfire.error(
                                f"Error processing message: {str(e)}", exc_info=True
                            )
                            continue
        finally:
            # Ensure cleanup happens when Ray stops the worker
            await self.destroy_worker()
```
Each worker pulls from a RabbitMQ queue, which is why `run` has that consume loop.
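The `queue.iterator()` / `message.process()` calls above are aio-pika's API; a minimal sketch of the elided `_async_init`, assuming aio-pika and an illustrative URL and queue name:

```python
# Minimal sketch of the elided _async_init, assuming aio-pika (whose
# queue.iterator() / message.process() API appears in run() above).
# The connection URL and queue name are illustrative, not the real ones.
import aio_pika


async def _async_init(self):
    # connect_robust reconnects automatically if the broker drops.
    self.connection = await aio_pika.connect_robust(
        "amqp://guest:guest@backend-rabbitmq-dev:5672/"
    )
    self.channel = await self.connection.channel()
    self.queue = await self.channel.declare_queue("tasks", durable=True)
```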
But when FastAPI hot reloads or when I run docker compose down, I get:

```
backend-dev-1 | WARNING: StatReload detected changes in '***/app.py'. Reloading...
backend-dev-1 | *** SIGTERM received at time=1737543906 on cpu 1 ***
backend-dev-1 | PC: @ 0x7f043882aea6 (unknown) epoll_wait
backend-dev-1 | @ 0x7f043875e050 (unknown) (unknown)
backend-dev-1 | [2025-01-22 11:05:06,608 E 2484 2484] logging.cc:447: *** SIGTERM received at time=1737543906 on cpu 1 ***
backend-dev-1 | [2025-01-22 11:05:06,608 E 2484 2484] logging.cc:447: PC: @ 0x7f043882aea6 (unknown) epoll_wait
backend-dev-1 | [2025-01-22 11:05:06,608 E 2484 2484] logging.cc:447: @ 0x7f043875e050 (unknown) (unknown)
backend-dev-1 | 11:05:06.612 Cleanup triggered for FastAPI
backend-rabbitmq-dev-1 | 2025-01-22 11:05:06.613657+00:00 [warning] <0.1036.0> closing AMQP connection <0.1036.0> (172.20.0.7:52406 -> 172.20.0.4:5672, vhost: '/', user: 'guest'):
backend-rabbitmq-dev-1 | 2025-01-22 11:05:06.613657+00:00 [warning] <0.1036.0> client unexpectedly closed TCP connection
backend-dev-1 | 11:05:06.614 Shutting down Ray
backend-rabbitmq-dev-1 | 2025-01-22 11:05:06.649017+00:00 [warning] <0.1049.0> closing AMQP connection <0.1049.0> (172.20.0.7:52408 -> 172.20.0.4:5672, vhost: '/', user: 'guest'):
backend-rabbitmq-dev-1 | 2025-01-22 11:05:06.649017+00:00 [warning] <0.1049.0> client unexpectedly closed TCP connection
backend-dev-1 | ERROR: Traceback (most recent call last):
backend-dev-1 | File "/usr/local/lib/python3.12/asyncio/runners.py", line 194, in run
backend-dev-1 | return runner.run(main)
backend-dev-1 | ^^^^^^^^^^^^^^^^
backend-dev-1 | File "/usr/local/lib/python3.12/asyncio/runners.py", line 118, in run
backend-dev-1 | return self._loop.run_until_complete(task)
backend-dev-1 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
backend-dev-1 | File "/usr/local/lib/python3.12/asyncio/base_events.py", line 673, in run_until_complete
backend-dev-1 | self.run_forever()
backend-dev-1 | File "/usr/local/lib/python3.12/asyncio/base_events.py", line 640, in run_forever
backend-dev-1 | self._run_once()
backend-dev-1 | File "/usr/local/lib/python3.12/asyncio/base_events.py", line 1954, in _run_once
backend-dev-1 | event_list = self._selector.select(timeout)
backend-dev-1 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
backend-dev-1 | File "/usr/local/lib/python3.12/selectors.py", line 468, in select
backend-dev-1 | fd_event_list = self._selector.poll(timeout, max_ev)
backend-dev-1 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
backend-dev-1 | File "/usr/local/lib/python3.12/site-packages/ray/_private/worker.py", line 1481, in sigterm_handler
backend-dev-1 | sys.exit(signum)
backend-dev-1 | SystemExit: 15
backend-dev-1 |
backend-dev-1 | During handling of the above exception, another exception occurred:
backend-dev-1 |
backend-dev-1 | Traceback (most recent call last):
backend-dev-1 | File "/usr/local/lib/python3.12/site-packages/starlette/routing.py", line 700, in lifespan
backend-dev-1 | await receive()
backend-dev-1 | File "/usr/local/lib/python3.12/site-packages/uvicorn/lifespan/on.py", line 137, in receive
backend-dev-1 | return await self.receive_queue.get()
backend-dev-1 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
backend-dev-1 | File "/usr/local/lib/python3.12/asyncio/queues.py", line 158, in get
backend-dev-1 | await getter
backend-dev-1 | asyncio.exceptions.CancelledError
backend-dev-1 |
backend-dev-1 | 2025-01-22 11:05:06,711 INFO Connection to pyamqp://guest:******@backend-rabbitmq-dev:5672// closed. Reconnecting after 5 seconds.
backend-dev-1 | Error in sys.excepthook:
backend-dev-1 |
backend-dev-1 | Original exception was:
```

atexit is never called and the actor is killed. The same thing happens with docker compose down... Am I doing something wrong here?

What I was expecting:

The worker's atexit function is called and the worker is gracefully destroyed.

Thanks all for your help!
Versions / Dependencies
Ray 2.40.0, Python 3.12, Ubuntu 24
Reproduction script
Create an actor that registers an atexit handler, then send a SIGTERM to the Ray cluster.
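A minimal sketch along those lines (the actor name and the self-SIGTERM are illustrative; the driver kills itself the way uvicorn's reloader or docker compose down would):

```python
# repro.py - hypothetical minimal reproduction.
import atexit
import os
import signal

import ray


@ray.remote
class AtexitActor:
    def __init__(self):
        # Same pattern as the worker above: register an atexit handler.
        atexit.register(self.on_exit)

    def on_exit(self):
        # Expected on graceful shutdown, but never appears in the actor's log.
        print("atexit handler ran")

    def ping(self):
        return "pong"


ray.init()
actor = AtexitActor.remote()
assert ray.get(actor.ping.remote()) == "pong"

# Ray's SIGTERM handler exits the driver; the actor is then torn down
# without its atexit handler ever running.
os.kill(os.getpid(), signal.SIGTERM)
```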
Issue Severity
High: It blocks me from completing my task.