Skip to content

Fix progress bar cursor not being restored when interrupted #3690

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 91 additions & 34 deletions rich/live.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,32 @@
import sys
import os
import threading
from contextlib import contextmanager
from copy import copy
from datetime import datetime
from itertools import islice
from threading import Event, RLock, Thread
from time import monotonic
from types import TracebackType
from typing import IO, Any, Callable, List, Optional, TextIO, Type, cast
from typing import (
IO,
TYPE_CHECKING,
Any,
Callable,
Dict,
Generic,
Iterable,
List,
Optional,
TextIO,
Tuple,
Type,
TypeVar,
Union,
cast,
)
import signal
import atexit

from . import get_console
from .console import Console, ConsoleRenderable, RenderableType, RenderHook
Expand Down Expand Up @@ -37,14 +62,18 @@ class Live(JupyterMixin, RenderHook):

Args:
renderable (RenderableType, optional): The renderable to live display. Defaults to displaying nothing.
console (Console, optional): Optional Console instance. Defaults to an internal Console instance writing to stdout.
console (Console, optional): Optional Console instance. Default will derive from
get_console().
screen (bool, optional): Enable alternate screen mode. Defaults to False.
auto_refresh (bool, optional): Enable auto refresh. If disabled, you will need to call `refresh()` or `update()` with refresh flag. Defaults to True
refresh_per_second (float, optional): Number of times per second to refresh the live display. Defaults to 4.
auto_refresh (bool, optional): Enable auto refresh. If disabled, you will need to call `refresh()`.
Defaults to True.
refresh_per_second (float, optional): Number of times per second to refresh the live display.
Defaults to 4.
transient (bool, optional): Clear the renderable on exit (has no effect when screen=True). Defaults to False.
redirect_stdout (bool, optional): Enable redirection of stdout, so ``print`` may be used. Defaults to True.
redirect_stderr (bool, optional): Enable redirection of stderr. Defaults to True.
vertical_overflow (VerticalOverflowMethod, optional): How to handle renderable when it is too tall for the console. Defaults to "ellipsis".
vertical_overflow (VerticalOverflowMethod, optional): How to handle renderable when it is too tall for the
console. Defaults to "ellipsis".
get_renderable (Callable[[], RenderableType], optional): Optional callable to get renderable. Defaults to None.
"""

Expand All @@ -63,30 +92,41 @@ def __init__(
get_renderable: Optional[Callable[[], RenderableType]] = None,
) -> None:
assert refresh_per_second > 0, "refresh_per_second must be > 0"
self.console = console or get_console()
self._renderable = renderable
self.console = console if console is not None else get_console()
self._screen = screen
self._alt_screen = False

self.auto_refresh = auto_refresh
self.refresh_per_second = refresh_per_second
self.transient = True if screen else transient
self._redirect_stdout = redirect_stdout
self._redirect_stderr = redirect_stderr
self.vertical_overflow = vertical_overflow
self._get_renderable = get_renderable
self._alt_screen: bool = False
self._restore_stdout: Optional[IO[str]] = None
self._restore_stderr: Optional[IO[str]] = None

self._lock = RLock()
self.ipy_widget: Optional[Any] = None
self.auto_refresh = auto_refresh
self._started: bool = False
self.transient = True if screen else transient

self._refresh_thread: Optional[_RefreshThread] = None
self.refresh_per_second = refresh_per_second

self.vertical_overflow = vertical_overflow
self._get_renderable = get_renderable
self._started: bool = False
self.ipy_widget: Any = None
self._live_render = LiveRender(
self.get_renderable(), vertical_overflow=vertical_overflow
)
self._original_sigint_handler = None
self._exit_handler_added = False

def _handle_sigint(self, sig, frame):
"""Handle SIGINT (Ctrl+C) to ensure cursor is shown."""
# Restore cursor
if self.console.is_terminal:
self.console.show_cursor(True)
# Re-raise KeyboardInterrupt to allow program to exit
raise KeyboardInterrupt()

def _ensure_cursor_visible_at_exit(self):
"""Ensure cursor is visible when program exits."""
if self.console.is_terminal:
self.console.show_cursor(True)

@property
def is_started(self) -> bool:
Expand All @@ -110,6 +150,13 @@ def start(self, refresh: bool = False) -> None:
with self._lock:
if self._started:
return
# Set up signal handler for Ctrl+C (only on non-Windows platforms)
if not self._exit_handler_added:
atexit.register(self._ensure_cursor_visible_at_exit)
self._exit_handler_added = True
# Only set up SIGINT handler on platforms that support it (not Windows)
if os.name != "nt" and hasattr(signal, "SIGINT"):
self._original_sigint_handler = signal.signal(signal.SIGINT, self._handle_sigint)
self.console.set_live(self)
self._started = True
if self._screen:
Expand Down Expand Up @@ -144,23 +191,33 @@ def stop(self) -> None:
self._refresh_thread = None
# allow it to fully render on the last even if overflow
self.vertical_overflow = "visible"
with self.console:
try:
if not self._alt_screen and not self.console.is_jupyter:
self.refresh()
finally:
self._disable_redirect_io()
self.console.pop_render_hook()
if not self._alt_screen and self.console.is_terminal:
self.console.line()
try:
with self.console:
try:
if not self._alt_screen and not self.console.is_jupyter:
self.refresh()
finally:
self._disable_redirect_io()
self.console.pop_render_hook()
if not self._alt_screen and self.console.is_terminal:
self.console.line()
self.console.show_cursor(True)
if self._alt_screen:
self.console.set_alt_screen(False)

if self.transient and not self._alt_screen:
self.console.control(self._live_render.restore_cursor())
if self.ipy_widget is not None and self.transient:
self.ipy_widget.close() # pragma: no cover
except:
# Ensure cursor is shown even if something goes wrong during cleanup
if self.console.is_terminal:
self.console.show_cursor(True)
if self._alt_screen:
self.console.set_alt_screen(False)

if self.transient and not self._alt_screen:
self.console.control(self._live_render.restore_cursor())
if self.ipy_widget is not None and self.transient:
self.ipy_widget.close() # pragma: no cover
raise
# Restore original signal handler (only on non-Windows platforms)
if os.name != "nt" and hasattr(signal, "SIGINT") and self._original_sigint_handler is not None:
signal.signal(signal.SIGINT, self._original_sigint_handler)
self._original_sigint_handler = None

def __enter__(self) -> "Live":
self.start(refresh=self._renderable is not None)
Expand Down
43 changes: 39 additions & 4 deletions rich/progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
TypeVar,
Union,
)
import signal
import atexit
import os

if sys.version_info >= (3, 8):
from typing import Literal
Expand Down Expand Up @@ -391,7 +394,7 @@ def open(
finished_style: StyleType = "bar.finished",
pulse_style: StyleType = "bar.pulse",
disable: bool = False,
) -> ContextManager[TextIO]:
) -> ContextManager[BinaryIO]:
pass


Expand Down Expand Up @@ -1111,6 +1114,21 @@ def __init__(
self.get_time = get_time or self.console.get_time
self.print = self.console.print
self.log = self.console.log
self._original_sigint_handler = None
self._exit_handler_added = False

def _handle_sigint(self, sig, frame):
"""Handle SIGINT (Ctrl+C) to ensure cursor is shown."""
# Restore cursor
if self.console.is_terminal:
self.console.show_cursor(True)
# Re-raise KeyboardInterrupt to allow program to exit
raise KeyboardInterrupt()

def _ensure_cursor_visible_at_exit(self):
"""Ensure cursor is visible when program exits."""
if self.console.is_terminal:
self.console.show_cursor(True)

@classmethod
def get_default_columns(cls) -> Tuple[ProgressColumn, ...]:
Expand Down Expand Up @@ -1170,13 +1188,30 @@ def finished(self) -> bool:
def start(self) -> None:
"""Start the progress display."""
if not self.disable:
# Set up signal handler for Ctrl+C (only on non-Windows platforms)
if not self._exit_handler_added:
atexit.register(self._ensure_cursor_visible_at_exit)
self._exit_handler_added = True
# Only set up SIGINT handler on platforms that support it (not Windows)
if os.name != "nt" and hasattr(signal, "SIGINT"):
self._original_sigint_handler = signal.signal(signal.SIGINT, self._handle_sigint)
self.live.start(refresh=True)

def stop(self) -> None:
"""Stop the progress display."""
self.live.stop()
if not self.console.is_interactive and not self.console.is_jupyter:
self.console.print()
try:
self.live.stop()
if not self.console.is_interactive and not self.console.is_jupyter:
self.console.print()
# Restore original signal handler (only on non-Windows platforms)
if os.name != "nt" and hasattr(signal, "SIGINT") and self._original_sigint_handler is not None:
signal.signal(signal.SIGINT, self._original_sigint_handler)
self._original_sigint_handler = None
except:
# Ensure cursor is visible even if an exception occurs during stop
if self.console.is_terminal:
self.console.show_cursor(True)
raise

def __enter__(self) -> Self:
self.start()
Expand Down