Skip to content

Commit e1e08b0

Browse files
committed
improved circuit breaker for handling only 429/503
Signed-off-by: Nikhil Suri <[email protected]>
1 parent dab4b38 commit e1e08b0

File tree

8 files changed

+392
-284
lines changed

8 files changed

+392
-284
lines changed

src/databricks/sql/common/unified_http_client.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,25 @@ def request_context(
264264
yield response
265265
except MaxRetryError as e:
266266
logger.error("HTTP request failed after retries: %s", e)
267-
raise RequestError(f"HTTP request failed: {e}")
267+
268+
# Try to extract HTTP status code from the MaxRetryError
269+
http_code = None
270+
if hasattr(e, 'reason') and hasattr(e.reason, 'response'):
271+
# The reason may contain a response object with status
272+
http_code = getattr(e.reason.response, 'status', None)
273+
elif hasattr(e, 'response') and hasattr(e.response, 'status'):
274+
# Or the error itself may have a response
275+
http_code = e.response.status
276+
277+
context = {}
278+
if http_code is not None:
279+
context["http-code"] = http_code
280+
logger.error("HTTP request failed with status code: %d", http_code)
281+
282+
raise RequestError(
283+
f"HTTP request failed: {e}",
284+
context=context
285+
)
268286
except Exception as e:
269287
logger.error("HTTP request error: %s", e)
270288
raise RequestError(f"HTTP request error: {e}")

src/databricks/sql/exc.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,3 +126,9 @@ class SessionAlreadyClosedError(RequestError):
126126

127127
class CursorAlreadyClosedError(RequestError):
128128
"""Thrown if CancelOperation receives a code 404. ThriftBackend should gracefully proceed as this is expected."""
129+
130+
131+
class TelemetryRateLimitError(Exception):
132+
"""Raised when telemetry endpoint returns 429 or 503, indicating rate limiting or service unavailable.
133+
This exception is used exclusively by the circuit breaker to track telemetry rate limiting events."""
134+
pass

src/databricks/sql/telemetry/circuit_breaker_manager.py

Lines changed: 68 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,19 @@
1414
import pybreaker
1515
from pybreaker import CircuitBreaker, CircuitBreakerError, CircuitBreakerListener
1616

17+
from databricks.sql.exc import TelemetryRateLimitError
18+
1719
logger = logging.getLogger(__name__)
1820

1921
# Circuit Breaker Configuration Constants
20-
MINIMUM_CALLS = 20
21-
RESET_TIMEOUT = 30
22-
CIRCUIT_BREAKER_NAME = "telemetry-circuit-breaker"
22+
DEFAULT_MINIMUM_CALLS = 20
23+
DEFAULT_RESET_TIMEOUT = 30
24+
DEFAULT_NAME = "telemetry-circuit-breaker"
2325

24-
# Circuit Breaker State Constants
26+
# Circuit Breaker State Constants (used in logging)
2527
CIRCUIT_BREAKER_STATE_OPEN = "open"
2628
CIRCUIT_BREAKER_STATE_CLOSED = "closed"
2729
CIRCUIT_BREAKER_STATE_HALF_OPEN = "half-open"
28-
CIRCUIT_BREAKER_STATE_DISABLED = "disabled"
2930

3031
# Logging Message Constants
3132
LOG_CIRCUIT_BREAKER_STATE_CHANGED = "Circuit breaker state changed from %s to %s for %s"
@@ -72,18 +73,47 @@ def state_change(self, cb: CircuitBreaker, old_state, new_state) -> None:
7273
logger.info(LOG_CIRCUIT_BREAKER_HALF_OPEN, cb.name)
7374

7475

76+
@dataclass(frozen=True)
77+
class CircuitBreakerConfig:
78+
"""Configuration for circuit breaker behavior.
79+
80+
This class is immutable to prevent modification of circuit breaker settings.
81+
All configuration values default to the constants defined at the module level.
82+
"""
83+
84+
# Number of failures before the circuit opens (passed to CircuitBreaker as fail_max)
85+
minimum_calls: int = DEFAULT_MINIMUM_CALLS
86+
87+
# Time to wait before trying to close circuit (in seconds)
88+
reset_timeout: int = DEFAULT_RESET_TIMEOUT
89+
90+
# Name for the circuit breaker (for logging)
91+
name: str = DEFAULT_NAME
92+
93+
7594
class CircuitBreakerManager:
7695
"""
7796
Manages circuit breaker instances for telemetry requests.
7897
7998
This class provides a singleton pattern to manage circuit breaker instances
8099
per host, ensuring that telemetry failures don't impact main SQL operations.
81-
82-
Circuit breaker configuration is fixed and cannot be overridden.
83100
"""
84101

85102
_instances: Dict[str, CircuitBreaker] = {}
86103
_lock = threading.RLock()
104+
_config: Optional[CircuitBreakerConfig] = None
105+
106+
@classmethod
107+
def initialize(cls, config: CircuitBreakerConfig) -> None:
108+
"""
109+
Initialize the circuit breaker manager with configuration.
110+
111+
Args:
112+
config: Circuit breaker configuration
113+
"""
114+
with cls._lock:
115+
cls._config = config
116+
logger.debug("CircuitBreakerManager initialized with config: %s", config)
87117

88118
@classmethod
89119
def get_circuit_breaker(cls, host: str) -> CircuitBreaker:
@@ -96,6 +126,10 @@ def get_circuit_breaker(cls, host: str) -> CircuitBreaker:
96126
Returns:
97127
CircuitBreaker instance for the host
98128
"""
129+
if not cls._config:
130+
# Return a no-op circuit breaker if not initialized
131+
return cls._create_noop_circuit_breaker()
132+
99133
with cls._lock:
100134
if host not in cls._instances:
101135
cls._instances[host] = cls._create_circuit_breaker(host)
@@ -114,16 +148,39 @@ def _create_circuit_breaker(cls, host: str) -> CircuitBreaker:
114148
Returns:
115149
New CircuitBreaker instance
116150
"""
117-
# Create circuit breaker with fixed configuration
151+
config = cls._config
152+
if config is None:
153+
raise RuntimeError("CircuitBreakerManager not initialized")
154+
155+
# Create circuit breaker with configuration
118156
breaker = CircuitBreaker(
119-
fail_max=MINIMUM_CALLS,
120-
reset_timeout=RESET_TIMEOUT,
121-
name=f"{CIRCUIT_BREAKER_NAME}-{host}",
157+
fail_max=config.minimum_calls, # Number of failures before circuit opens
158+
reset_timeout=config.reset_timeout,
159+
name=f"{config.name}-{host}",
122160
)
161+
162+
# Add state change listeners for logging
123163
breaker.add_listener(CircuitBreakerStateListener())
124164

125165
return breaker
126166

167+
@classmethod
168+
def _create_noop_circuit_breaker(cls) -> CircuitBreaker:
169+
"""
170+
Create a no-op circuit breaker that always allows calls.
171+
172+
Returns:
173+
CircuitBreaker that never opens
174+
"""
175+
# Create a circuit breaker with a very high failure threshold so it effectively never opens
176+
breaker = CircuitBreaker(
177+
fail_max=1000000, # Very high threshold
178+
reset_timeout=1, # Short reset time
179+
name="noop-circuit-breaker",
180+
)
181+
return breaker
182+
183+
127184

128185
def is_circuit_breaker_error(exception: Exception) -> bool:
129186
"""

src/databricks/sql/telemetry/telemetry_client.py

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,6 @@
4646
TelemetryPushClient,
4747
CircuitBreakerTelemetryPushClient,
4848
)
49-
from databricks.sql.telemetry.circuit_breaker_manager import (
50-
is_circuit_breaker_error,
51-
)
5249

5350
if TYPE_CHECKING:
5451
from databricks.sql.client import Connection
@@ -275,21 +272,23 @@ def _send_telemetry(self, events):
275272
logger.debug("Failed to submit telemetry request: %s", e)
276273

277274
def _send_with_unified_client(self, url, data, headers, timeout=900):
278-
"""Helper method to send telemetry using the telemetry push client."""
275+
"""
276+
Helper method to send telemetry using the telemetry push client.
277+
278+
The push client implementation handles circuit breaker logic internally,
279+
so this method just forwards the request and handles any errors generically.
280+
"""
279281
try:
280282
response = self._telemetry_push_client.request(
281283
HttpMethod.POST, url, body=data, headers=headers, timeout=timeout
282284
)
283285
return response
284286
except Exception as e:
285-
if is_circuit_breaker_error(e):
286-
logger.warning(
287-
"Telemetry request blocked by circuit breaker for connection %s: %s",
288-
self._session_id_hex,
289-
e,
290-
)
291-
else:
292-
logger.error("Failed to send telemetry: %s", e)
287+
logger.debug(
288+
"Failed to send telemetry for connection %s: %s",
289+
self._session_id_hex,
290+
e,
291+
)
293292
raise
294293

295294
def _telemetry_request_callback(self, future, sent_count: int):

0 commit comments

Comments
 (0)