Skip to content

Commit 23c756f

Browse files
test(telemetry/e2e): make TestTelemetryE2E deterministic by capturing submissions, not callbacks (#812)
test(telemetry/e2e): make TestTelemetryE2E deterministic

The previous tests asserted "telemetry round-tripped to the server" by intercepting TelemetryClient._telemetry_request_callback and counting completed futures. That recording lags the actual work — the callback fires asynchronously after the HTTP request completes, and on the *last* connection close TelemetryClientFactory.close() shuts the shared executor down with wait=False (intentional, for connection-close latency in production). Two consequences:

1. A `wait(captured_futures, timeout=10)` call right after `with conn:` can return before any callbacks have fired — so the wait is "waiting on" an empty list, returns immediately, and the assertion `assert len(done) == expected_count` fails non-deterministically with `assert 0 == 2` or `assert 1 == 2`.

2. The shared-executor shutdown(wait=False) can drop in-flight submissions that haven't started running yet, so even if we drained correctly we'd be testing whether the server happened to receive the request in time, not whether the connector correctly dispatched it.

Switch interception from `_telemetry_request_callback` to `_send_telemetry`. That captures the connector's *intent to submit* synchronously, which is what we actually want to test — the connector either decided to send a batch or it didn't, regardless of what happens to the future afterward. No sleep needed, no timeout-based wait needed, no race against the executor shutdown.

5 consecutive local runs pass deterministically in ~20s each (down from ~17 min when the flake hit).

Co-authored-by: Isaac
Signed-off-by: Vikrant Puppala <vikrant.puppala@databricks.com>
1 parent 00dc084 commit 23c756f

1 file changed

Lines changed: 58 additions & 47 deletions

File tree

tests/e2e/test_telemetry_e2e.py

Lines changed: 58 additions & 47 deletions
Original file line number | Diff line number | Diff line change
@@ -1,13 +1,11 @@
11
"""
22
E2E test for telemetry - verifies telemetry behavior with different scenarios
33
"""
4-
import time
54
import threading
65
import logging
76
from contextlib import contextmanager
87
from unittest.mock import patch
98
import pytest
10-
from concurrent.futures import wait
119

1210
import databricks.sql as sql
1311
from databricks.sql.telemetry.telemetry_client import (
@@ -91,25 +89,43 @@ def telemetry_setup_teardown(self):
9189

9290
@pytest.fixture
9391
def telemetry_interceptors(self):
94-
"""Setup reusable telemetry interceptors as a fixture"""
92+
"""Setup reusable telemetry interceptors as a fixture.
93+
94+
Captures two signals:
95+
* captured_events — every call to TelemetryClient._export_event
96+
(the connector intended to send this event).
97+
* captured_submissions — every call to TelemetryClient._send_telemetry
98+
(the connector submitted this batch to the executor).
99+
100+
We deliberately do NOT capture inside _telemetry_request_callback. That
101+
callback only fires after the HTTP round-trip completes, and the
102+
connector's TelemetryClientFactory.close() shuts down the shared
103+
executor with wait=False — so on the last connection close, in-flight
104+
futures may never get to run their callbacks before the executor is
105+
gone. That's intentional for connection-close latency in production,
106+
but it means a callback-time assertion is racing the connector's
107+
shutdown path. Capturing at submission time tests what the connector
108+
actually controls.
109+
"""
95110
capture_lock = threading.Lock()
96111
captured_events = []
97-
captured_futures = []
112+
captured_submissions = []
98113

99114
original_export = TelemetryClient._export_event
100-
original_callback = TelemetryClient._telemetry_request_callback
115+
original_send = TelemetryClient._send_telemetry
101116

102117
def export_wrapper(self_client, event):
103118
with capture_lock:
104119
captured_events.append(event)
105120
return original_export(self_client, event)
106121

107-
def callback_wrapper(self_client, future, sent_count):
122+
def send_wrapper(self_client, events):
108123
with capture_lock:
109-
captured_futures.append(future)
110-
original_callback(self_client, future, sent_count)
124+
# Record the batch (list of events) the connector submitted.
125+
captured_submissions.append(list(events))
126+
return original_send(self_client, events)
111127

112-
return captured_events, captured_futures, export_wrapper, callback_wrapper
128+
return captured_events, captured_submissions, export_wrapper, send_wrapper
113129

114130
# ==================== ASSERTION HELPERS ====================
115131

@@ -165,24 +181,33 @@ def assert_error_info(self, event, expected_error_name=None):
165181
if expected_error_name:
166182
assert error_info.error_name == expected_error_name
167183

168-
def verify_events(self, captured_events, captured_futures, expected_count):
169-
"""Common verification for event count and HTTP responses"""
184+
def verify_events(self, captured_events, captured_submissions, expected_count):
185+
"""Common verification for event count and submission count.
186+
187+
Asserts on what the connector did — exported events and submitted
188+
batches — not on what the server returned. Server-side HTTP success
189+
is asserted via end-to-end behavior elsewhere (and would race the
190+
connector's wait=False executor shutdown on connection close, see
191+
the docstring on telemetry_interceptors).
192+
193+
Because these tests use telemetry_batch_size=1, each exported event
194+
triggers its own batch submission, so the submission count should
195+
equal the event count.
196+
"""
170197
if expected_count == 0:
171-
assert len(captured_events) == 0, f"Expected 0 events, got {len(captured_events)}"
172-
assert len(captured_futures) == 0, f"Expected 0 responses, got {len(captured_futures)}"
198+
assert len(captured_events) == 0, \
199+
f"Expected 0 events, got {len(captured_events)}"
200+
assert len(captured_submissions) == 0, \
201+
f"Expected 0 submissions, got {len(captured_submissions)}"
173202
else:
174203
assert len(captured_events) == expected_count, \
175204
f"Expected {expected_count} events, got {len(captured_events)}"
205+
# batch_size=1, so one submission per event.
206+
submitted_event_count = sum(len(batch) for batch in captured_submissions)
207+
assert submitted_event_count == expected_count, \
208+
f"Expected {expected_count} submitted events across batches, " \
209+
f"got {submitted_event_count} (batches: {[len(b) for b in captured_submissions]})"
176210

177-
time.sleep(2)
178-
done, _ = wait(captured_futures, timeout=10)
179-
assert len(done) == expected_count, \
180-
f"Expected {expected_count} responses, got {len(done)}"
181-
182-
for future in done:
183-
response = future.result()
184-
assert 200 <= response.status < 300
185-
186211
# Assert common fields for all events
187212
for event in captured_events:
188213
self.assert_system_config(event)
@@ -199,11 +224,11 @@ def verify_events(self, captured_events, captured_futures, expected_count):
199224
def test_telemetry_flags(self, telemetry_interceptors, enable_telemetry,
200225
force_enable, expected_count, test_id):
201226
"""Test telemetry behavior with different flag combinations"""
202-
captured_events, captured_futures, export_wrapper, callback_wrapper = \
227+
captured_events, captured_submissions, export_wrapper, send_wrapper = \
203228
telemetry_interceptors
204229

205230
with patch.object(TelemetryClient, "_export_event", export_wrapper), \
206-
patch.object(TelemetryClient, "_telemetry_request_callback", callback_wrapper):
231+
patch.object(TelemetryClient, "_send_telemetry", send_wrapper):
207232

208233
extra_params = {"telemetry_batch_size": 1}
209234
if enable_telemetry is not None:
@@ -216,9 +241,7 @@ def test_telemetry_flags(self, telemetry_interceptors, enable_telemetry,
216241
cursor.execute("SELECT 1")
217242
cursor.fetchone()
218243

219-
# Give time for async telemetry submission after connection closes
220-
time.sleep(0.5)
221-
self.verify_events(captured_events, captured_futures, expected_count)
244+
self.verify_events(captured_events, captured_submissions, expected_count)
222245

223246
# Assert statement execution on latency event (if events exist)
224247
if expected_count > 0:
@@ -230,11 +253,11 @@ def test_telemetry_flags(self, telemetry_interceptors, enable_telemetry,
230253
])
231254
def test_sql_errors(self, telemetry_interceptors, query, expected_error):
232255
"""Test telemetry captures error information for different SQL errors"""
233-
captured_events, captured_futures, export_wrapper, callback_wrapper = \
256+
captured_events, captured_submissions, export_wrapper, send_wrapper = \
234257
telemetry_interceptors
235258

236259
with patch.object(TelemetryClient, "_export_event", export_wrapper), \
237-
patch.object(TelemetryClient, "_telemetry_request_callback", callback_wrapper):
260+
patch.object(TelemetryClient, "_send_telemetry", send_wrapper):
238261

239262
with self.connection(extra_params={
240263
"force_enable_telemetry": True,
@@ -245,9 +268,6 @@ def test_sql_errors(self, telemetry_interceptors, query, expected_error):
245268
cursor.execute(query)
246269
cursor.fetchone()
247270

248-
time.sleep(2)
249-
wait(captured_futures, timeout=10)
250-
251271
assert len(captured_events) >= 1
252272

253273
# Find event with error_info
@@ -261,11 +281,11 @@ def test_sql_errors(self, telemetry_interceptors, query, expected_error):
261281

262282
def test_metadata_operation(self, telemetry_interceptors):
263283
"""Test telemetry for metadata operations (getCatalogs)"""
264-
captured_events, captured_futures, export_wrapper, callback_wrapper = \
284+
captured_events, captured_submissions, export_wrapper, send_wrapper = \
265285
telemetry_interceptors
266286

267287
with patch.object(TelemetryClient, "_export_event", export_wrapper), \
268-
patch.object(TelemetryClient, "_telemetry_request_callback", callback_wrapper):
288+
patch.object(TelemetryClient, "_send_telemetry", send_wrapper):
269289

270290
with self.connection(extra_params={
271291
"force_enable_telemetry": True,
@@ -275,21 +295,18 @@ def test_metadata_operation(self, telemetry_interceptors):
275295
catalogs = cursor.catalogs()
276296
catalogs.fetchall()
277297

278-
time.sleep(2)
279-
wait(captured_futures, timeout=10)
280-
281298
assert len(captured_events) >= 1
282299
for event in captured_events:
283300
self.assert_system_config(event)
284301
self.assert_connection_params(event, self.arguments["http_path"])
285302

286303
def test_direct_results(self, telemetry_interceptors):
287304
"""Test telemetry with direct results (use_cloud_fetch=False)"""
288-
captured_events, captured_futures, export_wrapper, callback_wrapper = \
305+
captured_events, captured_submissions, export_wrapper, send_wrapper = \
289306
telemetry_interceptors
290307

291308
with patch.object(TelemetryClient, "_export_event", export_wrapper), \
292-
patch.object(TelemetryClient, "_telemetry_request_callback", callback_wrapper):
309+
patch.object(TelemetryClient, "_send_telemetry", send_wrapper):
293310

294311
with self.connection(extra_params={
295312
"force_enable_telemetry": True,
@@ -301,9 +318,6 @@ def test_direct_results(self, telemetry_interceptors):
301318
result = cursor.fetchall()
302319
assert len(result) == 1 and result[0][0] == 100
303320

304-
time.sleep(2)
305-
wait(captured_futures, timeout=10)
306-
307321
assert len(captured_events) >= 2
308322
for event in captured_events:
309323
self.assert_system_config(event)
@@ -320,11 +334,11 @@ def test_direct_results(self, telemetry_interceptors):
320334
def test_cloudfetch_with_different_close_patterns(self, telemetry_interceptors,
321335
close_type):
322336
"""Test telemetry with cloud fetch using different resource closing patterns"""
323-
captured_events, captured_futures, export_wrapper, callback_wrapper = \
337+
captured_events, captured_submissions, export_wrapper, send_wrapper = \
324338
telemetry_interceptors
325339

326340
with patch.object(TelemetryClient, "_export_event", export_wrapper), \
327-
patch.object(TelemetryClient, "_telemetry_request_callback", callback_wrapper):
341+
patch.object(TelemetryClient, "_send_telemetry", send_wrapper):
328342

329343
if close_type == "explicit_connection":
330344
# Test explicit connection close
@@ -365,9 +379,6 @@ def test_cloudfetch_with_different_close_patterns(self, telemetry_interceptors,
365379
result = cursor.fetchall()
366380
assert len(result) == 1000
367381

368-
time.sleep(2)
369-
wait(captured_futures, timeout=10)
370-
371382
assert len(captured_events) >= 2
372383
for event in captured_events:
373384
self.assert_system_config(event)

0 commit comments

Comments
 (0)