Skip to content

Commit a3f5b7a

Browse files
committed
feat: disable grpc response compression
1 parent fac1793 commit a3f5b7a

File tree

6 files changed

+342
-2
lines changed

6 files changed

+342
-2
lines changed

influxdb_client_3/__init__.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
INFLUX_WRITE_NO_SYNC = "INFLUX_WRITE_NO_SYNC"
3232
INFLUX_WRITE_TIMEOUT = "INFLUX_WRITE_TIMEOUT"
3333
INFLUX_QUERY_TIMEOUT = "INFLUX_QUERY_TIMEOUT"
34+
INFLUX_DISABLE_GRPC_COMPRESSION = "INFLUX_DISABLE_GRPC_COMPRESSION"
3435

3536

3637
def write_client_options(**kwargs):
@@ -190,6 +191,7 @@ def __init__(
190191
flight_client_options=None,
191192
write_port_overwrite=None,
192193
query_port_overwrite=None,
194+
disable_grpc_compression=False,
193195
**kwargs):
194196
"""
195197
Initialize an InfluxDB client.
@@ -206,6 +208,8 @@ def __init__(
206208
:type write_client_options: dict[str, any]
207209
:param flight_client_options: dictionary for providing additional arguments for the FlightClient.
208210
:type flight_client_options: dict[str, any]
211+
:param disable_grpc_compression: Disable gRPC compression for Flight query responses. Default is False.
212+
:type disable_grpc_compression: bool
209213
:key auth_scheme: token authentication scheme. Set to "Bearer" for Edge.
210214
:key bool verify_ssl: Set this to false to skip verifying SSL certificate when calling API from https server.
211215
:key str ssl_ca_cert: Set this to customize the certificate file to verify the peer.
@@ -291,6 +295,8 @@ def __init__(
291295
connection_string = f"grpc+tcp://{hostname}:{port}"
292296

293297
q_opts_builder = QueryApiOptionsBuilder()
298+
if disable_grpc_compression:
299+
q_opts_builder.disable_grpc_compression(True)
294300
if kw_keys.__contains__('ssl_ca_cert'):
295301
q_opts_builder.root_certs(kwargs.get('ssl_ca_cert', None))
296302
if kw_keys.__contains__('verify_ssl'):
@@ -361,13 +367,20 @@ def from_env(cls, **kwargs: Any) -> 'InfluxDBClient3':
361367
if os.getenv(INFLUX_AUTH_SCHEME) is not None:
362368
kwargs['auth_scheme'] = os.getenv(INFLUX_AUTH_SCHEME)
363369

370+
disable_grpc_compression = os.getenv(INFLUX_DISABLE_GRPC_COMPRESSION)
371+
if disable_grpc_compression is not None:
372+
disable_grpc_compression = disable_grpc_compression.strip().lower() in ['true', '1', 't', 'y', 'yes']
373+
else:
374+
disable_grpc_compression = False
375+
364376
org = os.getenv(INFLUX_ORG, "default")
365377
return InfluxDBClient3(
366378
host=required_vars[INFLUX_HOST],
367379
token=required_vars[INFLUX_TOKEN],
368380
database=required_vars[INFLUX_DATABASE],
369381
write_client_options=write_client_option,
370382
org=org,
383+
disable_grpc_compression=disable_grpc_compression,
371384
**kwargs
372385
)
373386

influxdb_client_3/query/query_api.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,22 @@ class QueryApiOptions(object):
1919
proxy (str): URL to a proxy server
2020
flight_client_options (dict): base set of flight client options passed to internal pyarrow.flight.FlightClient
2121
timeout(float): timeout in seconds to wait for a response
22+
disable_grpc_compression (bool): disable gRPC compression for query responses
2223
"""
2324
_DEFAULT_TIMEOUT = 300.0
2425
tls_root_certs: bytes = None
2526
tls_verify: bool = None
2627
proxy: str = None
2728
flight_client_options: dict = None
2829
timeout: float = None
30+
disable_grpc_compression: bool = False
2931

3032
def __init__(self, root_certs_path: str,
3133
verify: bool,
3234
proxy: str,
3335
flight_client_options: dict,
34-
timeout: float = _DEFAULT_TIMEOUT):
36+
timeout: float = _DEFAULT_TIMEOUT,
37+
disable_grpc_compression: bool = False):
3538
"""
3639
Initialize a set of QueryApiOptions
3740
@@ -41,13 +44,15 @@ def __init__(self, root_certs_path: str,
4144
:param flight_client_options: set of flight_client_options
4245
to be passed to internal pyarrow.flight.FlightClient.
4346
:param timeout: timeout in seconds to wait for a response.
47+
:param disable_grpc_compression: disable gRPC compression for query responses.
4448
"""
4549
if root_certs_path:
4650
self.tls_root_certs = self._read_certs(root_certs_path)
4751
self.tls_verify = verify
4852
self.proxy = proxy
4953
self.flight_client_options = flight_client_options
5054
self.timeout = timeout
55+
self.disable_grpc_compression = disable_grpc_compression
5156

5257
def _read_certs(self, path: str) -> bytes:
5358
with open(path, "rb") as certs_file:
@@ -75,6 +80,7 @@ class QueryApiOptionsBuilder(object):
7580
_proxy: str = None
7681
_flight_client_options: dict = None
7782
_timeout: float = None
83+
_disable_grpc_compression: bool = False
7884

7985
def root_certs(self, path: str):
8086
self._root_certs_path = path
@@ -96,6 +102,11 @@ def timeout(self, timeout: float):
96102
self._timeout = timeout
97103
return self
98104

105+
def disable_grpc_compression(self, disable: bool):
106+
"""Disable gRPC compression for query responses."""
107+
self._disable_grpc_compression = disable
108+
return self
109+
99110
def build(self) -> QueryApiOptions:
100111
"""Build a QueryApiOptions object with previously set values"""
101112
return QueryApiOptions(
@@ -104,6 +115,7 @@ def build(self) -> QueryApiOptions:
104115
proxy=self._proxy,
105116
flight_client_options=self._flight_client_options,
106117
timeout=self._timeout,
118+
disable_grpc_compression=self._disable_grpc_compression,
107119
)
108120

109121

@@ -162,6 +174,13 @@ def __init__(self,
162174
self._flight_client_options["disable_server_verification"] = not options.tls_verify
163175
if options.timeout is not None:
164176
self._default_timeout = options.timeout
177+
if options.disable_grpc_compression:
178+
# Disable gRPC response compression by only enabling identity algorithm
179+
# Bitset: bit 0 = identity, bit 1 = deflate, bit 2 = gzip
180+
# Setting to 1 (0b001) enables only identity (no compression)
181+
self._flight_client_options["generic_options"].append(
182+
("grpc.compression_enabled_algorithms_bitset", 1)
183+
)
165184
if self._proxy:
166185
self._flight_client_options["generic_options"].append(("grpc.http_proxy", self._proxy))
167186
self._flight_client = FlightClient(connection_string, **self._flight_client_options)

setup.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,12 @@ def get_version():
5151
'pandas': ['pandas'],
5252
'polars': ['polars'],
5353
'dataframe': ['pandas', 'polars'],
54-
'test': ['pytest', 'pytest-cov', 'pytest-httpserver']
54+
'test': [
55+
'pytest',
56+
'pytest-cov',
57+
'pytest-httpserver',
58+
'mitmproxy>=11.0.0,<12.0.0; python_version>="3.10"',
59+
]
5560
},
5661
install_requires=requires,
5762
python_requires='>=3.8',

tests/test_influxdb_client_3.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,74 @@ def test_parse_invalid_write_timeout_range(self):
302302
with self.assertRaisesRegex(ValueError, ".*Must be non-negative.*"):
303303
InfluxDBClient3.from_env()
304304

305+
def assertGrpcCompressionDisabled(self, client, disabled):
306+
"""Assert whether gRPC compression is disabled for the client."""
307+
self.assertIsInstance(client, InfluxDBClient3)
308+
generic_options = dict(client._query_api._flight_client_options['generic_options'])
309+
if disabled:
310+
self.assertEqual(generic_options.get('grpc.compression_enabled_algorithms_bitset'), 1)
311+
else:
312+
self.assertIsNone(generic_options.get('grpc.compression_enabled_algorithms_bitset'))
313+
314+
@patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token',
315+
'INFLUX_DATABASE': 'test_db', 'INFLUX_DISABLE_GRPC_COMPRESSION': 'true'})
316+
def test_from_env_disable_grpc_compression_true(self):
317+
client = InfluxDBClient3.from_env()
318+
self.assertGrpcCompressionDisabled(client, True)
319+
320+
@patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token',
321+
'INFLUX_DATABASE': 'test_db', 'INFLUX_DISABLE_GRPC_COMPRESSION': 'TrUe'})
322+
def test_from_env_disable_grpc_compression_true_mixed_case(self):
323+
client = InfluxDBClient3.from_env()
324+
self.assertGrpcCompressionDisabled(client, True)
325+
326+
@patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token',
327+
'INFLUX_DATABASE': 'test_db', 'INFLUX_DISABLE_GRPC_COMPRESSION': '1'})
328+
def test_from_env_disable_grpc_compression_one(self):
329+
client = InfluxDBClient3.from_env()
330+
self.assertGrpcCompressionDisabled(client, True)
331+
332+
@patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token',
333+
'INFLUX_DATABASE': 'test_db', 'INFLUX_DISABLE_GRPC_COMPRESSION': 'false'})
334+
def test_from_env_disable_grpc_compression_false(self):
335+
client = InfluxDBClient3.from_env()
336+
self.assertGrpcCompressionDisabled(client, False)
337+
338+
@patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token',
339+
'INFLUX_DATABASE': 'test_db', 'INFLUX_DISABLE_GRPC_COMPRESSION': 'anything-else'})
340+
def test_from_env_disable_grpc_compression_anything_else_is_false(self):
341+
client = InfluxDBClient3.from_env()
342+
self.assertGrpcCompressionDisabled(client, False)
343+
344+
def test_disable_grpc_compression_parameter_true(self):
345+
client = InfluxDBClient3(
346+
host="localhost",
347+
org="my_org",
348+
database="my_db",
349+
token="my_token",
350+
disable_grpc_compression=True
351+
)
352+
self.assertGrpcCompressionDisabled(client, True)
353+
354+
def test_disable_grpc_compression_parameter_false(self):
355+
client = InfluxDBClient3(
356+
host="localhost",
357+
org="my_org",
358+
database="my_db",
359+
token="my_token",
360+
disable_grpc_compression=False
361+
)
362+
self.assertGrpcCompressionDisabled(client, False)
363+
364+
def test_disable_grpc_compression_default_is_false(self):
365+
client = InfluxDBClient3(
366+
host="localhost",
367+
org="my_org",
368+
database="my_db",
369+
token="my_token",
370+
)
371+
self.assertGrpcCompressionDisabled(client, False)
372+
305373
def test_query_with_arrow_error(self):
306374
f = ErrorFlightServer()
307375
with InfluxDBClient3(f"http://localhost:{f.port}", "my_org", "my_db", "my_token") as c:

tests/test_influxdb_client_3_integration.py

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import pytest
55
import random
66
import string
7+
import sys
78
import time
89
import unittest
910

@@ -455,3 +456,115 @@ def retry_cb(args, data, excp):
455456
self.assertEqual(lp, ErrorResult["rd"].decode('utf-8'))
456457
self.assertIsNotNone(ErrorResult["rx"])
457458
self.assertIsInstance(ErrorResult["rx"], Url3TimeoutError)
459+
460+
def test_disable_grpc_compression(self):
461+
"""
462+
Test that disable_grpc_compression parameter controls query response compression.
463+
464+
On Python 3.10+: Uses mitmproxy to intercept and verify request+response headers.
465+
On Python <3.10: Only verifies data integrity (mitmproxy not available).
466+
"""
467+
# Test cases
468+
test_cases = [
469+
{
470+
'name': 'default',
471+
'disable_grpc_compression': None,
472+
'expected_req_encoding': 'identity, deflate, gzip',
473+
'expected_resp_encoding': 'gzip',
474+
},
475+
{
476+
'name': 'disabled=False',
477+
'disable_grpc_compression': False,
478+
'expected_req_encoding': 'identity, deflate, gzip',
479+
'expected_resp_encoding': 'gzip',
480+
},
481+
{
482+
'name': 'disabled=True',
483+
'disable_grpc_compression': True,
484+
'expected_req_encoding': 'identity',
485+
'expected_resp_encoding': None,
486+
},
487+
]
488+
489+
# Check if mitmproxy is available (Python 3.10+ only)
490+
mitmproxy_available = sys.version_info >= (3, 10)
491+
proxy = None
492+
proxy_url = None
493+
494+
if mitmproxy_available:
495+
try:
496+
from tests.util.mitmproxy import MitmproxyServer
497+
proxy = MitmproxyServer()
498+
proxy.__enter__()
499+
proxy_url = proxy.url
500+
except ImportError as e:
501+
logging.error(f"mitmproxy not available: {e}")
502+
mitmproxy_available = False
503+
504+
try:
505+
test_id = time.time_ns()
506+
measurement = f'grpc_compression_test_{random_hex(6)}'
507+
508+
# Write test data points
509+
num_points = 10
510+
lines = [
511+
f'{measurement},type=test value={i}.0,counter={i}i,test_id={test_id}i {test_id + i * 1000000}'
512+
for i in range(num_points)
513+
]
514+
self.client.write('\n'.join(lines))
515+
516+
test_query = f"SELECT * FROM \"{measurement}\" WHERE test_id = {test_id} ORDER BY counter"
517+
518+
# Wait for data to be available
519+
result = None
520+
start = time.time()
521+
while time.time() - start < 10:
522+
result = self.client.query(test_query, mode="all")
523+
if len(result) >= num_points:
524+
break
525+
time.sleep(0.5)
526+
self.assertEqual(len(result), num_points, "Data not available after write")
527+
528+
for tc in test_cases:
529+
name = tc['name']
530+
531+
# Build client kwargs
532+
client_kwargs = {
533+
'host': self.host,
534+
'database': self.database,
535+
'token': self.token,
536+
'proxy': proxy_url,
537+
'verify_ssl': False if proxy_url else True,
538+
}
539+
if tc['disable_grpc_compression'] is not None:
540+
client_kwargs['disable_grpc_compression'] = tc['disable_grpc_compression']
541+
542+
client = InfluxDBClient3(**client_kwargs)
543+
try:
544+
result = client.query(test_query, mode="all")
545+
self.assertEqual(len(result), num_points, f"[{name}] Should return {num_points} rows")
546+
finally:
547+
client.close()
548+
549+
# Verify headers (Python 3.10+ only)
550+
if mitmproxy_available and proxy:
551+
req_encoding = proxy.capture.get_last_request_header('grpc-accept-encoding')
552+
resp_encoding = proxy.capture.get_last_response_header('grpc-encoding')
553+
554+
print(f"\n[{name}] Request grpc-accept-encoding: {req_encoding}")
555+
print(f"[{name}] Response grpc-encoding: {resp_encoding}")
556+
557+
self.assertEqual(req_encoding, tc['expected_req_encoding'],
558+
f"[{name}] Unexpected request encoding")
559+
560+
if tc['expected_resp_encoding']:
561+
self.assertEqual(resp_encoding, tc['expected_resp_encoding'],
562+
f"[{name}] Unexpected response encoding")
563+
else:
564+
self.assertTrue(resp_encoding is None or resp_encoding == 'identity',
565+
f"[{name}] Expected no compression, got: {resp_encoding}")
566+
567+
proxy.capture.clear()
568+
finally:
569+
if proxy:
570+
proxy.__exit__(None, None, None)

0 commit comments

Comments
 (0)