Skip to content

Commit f9b7f43

Browse files
authored
Added example for async execute query (#537)
Added examples and fixed async execute failing when pyarrow is not installed
1 parent a7cbde1 commit f9b7f43

File tree

4 files changed

+58
-22
lines changed

4 files changed

+58
-22
lines changed

examples/query_async_execute.py

+32
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
from databricks import sql
2+
import os
3+
import time
4+
5+
with sql.connect(
6+
server_hostname=os.getenv("DATABRICKS_SERVER_HOSTNAME"),
7+
http_path=os.getenv("DATABRICKS_HTTP_PATH"),
8+
access_token=os.getenv("DATABRICKS_TOKEN"),
9+
) as connection:
10+
11+
with connection.cursor() as cursor:
12+
long_running_query = """
13+
SELECT COUNT(*) FROM RANGE(10000 * 16) x
14+
JOIN RANGE(10000) y
15+
ON FROM_UNIXTIME(x.id * y.id, 'yyyy-MM-dd') LIKE '%not%a%date%'
16+
"""
17+
18+
# Non-blocking call
19+
cursor.execute_async(long_running_query)
20+
21+
# Polling every 5 seconds until the query is no longer pending
22+
while cursor.is_query_pending():
23+
print("POLLING")
24+
time.sleep(5)
25+
26+
# Blocking call: fetch results when execution completes
27+
cursor.get_async_execution_result()
28+
29+
result = cursor.fetchall()
30+
31+
for res in result:
32+
print(res)

src/databricks/sql/client.py

+14-7
Original file line numberDiff line numberDiff line change
@@ -896,6 +896,19 @@ def get_query_state(self) -> "TOperationState":
896896
self._check_not_closed()
897897
return self.thrift_backend.get_query_state(self.active_op_handle)
898898

899+
def is_query_pending(self):
900+
"""
901+
Checks whether the asynchronously executed query is still in progress, i.e. its operation state is not yet available, RUNNING or PENDING
902+
903+
:return: True while the operation state is unset, RUNNING_STATE or PENDING_STATE; False otherwise
904+
"""
905+
operation_state = self.get_query_state()
906+
907+
return not operation_state or operation_state in [
908+
ttypes.TOperationState.RUNNING_STATE,
909+
ttypes.TOperationState.PENDING_STATE,
910+
]
911+
899912
def get_async_execution_result(self):
900913
"""
901914
@@ -905,13 +918,7 @@ def get_async_execution_result(self):
905918
"""
906919
self._check_not_closed()
907920

908-
def is_executing(operation_state) -> "bool":
909-
return not operation_state or operation_state in [
910-
ttypes.TOperationState.RUNNING_STATE,
911-
ttypes.TOperationState.PENDING_STATE,
912-
]
913-
914-
while is_executing(self.get_query_state()):
921+
while self.is_query_pending():
915922
# Poll after some default time
916923
time.sleep(self.ASYNC_DEFAULT_POLLING_INTERVAL)
917924

src/databricks/sql/thrift_backend.py

+9-6
Original file line numberDiff line numberDiff line change
@@ -797,12 +797,15 @@ def get_execution_result(self, op_handle, cursor):
797797
t_result_set_metadata_resp.schema
798798
)
799799

800-
schema_bytes = (
801-
t_result_set_metadata_resp.arrowSchema
802-
or self._hive_schema_to_arrow_schema(t_result_set_metadata_resp.schema)
803-
.serialize()
804-
.to_pybytes()
805-
)
800+
if pyarrow:
801+
schema_bytes = (
802+
t_result_set_metadata_resp.arrowSchema
803+
or self._hive_schema_to_arrow_schema(t_result_set_metadata_resp.schema)
804+
.serialize()
805+
.to_pybytes()
806+
)
807+
else:
808+
schema_bytes = None
806809

807810
queue = ResultSetQueueFactory.build_queue(
808811
row_set_type=resp.resultSetMetadata.resultFormat,

tests/e2e/test_driver.py

+3-9
Original file line numberDiff line numberDiff line change
@@ -179,20 +179,14 @@ def test_cloud_fetch(self):
179179

180180

181181
class TestPySQLAsyncQueriesSuite(PySQLPytestTestCase):
182-
def isExecuting(self, operation_state):
183-
return not operation_state or operation_state in [
184-
ttypes.TOperationState.RUNNING_STATE,
185-
ttypes.TOperationState.PENDING_STATE,
186-
]
187-
188182
def test_execute_async__long_running(self):
189183

190184
long_running_query = "SELECT COUNT(*) FROM RANGE(10000 * 16) x JOIN RANGE(10000) y ON FROM_UNIXTIME(x.id * y.id, 'yyyy-MM-dd') LIKE '%not%a%date%'"
191185
with self.cursor() as cursor:
192186
cursor.execute_async(long_running_query)
193187

194188
## Polling after every POLLING_INTERVAL seconds
195-
while self.isExecuting(cursor.get_query_state()):
189+
while cursor.is_query_pending():
196190
time.sleep(self.POLLING_INTERVAL)
197191
log.info("Polling the status in test_execute_async")
198192

@@ -211,7 +205,7 @@ def test_execute_async__small_result(self):
211205
time.sleep(5)
212206

213207
## Polling after every POLLING_INTERVAL seconds
214-
while self.isExecuting(cursor.get_query_state()):
208+
while cursor.is_query_pending():
215209
time.sleep(self.POLLING_INTERVAL)
216210
log.info("Polling the status in test_execute_async")
217211

@@ -241,7 +235,7 @@ def test_execute_async__large_result(self):
241235
time.sleep(5)
242236

243237
## Polling after every POLLING_INTERVAL seconds
244-
while self.isExecuting(cursor.get_query_state()):
238+
while cursor.is_query_pending():
245239
time.sleep(self.POLLING_INTERVAL)
246240
log.info("Polling the status in test_execute_async")
247241

0 commit comments

Comments
 (0)