Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: asynchronous query #123

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## 0.12.0 [unreleased]

### Features

1. [#123](https://github.com/InfluxCommunity/influxdb3-python/pull/123): Introduces `query_async` method.

### Bug Fixes

1. [#121](https://github.com/InfluxCommunity/influxdb3-python/pull/121): Fix use of arguments `verify_ssl` and `ssl_ca_cert` in `QueryApi`.
Expand Down
87 changes: 87 additions & 0 deletions Examples/query_async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import asyncio
import random
import time

import pandas

from influxdb_client_3 import InfluxDBClient3

from config import Config


async def fibio(iterations, grit=0.5):
    """
    Example coroutine to run in parallel with ``query_async``.

    Starting from the seed values ``1, 1`` it computes ``iterations`` further
    Fibonacci numbers. Each new value is printed, and the coroutine then
    sleeps for ``grit`` seconds so that other coroutines get a chance to run.

    :param iterations: number of values to compute beyond the two seed values.
    :param grit: seconds to sleep after each computed value.
    :return: list with the two seed values followed by the computed values.
    """
    n0 = 1
    n1 = 1
    vals = [n0, n1]
    for _ in range(iterations):
        # advance the pair (n0, n1) -> (n1, n0 + n1)
        n0, n1 = n1, n0 + n1
        print(n1)
        vals.append(n1)
        # hand control back to the event loop between steps
        await asyncio.sleep(grit)
    return vals


def write_data(client: InfluxDBClient3, measurement):
    """
    Synchronous write - only for preparing data.

    Builds ten line-protocol records for ``measurement`` with a randomly
    chosen ``id`` tag and random ``speed``, ``alt`` and ``bearing`` fields.
    Timestamps are spaced ten seconds apart, starting 100 seconds before the
    current time, and all records are written with a single ``client.write``
    call.

    :param client: client used to write the generated records.
    :param measurement: measurement name used in every record.
    :return: None
    """
    ids = ['s3b1', 'dq41', 'sgw22']
    lp_template = f"{measurement},id=%s speed=%f,alt=%f,bearing=%f %d"
    data_size = 10
    interval = 10 * 1_000_000_000  # ten seconds expressed in nanoseconds
    ts = time.time_ns() - (interval * data_size)
    data = []
    for _ in range(data_size):
        data.append(lp_template % (random.choice(ids),
                                   random.random() * 300,
                                   random.random() * 2000,
                                   random.random() * 360, ts))
        ts += interval

    client.write(data)


async def query_data(client: InfluxDBClient3, measurement):
    """
    Query asynchronously - should not block other coroutines.

    Selects the rows of ``measurement`` from the last five minutes, newest
    first, and prints a timestamp before the query is awaited and another
    once it has returned.

    :param client: client whose ``query_async`` method is awaited.
    :param measurement: name of the measurement to select from.
    :return: the query result converted with ``to_pandas()``.
    """
    query = f"SELECT * FROM \"{measurement}\" WHERE time >= now() - interval '5 minutes' ORDER BY time DESC"
    print(f"query start: {pandas.Timestamp(time.time_ns())}")
    table = await client.query_async(query)
    print(f"query returned: {pandas.Timestamp(time.time_ns())}")
    return table.to_pandas()


async def main():
    """
    Write sample data, then run ``fibio`` and ``query_data`` concurrently.

    The client is closed once the example has finished, whether or not it
    raised.
    """
    config = Config()
    client = InfluxDBClient3(
        host=config.host,
        token=config.token,
        database=config.database,
        org=config.org
    )
    try:
        measurement = 'example_uav'
        write_data(client, measurement)

        # run both coroutines simultaneously
        result = await asyncio.gather(fibio(10, 0.2), query_data(client, measurement))
        print(f"fibio sequence = {result[0]}")
        print(f"data set =\n{result[1]}")
    finally:
        # release the client's resources even if the example fails
        client.close()


# Run the example only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())
33 changes: 33 additions & 0 deletions influxdb_client_3/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,39 @@
except InfluxDBError as e:
raise e

async def query_async(self, query: str, language: str = "sql", mode: str = "all", database: str = None, **kwargs):
    """Query data from InfluxDB asynchronously.

    If you want to use query parameters, you can pass them as kwargs:

    >>> await client.query_async("select * from cpu where host=$host", query_parameters={"host": "server01"})

    :param query: The query to execute on the database.
    :param language: The query language to use. It should be one of "influxql" or "sql". Defaults to "sql".
    :param mode: The mode to use for the query. It should be one of "all", "pandas", "polars", "chunk",
                 "reader" or "schema". Defaults to "all".
    :param database: The database to query from. If not provided, uses the database provided during initialization.
    :param kwargs: Additional keyword arguments forwarded unchanged to the query API. For example, they can be
                   used to set up per request headers.
    :keyword query_parameters: The query parameters to use in the query.
                               It should be a ``dictionary`` of key-value pairs.
    :return: The query result in the specified mode.
    :raises ImportError: If ``mode`` is "polars" and polars is not installed.
    """
    if mode == "polars" and polars is False:
        raise ImportError("Polars is not installed. Please install it with `pip install polars`.")

    if database is None:
        database = self._database

    # Errors raised by the query API propagate to the caller unchanged.
    return await self._query_api.query_async(query=query,
                                             language=language,
                                             mode=mode,
                                             database=database,
                                             **kwargs)

Check warning on line 312 in influxdb_client_3/__init__.py

View check run for this annotation

Codecov / codecov/patch

influxdb_client_3/__init__.py#L311-L312

Added lines #L311 - L312 were not covered by tests

def close(self):
"""Close the client and clean up resources."""
self._write_api.close()
Expand Down
118 changes: 84 additions & 34 deletions influxdb_client_3/query/query_api.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
"""Query data in InfluxDB 3."""

import asyncio
# coding: utf-8
import json

from pyarrow.flight import FlightClient, Ticket, FlightCallOptions, FlightStreamReader

from influxdb_client_3.version import USER_AGENT


Expand Down Expand Up @@ -121,19 +122,28 @@
"""
self._token = token
self._flight_client_options = flight_client_options or {}
default_user_agent = ("grpc.secondary_user_agent", USER_AGENT)
if "generic_options" in self._flight_client_options:
if "grpc.secondary_user_agent" not in dict(self._flight_client_options["generic_options"]).keys():
self._flight_client_options["generic_options"].append(default_user_agent)
else:
self._flight_client_options["generic_options"] = [default_user_agent]
self._proxy = proxy
from influxdb_client_3 import _merge_options as merge_options
if options:
if options.flight_client_options:
self._flight_client_options = options.flight_client_options
self._flight_client_options = merge_options(self._flight_client_options,
None,
options.flight_client_options)
if ('generic_options' in options.flight_client_options and
'grpc.secondary_user_agent' in dict(options.flight_client_options["generic_options"]).keys()):
self._flight_client_options['generic_options'].remove(default_user_agent)
if options.tls_root_certs:
self._flight_client_options["tls_root_certs"] = options.tls_root_certs
if options.proxy:
self._proxy = options.proxy
if options.tls_verify is not None:
self._flight_client_options["disable_server_verification"] = not options.tls_verify
self._flight_client_options["generic_options"] = [
("grpc.secondary_user_agent", USER_AGENT)
]
if self._proxy:
self._flight_client_options["generic_options"].append(("grpc.http_proxy", self._proxy))
self._flight_client = FlightClient(connection_string, **self._flight_client_options)
Expand All @@ -152,48 +162,88 @@
It should be a ``dictionary`` of key-value pairs.
:return: The query result in the specified mode.
"""
from influxdb_client_3 import polars as has_polars, _merge_options as merge_options
try:
# Create an authorization header
optargs = {
"headers": [(b"authorization", f"Bearer {self._token}".encode('utf-8'))],
"timeout": 300
}
opts = merge_options(optargs, exclude_keys=['query_parameters'], custom=kwargs)
_options = FlightCallOptions(**opts)

#
# Ticket data
#
ticket_data = {
"database": database,
"sql_query": query,
"query_type": language
}
# add query parameters
query_parameters = kwargs.get("query_parameters", None)
if query_parameters:
ticket_data["params"] = query_parameters
ticket, _options = self._prepare_query(query, language, database, **kwargs)

ticket = Ticket(json.dumps(ticket_data).encode('utf-8'))
flight_reader = self._do_get(ticket, _options)

return self._translate_stream_reader(flight_reader, mode)
except Exception as e:
raise e

async def query_async(self, query: str, language: str, mode: str, database: str, **kwargs):
    """Query data from InfluxDB asynchronously.

    Runs the blocking ``do_get`` call, and then the translation of the returned
    stream reader, in the event loop's default executor, so that the event loop
    will not be blocked.

    :param query: The query to execute on the database.
    :param language: The query language.
    :param mode: The mode to use for the query.
                 It should be one of "all", "pandas", "polars", "chunk", "reader" or "schema".
    :param database: The database to query from.
    :param kwargs: Additional arguments used when preparing the call options.
                   For example, it can be used to set up per request headers.
    :keyword query_parameters: The query parameters to use in the query.
                               It should be a ``dictionary`` of key-value pairs.
    :return: The query result in the specified mode.
    """
    ticket, options = self._prepare_query(query, language, database, **kwargs)
    loop = asyncio.get_running_loop()
    # Go through the same _do_get wrapper as the synchronous query path.
    _flight_reader = await loop.run_in_executor(None, self._do_get, ticket, options)
    # Translating the reader may read the stream, so it is kept off the event loop as well.
    return await loop.run_in_executor(None, self._translate_stream_reader,
                                      _flight_reader,
                                      mode)

Check warning on line 199 in influxdb_client_3/query/query_api.py

View check run for this annotation

Codecov / codecov/patch

influxdb_client_3/query/query_api.py#L198-L199

Added lines #L198 - L199 were not covered by tests

def _translate_stream_reader(self, reader: FlightStreamReader, mode: str):
    """Convert a flight stream reader into the result form selected by ``mode``.

    :param reader: The stream reader returned by the flight ``do_get`` call.
    :param mode: One of "all", "pandas", "polars", "chunk", "reader" or "schema".
                 "polars" is only available when the package-level ``polars`` flag is truthy.
                 Any unrecognised mode falls back to ``reader.read_all``.
    :return: The result produced for the selected mode.
    """
    from influxdb_client_3 import polars as has_polars
    mode_funcs = {
        "all": reader.read_all,
        "pandas": reader.read_pandas,
        "chunk": lambda: reader,
        "reader": reader.to_reader,
        "schema": lambda: reader.schema
    }
    if has_polars:
        import polars as pl
        mode_funcs["polars"] = lambda: pl.from_arrow(reader.read_all())
    mode_func = mode_funcs.get(mode, reader.read_all)

    return mode_func() if callable(mode_func) else mode_func

def _prepare_query(self, query: str, language: str, database: str, **kwargs):
    """Build the flight ticket and call options for a query.

    :param query: The query text, sent as ``sql_query`` in the ticket.
    :param language: The query language, sent as ``query_type`` in the ticket.
    :param database: The database name, sent as ``database`` in the ticket.
    :param kwargs: Custom call options merged over the defaults (authorization header and a
                   timeout of 300); ``query_parameters`` is excluded from that merge.
    :keyword query_parameters: When present and non-empty, sent as ``params`` in the ticket.
    :return: A ``(ticket, options)`` tuple.
    """
    from influxdb_client_3 import _merge_options as merge_options
    # Create an authorization header
    optargs = {
        "headers": [(b"authorization", f"Bearer {self._token}".encode('utf-8'))],
        "timeout": 300
    }
    opts = merge_options(optargs, exclude_keys=['query_parameters'], custom=kwargs)
    _options = FlightCallOptions(**opts)

    #
    # Ticket data
    #
    ticket_data = {
        "database": database,
        "sql_query": query,
        "query_type": language
    }
    # add query parameters
    query_parameters = kwargs.get("query_parameters")
    if query_parameters:
        ticket_data["params"] = query_parameters

    ticket = Ticket(json.dumps(ticket_data).encode('utf-8'))

    return ticket, _options

def _do_get(self, ticket: Ticket, options: FlightCallOptions = None) -> FlightStreamReader:
    """Call ``do_get`` on the underlying flight client.

    :param ticket: The ticket describing the query.
    :param options: Optional call options passed through to the flight client.
    :return: The stream reader returned by the flight client.
    """
    return self._flight_client.do_get(ticket, options)

Expand Down
26 changes: 26 additions & 0 deletions tests/test_influxdb_client_3.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from unittest.mock import patch

from influxdb_client_3 import InfluxDBClient3
from tests.util import asyncio_run
from tests.util.mocks import ConstantFlightServer, ConstantData


class TestInfluxDBClient3(unittest.TestCase):
Expand Down Expand Up @@ -48,6 +50,30 @@ def test_token_auth_scheme_explicit(self):
)
self.assertEqual(client._client.auth_header_value, "my_scheme my_token")

@asyncio_run
async def test_query_async(self):
    """``query_async`` returns the server's constant data plus the ticket fields it was sent."""
    with ConstantFlightServer() as server:
        client = InfluxDBClient3(
            host=f"http://localhost:{server.port}",
            org="my_org",
            database="my_db",
            token="my_token",
        )

        query = "SELECT * FROM my_data"

        table = await client.query_async(query=query, language="sql")

        result_list = table.to_pylist()

        cd = ConstantData()
        for item in cd.to_list():
            assert item in result_list

        # the database, query text and query type sent in the ticket come back as rows
        assert {'data': 'database', 'reference': 'my_db', 'value': -1.0} in result_list
        assert {'data': 'sql_query', 'reference': query, 'value': -1.0} in result_list
        assert {'data': 'query_type', 'reference': 'sql', 'value': -1.0} in result_list


if __name__ == '__main__':
unittest.main()
25 changes: 25 additions & 0 deletions tests/test_influxdb_client_3_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from pyarrow._flight import FlightError

from influxdb_client_3 import InfluxDBClient3, InfluxDBError, write_client_options, WriteOptions
from tests.util import asyncio_run, lp_to_py_object


def random_hex(len=6):
Expand Down Expand Up @@ -249,3 +250,27 @@ def test_verify_ssl_false(self):
assert len(list_results) > 0
finally:
self.remove_test_cert(cert_file)

@asyncio_run
async def test_verify_query_async(self):
    """Write ten generated records, then check ``query_async`` returns each of them."""
    measurement = f'test{random_hex(6)}'
    data = []
    lp_template = "%s,location=%s val=%f,ival=%di,index=%di %d"
    data_size = 10
    interval = 1_000_000_000 * 10
    ts = time.time_ns() - interval * data_size
    locations = ['springfield', 'gotham', 'balbec', 'yonville']
    for i in range(data_size):
        data.append(lp_template % (measurement, locations[random.randint(0, len(locations) - 1)],
                                   random.random() * 10,
                                   random.randint(0, 6), i, ts))
        ts = ts + interval

    self.client.write(data)
    query = f"SELECT * FROM \"{measurement}\" ORDER BY time DESC"

    result = await self.client.query_async(query)

    result_list = result.to_pylist()
    # every written line-protocol record must be present in the query result
    for item in data:
        assert lp_to_py_object(item) in result_list, f"original lp data \"{item}\" should be in result list"
Loading
Loading