
Commit 0309e7c

test(telemetry/e2e): make TestTelemetryE2E deterministic + deflake retry tests under merge-queue load (#812)
* test(telemetry/e2e): make TestTelemetryE2E deterministic

  The previous tests asserted "telemetry round-tripped to the server" by
  intercepting TelemetryClient._telemetry_request_callback and counting
  completed futures. That recording lags the actual work: the callback
  fires asynchronously after the HTTP request completes, and on the *last*
  connection close TelemetryClientFactory.close() shuts the shared executor
  down with wait=False (intentional, for connection-close latency in
  production). Two consequences:

  1. A `wait(captured_futures, timeout=10)` call right after `with conn:`
     can return before any callbacks have fired, so the wait is "waiting
     on" an empty list, returns immediately, and the assertion
     `assert len(done) == expected_count` fails non-deterministically with
     `assert 0 == 2` or `assert 1 == 2`.
  2. The shared-executor shutdown(wait=False) can drop in-flight
     submissions that haven't started running yet, so even if we drained
     correctly we'd be testing whether the server happened to receive the
     request in time, not whether the connector correctly dispatched it.

  Switch interception from `_telemetry_request_callback` to
  `_send_telemetry`. That captures the connector's *intent to submit*
  synchronously, which is what we actually want to test: the connector
  either decided to send a batch or it didn't, regardless of what happens
  to the future afterward. No sleep needed, no timeout-based wait needed,
  no race against the executor shutdown.

  5 consecutive local runs pass deterministically in ~20s each (down from
  ~17 min when the flake hit).

  Co-authored-by: Isaac
  Signed-off-by: Vikrant Puppala <vikrant.puppala@databricks.com>

* ci/test: deflake retry tests under merge-queue load

  Two compounding fixes that surfaced on PR #812's first merge_group run,
  where test_oserror_retries failed with
  `assert mock_validate_conn.call_count == 6`: unexpected `/telemetry-ext`
  requests had been counted alongside the intended session-endpoint
  retries.

  1. tests/e2e/common/retry_test_mixins.py: strengthen
     `_isolated_from_telemetry()` with two additional defensive patches:

     - TelemetryClient._send_telemetry → no-op
     - TelemetryClient._export_event → no-op

     The existing factory swap installs NoopTelemetryClient for new
     connections, but doesn't cover real TelemetryClient instances that
     slip in via other paths (stale module-global, code that bypasses
     initialize_telemetry_client, anything created before the context
     entered). Patching at the class level catches all of them.

  2. .github/workflows/code-coverage.yml: serialise merge_group runs.

     Previous concurrency group keyed on github.ref, which is per-PR in
     the queue (gh-readonly-queue/main/pr-N-…). That allowed multiple
     queue entries to hammer the same warehouse in parallel, stressing
     telemetry / retry paths that single-PR runs don't exercise. Group
     merge_group + workflow_dispatch under a single fixed name
     (e2e-mq-serial) so they run one at a time. PR-event runs keep per-ref
     grouping + cancel-in-progress for fast author feedback.

     Trade-off: queue throughput drops to one ~17-min run at a time.

  Folded into PR #812 so the telemetry-test rewrite and the retry-test
  deflake ship as a single unit.

  Co-authored-by: Isaac
  Signed-off-by: Vikrant Puppala <vikrant.puppala@databricks.com>

---------

Signed-off-by: Vikrant Puppala <vikrant.puppala@databricks.com>
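The race described in the first commit can be reproduced without the connector at all. The sketch below is only an illustration under stated assumptions: `fake_send`, `submitted`, `completed` and `release` are hypothetical stand-ins, not connector APIs. It records one list at submission time and one from a done-callback, then waits on the callback list the way the old tests did.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, wait

release = threading.Event()  # stands in for an HTTP round-trip that has not finished yet
executor = ThreadPoolExecutor(max_workers=1)

submitted = []  # filled synchronously, at submission time
completed = []  # filled asynchronously, from the done-callback


def fake_send(batch):
    """Hypothetical stand-in for a send path that hands work to an executor."""
    submitted.append(list(batch))               # intent to submit is recorded immediately
    future = executor.submit(release.wait)      # the "request" blocks until released
    future.add_done_callback(completed.append)  # runs only once the future completes
    return future


fake_send(["event-1"])
fake_send(["event-2"])

# Waiting on the callback-populated list: it is still empty, so wait() returns at once.
done, _ = wait(completed, timeout=10)
print(len(submitted), len(done))  # 2 0

release.set()                   # let the blocked "requests" finish
executor.shutdown(wait=True)    # drain so both callbacks have run
print(len(completed))           # 2
```

The submission-time list is complete as soon as `fake_send` returns, while the callback-time list depends on when the executor gets around to finishing the work.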
1 parent 00dc084 commit 0309e7c

3 files changed

Lines changed: 101 additions & 60 deletions

.github/workflows/code-coverage.yml

Lines changed: 16 additions & 11 deletions
@@ -16,18 +16,23 @@ on:
 # the merge has already happened and the coverage check has no power
 # to block. Hence we deliberately don't subscribe to `push:main`.
 #
-# Serialise E2E runs per ref so a force-push (or a fast follow-up commit)
-# on a PR cancels the previous run instead of racing it against shared
-# warehouse state (Delta tables, UC Volume files, etc.).
-#
-# Merge-queue runs are NOT cancelled - each queue entry needs its own
-# clean CI signal so a regression on entry N doesn't get hidden by
-# entry N+1 arriving seconds later. (Concurrent queue runs can still
-# collide on shared warehouse state, but that's the cost of preserving
-# per-entry signal; the uuid-suffix conventions in the e2e tests are
-# what keep them isolated.)
+# Concurrency groups:
+# - pull_request: per-ref + cancel-in-progress. A force-push or fast
+#   follow-up commit on a PR cancels the previous run instead of
+#   racing it against shared warehouse state (Delta tables, UC Volume
+#   files, telemetry endpoints, etc.).
+# - merge_group: serialised globally with a fixed group name. The
+#   warehouse can't tolerate two parallel queue entries hammering
+#   telemetry / retry paths simultaneously - we have observed flaky
+#   retry-test failures (extra `/telemetry-ext` retries inflating
+#   mock.call_count) under that load. Running queue entries one at a
+#   time costs queue throughput (one entry at a time, ~17 min each)
+#   but keeps signal trustworthy. cancel-in-progress is off so each
+#   entry gets a complete run.
+# - workflow_dispatch: shares the merge_group group; manual triggers
+#   are rare enough that serialising them with the queue is fine.
 concurrency:
-  group: e2e-${{ github.workflow }}-${{ github.ref }}
+  group: ${{ github.event_name == 'pull_request' && format('e2e-pr-{0}', github.ref) || 'e2e-mq-serial' }}
   cancel-in-progress: ${{ github.event_name == 'pull_request' }}
 
 jobs:
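The new `group:` expression uses the `cond && a || b` idiom as a ternary. The following Python model is only a reading aid, not something GitHub Actions runs: the function names and the example refs are hypothetical, and only the resulting group strings mirror the workflow.

```python
def concurrency_group(event_name: str, ref: str) -> str:
    """Model of the workflow's `group:` expression (names here are illustrative)."""
    if event_name == "pull_request":
        # Per-ref group: a newer run on the same PR shares the group and can cancel the old one.
        return f"e2e-pr-{ref}"
    # merge_group and workflow_dispatch fall through to one fixed, shared group.
    return "e2e-mq-serial"


def cancel_in_progress(event_name: str) -> bool:
    """Model of `cancel-in-progress`: only PR runs cancel their predecessors."""
    return event_name == "pull_request"


# Hypothetical refs, for illustration only.
print(concurrency_group("pull_request", "refs/pull/1/merge"))        # e2e-pr-refs/pull/1/merge
print(concurrency_group("merge_group", "refs/heads/example-queue"))  # e2e-mq-serial
print(concurrency_group("workflow_dispatch", "refs/heads/main"))     # e2e-mq-serial
print(cancel_in_progress("merge_group"))                             # False
```

The `&& … ||` form only behaves like a ternary when the middle operand is truthy. That holds here because `format('e2e-pr-{0}', …)` always yields a non-empty string.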

tests/e2e/common/retry_test_mixins.py

Lines changed: 27 additions & 2 deletions
@@ -17,6 +17,7 @@
 )
 from databricks.sql.telemetry.telemetry_client import (
     NoopTelemetryClient,
+    TelemetryClient,
     TelemetryClientFactory,
     _TelemetryClientHolder,
 )
@@ -27,8 +28,22 @@ def _isolated_from_telemetry():
     # Tests that mock urllib3 globally (via _get_conn / _validate_conn) also
     # intercept background telemetry pushes from the shared
     # TelemetryClientFactory executor - inflating mock.call_count and breaking
-    # assertions like `call_count == 6`. Drain the factory and force any new
-    # connection to use NoopTelemetryClient for the duration of the test.
+    # assertions like `call_count == 6`. Three layers of defence:
+    #
+    # 1. Drain TelemetryClientFactory and override initialize_telemetry_client
+    #    so new connections install NoopTelemetryClient (which submits nothing).
+    # 2. Patch TelemetryClient._send_telemetry to a no-op as a backstop - covers
+    #    any real TelemetryClient instance that slips in (e.g. a stale module-
+    #    global, a code path that bypasses initialize_telemetry_client, or
+    #    anything created before this context entered).
+    # 3. Patch TelemetryClient._export_event to a no-op so even if a real
+    #    client receives an event, the event never reaches the queue and the
+    #    flush logic never fires.
+    #
+    # Without layer 2/3 we have observed `/telemetry-ext` requests showing up
+    # in merge_group runs (concurrent CI load on the warehouse stresses paths
+    # that single-test runs don't hit), inflating retry-test counts and
+    # producing flaky AssertionErrors.
     with TelemetryClientFactory._lock:
         saved_clients = TelemetryClientFactory._clients
         saved_executor = TelemetryClientFactory._executor
@@ -55,11 +70,21 @@ def _install_noop(*args, host_url=None, **kwargs):
                 NoopTelemetryClient()
             )
 
+    def _noop_send(self, events):
+        return None
+
+    def _noop_export(self, event):
+        return None
+
     try:
         with patch.object(
             TelemetryClientFactory,
             "initialize_telemetry_client",
             staticmethod(_install_noop),
+        ), patch.object(
+            TelemetryClient, "_send_telemetry", _noop_send
+        ), patch.object(
+            TelemetryClient, "_export_event", _noop_export
         ):
             yield
     finally:
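Layers 2 and 3 rely on a property of `unittest.mock.patch.object`: replacing a method on the class affects every instance, including ones created before the patch was entered. A minimal sketch of that behaviour follows, using a hypothetical `FakeTelemetryClient` rather than the connector's real class.

```python
from unittest.mock import patch


class FakeTelemetryClient:
    """Hypothetical stand-in; records every batch it is asked to send."""

    sent = []

    def _send_telemetry(self, events):
        FakeTelemetryClient.sent.append(list(events))


def _noop_send(self, events):
    return None


stale = FakeTelemetryClient()  # exists before the patch is entered

with patch.object(FakeTelemetryClient, "_send_telemetry", _noop_send):
    stale._send_telemetry(["a"])                  # swallowed: lookup goes through the class
    FakeTelemetryClient()._send_telemetry(["b"])  # swallowed as well

stale._send_telemetry(["c"])                      # original method is restored on exit

print(FakeTelemetryClient.sent)  # [['c']]
```

A factory swap alone would only affect clients constructed through the factory while the patch is active, which is why the class-level patch works as a backstop.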

tests/e2e/test_telemetry_e2e.py

Lines changed: 58 additions & 47 deletions
@@ -1,13 +1,11 @@
 """
 E2E test for telemetry - verifies telemetry behavior with different scenarios
 """
-import time
 import threading
 import logging
 from contextlib import contextmanager
 from unittest.mock import patch
 import pytest
-from concurrent.futures import wait
 
 import databricks.sql as sql
 from databricks.sql.telemetry.telemetry_client import (
@@ -91,25 +89,43 @@ def telemetry_setup_teardown(self):
 
     @pytest.fixture
     def telemetry_interceptors(self):
-        """Setup reusable telemetry interceptors as a fixture"""
+        """Setup reusable telemetry interceptors as a fixture.
+
+        Captures two signals:
+          * captured_events - every call to TelemetryClient._export_event
+            (the connector intended to send this event).
+          * captured_submissions - every call to TelemetryClient._send_telemetry
+            (the connector submitted this batch to the executor).
+
+        We deliberately do NOT capture inside _telemetry_request_callback. That
+        callback only fires after the HTTP round-trip completes, and the
+        connector's TelemetryClientFactory.close() shuts down the shared
+        executor with wait=False - so on the last connection close, in-flight
+        futures may never get to run their callbacks before the executor is
+        gone. That's intentional for connection-close latency in production,
+        but it means a callback-time assertion is racing the connector's
+        shutdown path. Capturing at submission time tests what the connector
+        actually controls.
+        """
         capture_lock = threading.Lock()
         captured_events = []
-        captured_futures = []
+        captured_submissions = []
 
         original_export = TelemetryClient._export_event
-        original_callback = TelemetryClient._telemetry_request_callback
+        original_send = TelemetryClient._send_telemetry
 
         def export_wrapper(self_client, event):
             with capture_lock:
                 captured_events.append(event)
             return original_export(self_client, event)
 
-        def callback_wrapper(self_client, future, sent_count):
+        def send_wrapper(self_client, events):
             with capture_lock:
-                captured_futures.append(future)
-            original_callback(self_client, future, sent_count)
+                # Record the batch (list of events) the connector submitted.
+                captured_submissions.append(list(events))
+            return original_send(self_client, events)
 
-        return captured_events, captured_futures, export_wrapper, callback_wrapper
+        return captured_events, captured_submissions, export_wrapper, send_wrapper
 
     # ==================== ASSERTION HELPERS ====================
 
@@ -165,24 +181,33 @@ def assert_error_info(self, event, expected_error_name=None):
         if expected_error_name:
             assert error_info.error_name == expected_error_name
 
-    def verify_events(self, captured_events, captured_futures, expected_count):
-        """Common verification for event count and HTTP responses"""
+    def verify_events(self, captured_events, captured_submissions, expected_count):
+        """Common verification for event count and submission count.
+
+        Asserts on what the connector did - exported events and submitted
+        batches - not on what the server returned. Server-side HTTP success
+        is asserted via end-to-end behavior elsewhere (and would race the
+        connector's wait=False executor shutdown on connection close, see
+        the docstring on telemetry_interceptors).
+
+        Because these tests use telemetry_batch_size=1, each exported event
+        triggers its own batch submission, so the submission count should
+        equal the event count.
+        """
         if expected_count == 0:
-            assert len(captured_events) == 0, f"Expected 0 events, got {len(captured_events)}"
-            assert len(captured_futures) == 0, f"Expected 0 responses, got {len(captured_futures)}"
+            assert len(captured_events) == 0, \
+                f"Expected 0 events, got {len(captured_events)}"
+            assert len(captured_submissions) == 0, \
+                f"Expected 0 submissions, got {len(captured_submissions)}"
         else:
             assert len(captured_events) == expected_count, \
                 f"Expected {expected_count} events, got {len(captured_events)}"
+            # batch_size=1, so one submission per event.
+            submitted_event_count = sum(len(batch) for batch in captured_submissions)
+            assert submitted_event_count == expected_count, \
+                f"Expected {expected_count} submitted events across batches, " \
+                f"got {submitted_event_count} (batches: {[len(b) for b in captured_submissions]})"
 
-            time.sleep(2)
-            done, _ = wait(captured_futures, timeout=10)
-            assert len(done) == expected_count, \
-                f"Expected {expected_count} responses, got {len(done)}"
-
-            for future in done:
-                response = future.result()
-                assert 200 <= response.status < 300
-
             # Assert common fields for all events
             for event in captured_events:
                 self.assert_system_config(event)
@@ -199,11 +224,11 @@ def verify_events(self, captured_events, captured_futures, expected_count):
     def test_telemetry_flags(self, telemetry_interceptors, enable_telemetry,
                              force_enable, expected_count, test_id):
         """Test telemetry behavior with different flag combinations"""
-        captured_events, captured_futures, export_wrapper, callback_wrapper = \
+        captured_events, captured_submissions, export_wrapper, send_wrapper = \
             telemetry_interceptors
 
         with patch.object(TelemetryClient, "_export_event", export_wrapper), \
-             patch.object(TelemetryClient, "_telemetry_request_callback", callback_wrapper):
+             patch.object(TelemetryClient, "_send_telemetry", send_wrapper):
 
             extra_params = {"telemetry_batch_size": 1}
             if enable_telemetry is not None:
@@ -216,9 +241,7 @@ def test_telemetry_flags(self, telemetry_interceptors, enable_telemetry,
                     cursor.execute("SELECT 1")
                     cursor.fetchone()
 
-            # Give time for async telemetry submission after connection closes
-            time.sleep(0.5)
-            self.verify_events(captured_events, captured_futures, expected_count)
+            self.verify_events(captured_events, captured_submissions, expected_count)
 
             # Assert statement execution on latency event (if events exist)
             if expected_count > 0:
@@ -230,11 +253,11 @@ def test_telemetry_flags(self, telemetry_interceptors, enable_telemetry,
     ])
     def test_sql_errors(self, telemetry_interceptors, query, expected_error):
         """Test telemetry captures error information for different SQL errors"""
-        captured_events, captured_futures, export_wrapper, callback_wrapper = \
+        captured_events, captured_submissions, export_wrapper, send_wrapper = \
             telemetry_interceptors
 
         with patch.object(TelemetryClient, "_export_event", export_wrapper), \
-             patch.object(TelemetryClient, "_telemetry_request_callback", callback_wrapper):
+             patch.object(TelemetryClient, "_send_telemetry", send_wrapper):
 
             with self.connection(extra_params={
                 "force_enable_telemetry": True,
@@ -245,9 +268,6 @@ def test_sql_errors(self, telemetry_interceptors, query, expected_error):
                         cursor.execute(query)
                         cursor.fetchone()
 
-            time.sleep(2)
-            wait(captured_futures, timeout=10)
-
             assert len(captured_events) >= 1
 
             # Find event with error_info
@@ -261,11 +281,11 @@ def test_sql_errors(self, telemetry_interceptors, query, expected_error):
 
     def test_metadata_operation(self, telemetry_interceptors):
         """Test telemetry for metadata operations (getCatalogs)"""
-        captured_events, captured_futures, export_wrapper, callback_wrapper = \
+        captured_events, captured_submissions, export_wrapper, send_wrapper = \
             telemetry_interceptors
 
         with patch.object(TelemetryClient, "_export_event", export_wrapper), \
-             patch.object(TelemetryClient, "_telemetry_request_callback", callback_wrapper):
+             patch.object(TelemetryClient, "_send_telemetry", send_wrapper):
 
             with self.connection(extra_params={
                 "force_enable_telemetry": True,
@@ -275,21 +295,18 @@ def test_metadata_operation(self, telemetry_interceptors):
                     catalogs = cursor.catalogs()
                     catalogs.fetchall()
 
-            time.sleep(2)
-            wait(captured_futures, timeout=10)
-
             assert len(captured_events) >= 1
             for event in captured_events:
                 self.assert_system_config(event)
                 self.assert_connection_params(event, self.arguments["http_path"])
 
     def test_direct_results(self, telemetry_interceptors):
         """Test telemetry with direct results (use_cloud_fetch=False)"""
-        captured_events, captured_futures, export_wrapper, callback_wrapper = \
+        captured_events, captured_submissions, export_wrapper, send_wrapper = \
             telemetry_interceptors
 
         with patch.object(TelemetryClient, "_export_event", export_wrapper), \
-             patch.object(TelemetryClient, "_telemetry_request_callback", callback_wrapper):
+             patch.object(TelemetryClient, "_send_telemetry", send_wrapper):
 
             with self.connection(extra_params={
                 "force_enable_telemetry": True,
@@ -301,9 +318,6 @@ def test_direct_results(self, telemetry_interceptors):
                     result = cursor.fetchall()
                     assert len(result) == 1 and result[0][0] == 100
 
-            time.sleep(2)
-            wait(captured_futures, timeout=10)
-
             assert len(captured_events) >= 2
             for event in captured_events:
                 self.assert_system_config(event)
@@ -320,11 +334,11 @@ def test_direct_results(self, telemetry_interceptors):
     def test_cloudfetch_with_different_close_patterns(self, telemetry_interceptors,
                                                        close_type):
         """Test telemetry with cloud fetch using different resource closing patterns"""
-        captured_events, captured_futures, export_wrapper, callback_wrapper = \
+        captured_events, captured_submissions, export_wrapper, send_wrapper = \
             telemetry_interceptors
 
         with patch.object(TelemetryClient, "_export_event", export_wrapper), \
-             patch.object(TelemetryClient, "_telemetry_request_callback", callback_wrapper):
+             patch.object(TelemetryClient, "_send_telemetry", send_wrapper):
 
             if close_type == "explicit_connection":
                 # Test explicit connection close
@@ -365,9 +379,6 @@ def test_cloudfetch_with_different_close_patterns(self, telemetry_interceptors,
                         result = cursor.fetchall()
                         assert len(result) == 1000
 
-            time.sleep(2)
-            wait(captured_futures, timeout=10)
-
             assert len(captured_events) >= 2
             for event in captured_events:
                 self.assert_system_config(event)
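The interceptor pattern used by these tests (wrap the method, record the call, delegate to the original) can be tried in isolation. This is a sketch under stated assumptions: `FakeClient` and its batching logic are hypothetical and only approximate the idea that with a batch size of 1, each exported event triggers one submission.

```python
import threading
from unittest.mock import patch


class FakeClient:
    """Hypothetical client: queues events and flushes when the batch is full."""

    def __init__(self, batch_size):
        self.batch_size = batch_size
        self.queue = []
        self.sent = []

    def _export_event(self, event):
        self.queue.append(event)
        if len(self.queue) >= self.batch_size:
            batch, self.queue = self.queue, []
            self._send_telemetry(batch)

    def _send_telemetry(self, events):
        self.sent.append(events)  # a real client would hand this to an executor


capture_lock = threading.Lock()
captured_events, captured_submissions = [], []
original_export = FakeClient._export_event
original_send = FakeClient._send_telemetry


def export_wrapper(self_client, event):
    with capture_lock:
        captured_events.append(event)
    return original_export(self_client, event)


def send_wrapper(self_client, events):
    with capture_lock:
        captured_submissions.append(list(events))  # copy the batch at submission time
    return original_send(self_client, events)


with patch.object(FakeClient, "_export_event", export_wrapper), \
     patch.object(FakeClient, "_send_telemetry", send_wrapper):
    client = FakeClient(batch_size=1)
    client._export_event("connection-open")
    client._export_event("statement-latency")

submitted_event_count = sum(len(batch) for batch in captured_submissions)
print(len(captured_events), submitted_event_count)  # 2 2
```

Both lists are populated synchronously on the calling thread, so the assertions need no sleep and no wait on futures.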
