@@ -69,12 +69,26 @@ def _deep_merge(target, source):
69
69
elif isinstance (target , list ) and isinstance (source , list ):
70
70
# If both target and source are lists, concatenate them
71
71
target .extend (source )
72
- else :
72
+ elif source is not None :
73
73
# For other types, simply replace the target with the source
74
74
target = source
75
75
return target
76
76
77
77
78
+ def _merge_options (defaults , exclude_keys = None , custom = None ):
79
+ """
80
+ Merge default option arguments with custom (user-provided) arguments,
81
+ excluding specific keys defined in exclude_keys.
82
+ """
83
+ if custom is None or len (custom ) == 0 :
84
+ return defaults
85
+
86
+ if exclude_keys is None :
87
+ exclude_keys = []
88
+
89
+ return _deep_merge (defaults , {key : value for key , value in custom .items () if key not in exclude_keys })
90
+
91
+
78
92
class InfluxDBClient3 :
79
93
def __init__ (
80
94
self ,
@@ -135,14 +149,6 @@ def __init__(
135
149
port = query_port_overwrite
136
150
self ._flight_client = FlightClient (f"grpc+tls://{ hostname } :{ port } " , ** self ._flight_client_options )
137
151
138
- def _merge_options (self , defaults , custom = {}):
139
- """
140
- Merge default option arguments with custom (user-provided) arguments.
141
- """
142
- if len (custom ) == 0 :
143
- return defaults
144
- return _deep_merge (defaults , {key : value for key , value in custom .items ()})
145
-
146
152
def write (self , record = None , database = None , ** kwargs ):
147
153
"""
148
154
Write data to InfluxDB.
@@ -214,20 +220,23 @@ def _process_dataframe(self, df, measurement_name, tag_columns, timestamp_column
214
220
data_frame_tag_columns = tag_columns ,
215
221
data_frame_timestamp_column = timestamp_column , ** kwargs )
216
222
217
- def query (self , query , language = "sql" , mode = "all" , database = None , ** kwargs ):
218
- """
219
- Query data from InfluxDB.
220
-
221
- :param query: The query string.
222
- :type query: str
223
- :param language: The query language; "sql" or "influxql" (default is "sql").
224
- :type language: str
225
- :param mode: The mode of fetching data (all, pandas, chunk, reader, schema).
226
- :type mode: str
223
+ def query (self , query : str , language : str = "sql" , mode : str = "all" , database : str = None , ** kwargs ):
224
+ """Query data from InfluxDB.
225
+
226
+ If you want to use query parameters, you can pass them as kwargs:
227
+
228
+ >>> client.query("select * from cpu where host=$host", query_parameters={"host": "server01"})
229
+
230
+ :param query: The query to execute on the database.
231
+ :param language: The query language to use. It should be one of "influxql" or "sql". Defaults to "sql".
232
+ :param mode: The mode to use for the query. It should be one of "all", "pandas", "polars", "chunk",
233
+ "reader" or "schema". Defaults to "all".
227
234
:param database: The database to query from. If not provided, uses the database provided during initialization.
228
- :type database: str
229
- :param kwargs: FlightClientCallOptions for the query.
230
- :return: The queried data.
235
+ :param kwargs: Additional arguments to pass to the ``FlightCallOptions headers``. For example, it can be used to
236
+ set up per request headers.
237
+ :keyword query_parameters: The query parameters to use in the query.
238
+ It should be a ``dictionary`` of key-value pairs.
239
+ :return: The query result in the specified mode.
231
240
"""
232
241
if mode == "polars" and polars is False :
233
242
raise ImportError ("Polars is not installed. Please install it with `pip install polars`." )
@@ -241,10 +250,22 @@ def query(self, query, language="sql", mode="all", database=None, **kwargs):
241
250
"headers" : [(b"authorization" , f"Bearer { self ._token } " .encode ('utf-8' ))],
242
251
"timeout" : 300
243
252
}
244
- opts = self . _merge_options (optargs , kwargs )
253
+ opts = _merge_options (optargs , exclude_keys = [ 'query_parameters' ], custom = kwargs )
245
254
_options = FlightCallOptions (** opts )
246
255
247
- ticket_data = {"database" : database , "sql_query" : query , "query_type" : language }
256
+ #
257
+ # Ticket data
258
+ #
259
+ ticket_data = {
260
+ "database" : database ,
261
+ "sql_query" : query ,
262
+ "query_type" : language
263
+ }
264
+ # add query parameters
265
+ query_parameters = kwargs .get ("query_parameters" , None )
266
+ if query_parameters :
267
+ ticket_data ["params" ] = query_parameters
268
+
248
269
ticket = Ticket (json .dumps (ticket_data ).encode ('utf-8' ))
249
270
flight_reader = self ._flight_client .do_get (ticket , _options )
250
271
0 commit comments