Skip to content

Commit 1d9ed38

Browse files
authored
Merge pull request #123 from InfluxCommunity/feat/async-query
feat: asynchronous query
2 parents 95477f8 + 774885e commit 1d9ed38

File tree

9 files changed

+737
-112
lines changed

9 files changed

+737
-112
lines changed

CHANGELOG.md

+9
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,15 @@
22

33
## 0.12.0 [unreleased]
44

5+
### Features
6+
7+
1. [#123](https://github.com/InfluxCommunity/influxdb3-python/pull/123): Introduces `query_async()` method. From this release the client now has a `query_async()` method that takes advantage of asyncio's event loop to run query calls in their own executor.
8+
9+
For example:
10+
```python
11+
table = await client.query_async(query)
12+
```
13+
514
### Bug Fixes
615

716
1. [#121](https://github.com/InfluxCommunity/influxdb3-python/pull/121): Fix use of arguments `verify_ssl` and `ssl_ca_cert` in `QueryApi`.

Examples/query_async.py

+87
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
import asyncio
2+
import random
3+
import time
4+
5+
import pandas
6+
7+
from influxdb_client_3 import InfluxDBClient3
8+
9+
from config import Config
10+
11+
12+
async def fibio(iterations, grit=0.5):
13+
"""
14+
example coroutine to run parallel with query_async
15+
:param iterations:
16+
:param grit:
17+
:return:
18+
"""
19+
n0 = 1
20+
n1 = 1
21+
vals = [n0, n1]
22+
for _ in range(iterations):
23+
val = n0 + n1
24+
n0 = n1
25+
n1 = val
26+
print(val)
27+
vals.append(val)
28+
await asyncio.sleep(grit)
29+
return vals
30+
31+
32+
def write_data(client: InfluxDBClient3, measurement):
33+
"""
34+
Synchronous write - only for preparing data
35+
:param client:
36+
:param measurement:
37+
:return:
38+
"""
39+
ids = ['s3b1', 'dq41', 'sgw22']
40+
lp_template = f"{measurement},id=%s speed=%f,alt=%f,bearing=%f %d"
41+
data_size = 10
42+
data = []
43+
interval = 10 * 1_000_000_000
44+
ts = time.time_ns() - (interval * data_size)
45+
for _ in range(data_size):
46+
data.append(lp_template % (ids[random.randint(0, len(ids) - 1)],
47+
random.random() * 300,
48+
random.random() * 2000,
49+
random.random() * 360, ts))
50+
ts += interval
51+
52+
client.write(data)
53+
54+
55+
async def query_data(client: InfluxDBClient3, measurement):
56+
"""
57+
Query asynchronously - should not block other coroutines
58+
:param client:
59+
:param measurement:
60+
:return:
61+
"""
62+
query = f"SELECT * FROM \"{measurement}\" WHERE time >= now() - interval '5 minutes' ORDER BY time DESC"
63+
print(f"query start: {pandas.Timestamp(time.time_ns())}")
64+
table = await client.query_async(query)
65+
print(f"query returned: {pandas.Timestamp(time.time_ns())}")
66+
return table.to_pandas()
67+
68+
69+
async def main():
70+
config = Config()
71+
client = InfluxDBClient3(
72+
host=config.host,
73+
token=config.token,
74+
database=config.database,
75+
org=config.org
76+
)
77+
measurement = 'example_uav'
78+
write_data(client, measurement)
79+
80+
# run both coroutines simultaneously
81+
result = await asyncio.gather(fibio(10, 0.2), query_data(client, measurement))
82+
print(f"fibio sequence = {result[0]}")
83+
print(f"data set =\n{result[1]}")
84+
85+
86+
if __name__ == "__main__":
87+
asyncio.run(main())

influxdb_client_3/__init__.py

+33
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,39 @@ def query(self, query: str, language: str = "sql", mode: str = "all", database:
278278
except InfluxDBError as e:
279279
raise e
280280

281+
async def query_async(self, query: str, language: str = "sql", mode: str = "all", database: str = None, **kwargs):
282+
"""Query data from InfluxDB asynchronously.
283+
284+
If you want to use query parameters, you can pass them as kwargs:
285+
286+
>>> await client.query_async("select * from cpu where host=$host", query_parameters={"host": "server01"})
287+
288+
:param query: The query to execute on the database.
289+
:param language: The query language to use. It should be one of "influxql" or "sql". Defaults to "sql".
290+
:param mode: The mode to use for the query. It should be one of "all", "pandas", "polars", "chunk",
291+
"reader" or "schema". Defaults to "all".
292+
:param database: The database to query from. If not provided, uses the database provided during initialization.
293+
:param kwargs: Additional arguments to pass to the ``FlightCallOptions headers``. For example, it can be used to
294+
set up per request headers.
295+
:keyword query_parameters: The query parameters to use in the query.
296+
It should be a ``dictionary`` of key-value pairs.
297+
:return: The query result in the specified mode.
298+
"""
299+
if mode == "polars" and polars is False:
300+
raise ImportError("Polars is not installed. Please install it with `pip install polars`.")
301+
302+
if database is None:
303+
database = self._database
304+
305+
try:
306+
return await self._query_api.query_async(query=query,
307+
language=language,
308+
mode=mode,
309+
database=database,
310+
**kwargs)
311+
except InfluxDBError as e:
312+
raise e
313+
281314
def close(self):
282315
"""Close the client and clean up resources."""
283316
self._write_api.close()

influxdb_client_3/query/query_api.py

+84-34
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
"""Query data in InfluxDB 3."""
2-
2+
import asyncio
33
# coding: utf-8
44
import json
55

66
from pyarrow.flight import FlightClient, Ticket, FlightCallOptions, FlightStreamReader
7+
78
from influxdb_client_3.version import USER_AGENT
89

910

@@ -121,19 +122,28 @@ def __init__(self,
121122
"""
122123
self._token = token
123124
self._flight_client_options = flight_client_options or {}
125+
default_user_agent = ("grpc.secondary_user_agent", USER_AGENT)
126+
if "generic_options" in self._flight_client_options:
127+
if "grpc.secondary_user_agent" not in dict(self._flight_client_options["generic_options"]).keys():
128+
self._flight_client_options["generic_options"].append(default_user_agent)
129+
else:
130+
self._flight_client_options["generic_options"] = [default_user_agent]
124131
self._proxy = proxy
132+
from influxdb_client_3 import _merge_options as merge_options
125133
if options:
126134
if options.flight_client_options:
127-
self._flight_client_options = options.flight_client_options
135+
self._flight_client_options = merge_options(self._flight_client_options,
136+
None,
137+
options.flight_client_options)
138+
if ('generic_options' in options.flight_client_options and
139+
'grpc.secondary_user_agent' in dict(options.flight_client_options["generic_options"]).keys()):
140+
self._flight_client_options['generic_options'].remove(default_user_agent)
128141
if options.tls_root_certs:
129142
self._flight_client_options["tls_root_certs"] = options.tls_root_certs
130143
if options.proxy:
131144
self._proxy = options.proxy
132145
if options.tls_verify is not None:
133146
self._flight_client_options["disable_server_verification"] = not options.tls_verify
134-
self._flight_client_options["generic_options"] = [
135-
("grpc.secondary_user_agent", USER_AGENT)
136-
]
137147
if self._proxy:
138148
self._flight_client_options["generic_options"].append(("grpc.http_proxy", self._proxy))
139149
self._flight_client = FlightClient(connection_string, **self._flight_client_options)
@@ -152,48 +162,88 @@ def query(self, query: str, language: str, mode: str, database: str, **kwargs):
152162
It should be a ``dictionary`` of key-value pairs.
153163
:return: The query result in the specified mode.
154164
"""
155-
from influxdb_client_3 import polars as has_polars, _merge_options as merge_options
156165
try:
157-
# Create an authorization header
158-
optargs = {
159-
"headers": [(b"authorization", f"Bearer {self._token}".encode('utf-8'))],
160-
"timeout": 300
161-
}
162-
opts = merge_options(optargs, exclude_keys=['query_parameters'], custom=kwargs)
163-
_options = FlightCallOptions(**opts)
164-
165-
#
166-
# Ticket data
167-
#
168-
ticket_data = {
169-
"database": database,
170-
"sql_query": query,
171-
"query_type": language
172-
}
173-
# add query parameters
174-
query_parameters = kwargs.get("query_parameters", None)
175-
if query_parameters:
176-
ticket_data["params"] = query_parameters
166+
ticket, _options = self._prepare_query(query, language, database, **kwargs)
177167

178-
ticket = Ticket(json.dumps(ticket_data).encode('utf-8'))
179168
flight_reader = self._do_get(ticket, _options)
180169

170+
return self._translate_stream_reader(flight_reader, mode)
171+
except Exception as e:
172+
raise e
173+
174+
async def query_async(self, query: str, language: str, mode: str, database: str, **kwargs):
175+
"""Query data from InfluxDB asynchronously.
176+
177+
Wraps internal FlightClient.doGet call in its own executor, so that the event_loop will not be blocked.
178+
179+
:param query: The query to execute on the database.
180+
:param language: The query language.
181+
:param mode: The mode to use for the query.
182+
It should be one of "all", "pandas", "polars", "chunk", "reader" or "schema".
183+
:param database: The database to query from.
184+
:param kwargs: Additional arguments to pass to the ``FlightCallOptions headers``.
185+
For example, it can be used to set up per request headers.
186+
:keyword query_parameters: The query parameters to use in the query.
187+
It should be a ``dictionary`` of key-value pairs.
188+
:return: The query result in the specified mode.
189+
"""
190+
try:
191+
ticket, options = self._prepare_query(query, language, database, **kwargs)
192+
loop = asyncio.get_running_loop()
193+
_flight_reader = await loop.run_in_executor(None,
194+
self._flight_client.do_get, ticket, options)
195+
return await loop.run_in_executor(None, self._translate_stream_reader,
196+
_flight_reader,
197+
mode)
198+
except Exception as e:
199+
raise e
200+
201+
def _translate_stream_reader(self, reader: FlightStreamReader, mode: str):
202+
from influxdb_client_3 import polars as has_polars
203+
try:
181204
mode_funcs = {
182-
"all": flight_reader.read_all,
183-
"pandas": flight_reader.read_pandas,
184-
"chunk": lambda: flight_reader,
185-
"reader": flight_reader.to_reader,
186-
"schema": lambda: flight_reader.schema
205+
"all": reader.read_all,
206+
"pandas": reader.read_pandas,
207+
"chunk": lambda: reader,
208+
"reader": reader.to_reader,
209+
"schema": lambda: reader.schema
187210
}
188211
if has_polars:
189212
import polars as pl
190-
mode_funcs["polars"] = lambda: pl.from_arrow(flight_reader.read_all())
191-
mode_func = mode_funcs.get(mode, flight_reader.read_all)
213+
mode_funcs["polars"] = lambda: pl.from_arrow(reader.read_all())
214+
mode_func = mode_funcs.get(mode, reader.read_all)
192215

193216
return mode_func() if callable(mode_func) else mode_func
194217
except Exception as e:
195218
raise e
196219

220+
def _prepare_query(self, query: str, language: str, database: str, **kwargs):
221+
from influxdb_client_3 import _merge_options as merge_options
222+
# Create an authorization header
223+
optargs = {
224+
"headers": [(b"authorization", f"Bearer {self._token}".encode('utf-8'))],
225+
"timeout": 300
226+
}
227+
opts = merge_options(optargs, exclude_keys=['query_parameters'], custom=kwargs)
228+
_options = FlightCallOptions(**opts)
229+
230+
#
231+
# Ticket data
232+
#
233+
ticket_data = {
234+
"database": database,
235+
"sql_query": query,
236+
"query_type": language
237+
}
238+
# add query parameters
239+
query_parameters = kwargs.get("query_parameters", None)
240+
if query_parameters:
241+
ticket_data["params"] = query_parameters
242+
243+
ticket = Ticket(json.dumps(ticket_data).encode('utf-8'))
244+
245+
return ticket, _options
246+
197247
def _do_get(self, ticket: Ticket, options: FlightCallOptions = None) -> FlightStreamReader:
198248
return self._flight_client.do_get(ticket, options)
199249

tests/test_influxdb_client_3.py

+26
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
from unittest.mock import patch
33

44
from influxdb_client_3 import InfluxDBClient3
5+
from tests.util import asyncio_run
6+
from tests.util.mocks import ConstantFlightServer, ConstantData
57

68

79
class TestInfluxDBClient3(unittest.TestCase):
@@ -48,6 +50,30 @@ def test_token_auth_scheme_explicit(self):
4850
)
4951
self.assertEqual(client._client.auth_header_value, "my_scheme my_token")
5052

53+
@asyncio_run
54+
async def test_query_async(self):
55+
with ConstantFlightServer() as server:
56+
client = InfluxDBClient3(
57+
host=f"http://localhost:{server.port}",
58+
org="my_org",
59+
database="my_db",
60+
token="my_token",
61+
)
62+
63+
query = "SELECT * FROM my_data"
64+
65+
table = await client.query_async(query=query, language="sql")
66+
67+
result_list = table.to_pylist()
68+
69+
cd = ConstantData()
70+
for item in cd.to_list():
71+
assert item in result_list
72+
73+
assert {'data': 'database', 'reference': 'my_db', 'value': -1.0} in result_list
74+
assert {'data': 'sql_query', 'reference': query, 'value': -1.0} in result_list
75+
assert {'data': 'query_type', 'reference': 'sql', 'value': -1.0} in result_list
76+
5177

5278
if __name__ == '__main__':
5379
unittest.main()

tests/test_influxdb_client_3_integration.py

+25
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from pyarrow._flight import FlightError
1111

1212
from influxdb_client_3 import InfluxDBClient3, InfluxDBError, write_client_options, WriteOptions
13+
from tests.util import asyncio_run, lp_to_py_object
1314

1415

1516
def random_hex(len=6):
@@ -249,3 +250,27 @@ def test_verify_ssl_false(self):
249250
assert len(list_results) > 0
250251
finally:
251252
self.remove_test_cert(cert_file)
253+
254+
@asyncio_run
255+
async def test_verify_query_async(self):
256+
measurement = f'test{random_hex(6)}'
257+
data = []
258+
lp_template = "%s,location=%s val=%f,ival=%di,index=%di %d"
259+
data_size = 10
260+
interval = 1_000_000_000 * 10
261+
ts = time.time_ns() - interval * data_size
262+
locations = ['springfield', 'gotham', 'balbec', 'yonville']
263+
for i in range(data_size):
264+
data.append(lp_template % (measurement, locations[random.randint(0, len(locations) - 1)],
265+
random.random() * 10,
266+
random.randint(0, 6), i, ts))
267+
ts = ts + interval
268+
269+
self.client.write(data)
270+
query = f"SELECT * FROM \"{measurement}\" ORDER BY time DESC"
271+
272+
result = await self.client.query_async(query)
273+
274+
result_list = result.to_pylist()
275+
for item in data:
276+
assert lp_to_py_object(item) in result_list, f"original lp data \"{item}\" should be in result list"

0 commit comments

Comments
 (0)