Commit 5e6569c
chore: influxdb_client/client/write: fix data_frame_to_list_of_points (#183)
Fix the possibility of data corruption by using a much simpler regular expression to fix up the results. Also avoid mutating the DataFrame that has been passed in by making a shallow copy. This changes the semantics so that if a column mentioned in `PointSettings` also exists in the actual data, it won't be overridden.

Also change the `test_write_data_frame` test, which is essentially a benchmark of the `data_frame_to_list_of_points` function, so that it does just that: it can now easily be run on a local machine and is independent of network speed. It runs in about 10s, which is comparable to the previous performance.
1 parent d172f26 commit 5e6569c
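
As a quick illustration of the changed `PointSettings` semantics, here is a minimal sketch (the measurement, column, and tag names are invented for illustration, not taken from this commit): a default tag no longer overrides a column of the same name that already exists in the data.

    import pandas as pd

    from influxdb_client.client.write.dataframe_serializer import data_frame_to_list_of_points
    from influxdb_client.client.write_api import PointSettings

    # A frame that already has a "host" column; "env" is absent.
    df = pd.DataFrame(
        {"temperature": [21.5], "host": ["sensor-1"]},
        index=[pd.Timestamp("2020-12-06", tz="UTC")],
    )

    # Default tags: "host" collides with an existing column, "env" does not.
    settings = PointSettings(host="gateway-9", env="prod")

    lines = data_frame_to_list_of_points(
        df,
        settings,
        data_frame_measurement_name="weather",
        data_frame_tag_columns=["host"],
    )

    # The existing "host" column wins; only the missing "env" tag is added:
    #   weather,env=prod,host=sensor-1 temperature=21.5 <ns-timestamp>
    print(lines)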

File tree

5 files changed: +320 -124 lines changed

.gitignore (-1)

@@ -114,4 +114,3 @@ sandbox
 # OpenAPI-generator
 /.openapi-generator*
 **/writer.pickle
-/tests/data_frame_file.csv

CHANGELOG.md (+17 -14; the identical -/+ pairs below are whitespace-only changes)

@@ -6,13 +6,16 @@
 ### CI
 1. [#179](https://github.com/influxdata/influxdb-client-python/pull/179): Updated default docker image to v2.0.3
 
+### Bug Fixes
+1. [#183](https://github.com/influxdata/influxdb-client-python/pull/183): Fixes to DataFrame writing.
+
 ## 1.13.0 [2020-12-04]
 
 ### Features
 1. [#171](https://github.com/influxdata/influxdb-client-python/pull/171): CSV parser is able to parse export from UI
 
 ### Bug Fixes
-1. [#170](https://github.com/influxdata/influxdb-client-python/pull/170): Skip DataFrame rows without data - all fields are nan.
+1. [#170](https://github.com/influxdata/influxdb-client-python/pull/170): Skip DataFrame rows without data - all fields are nan.
 
 ### CI
 1. [#175](https://github.com/influxdata/influxdb-client-python/pull/175): Updated default docker image to v2.0.2
@@ -25,7 +28,7 @@
 1. [#161](https://github.com/influxdata/influxdb-client-python/pull/161): Added logging message for retries
 
 ### Bug Fixes
-1. [#164](https://github.com/influxdata/influxdb-client-python/pull/164): Excluded tests from packaging
+1. [#164](https://github.com/influxdata/influxdb-client-python/pull/164): Excluded tests from packaging
 
 ## 1.11.0 [2020-10-02]
 
@@ -38,16 +41,16 @@
 1. [#156](https://github.com/influxdata/influxdb-client-python/pull/156): Removed labels in organization API, removed Pkg* structure and package service
 
 ### Bug Fixes
-1. [#154](https://github.com/influxdata/influxdb-client-python/pull/154): Fixed escaping string fields in DataFrame serialization
+1. [#154](https://github.com/influxdata/influxdb-client-python/pull/154): Fixed escaping string fields in DataFrame serialization
 
 ## 1.10.0 [2020-08-14]
 
 ### Features
-1. [#140](https://github.com/influxdata/influxdb-client-python/pull/140): Added exponential backoff strategy for batching writes, Allowed to configure default retry strategy. Default value for `retry_interval` is 5_000 milliseconds.
+1. [#140](https://github.com/influxdata/influxdb-client-python/pull/140): Added exponential backoff strategy for batching writes, Allowed to configure default retry strategy. Default value for `retry_interval` is 5_000 milliseconds.
 1. [#136](https://github.com/influxdata/influxdb-client-python/pull/136): Allows users to skip of verifying SSL certificate
 1. [#143](https://github.com/influxdata/influxdb-client-python/pull/143): Skip of verifying SSL certificate could be configured via config file or environment properties
 1. [#141](https://github.com/influxdata/influxdb-client-python/pull/141): Added possibility to use datetime nanoseconds precision by `pandas.Timestamp`
-1. [#145](https://github.com/influxdata/influxdb-client-python/pull/145): Api generator was moved to influxdb-clients-apigen
+1. [#145](https://github.com/influxdata/influxdb-client-python/pull/145): Api generator was moved to influxdb-clients-apigen
 
 ## 1.9.0 [2020-07-17]
 
@@ -58,10 +61,10 @@
 1. [#132](https://github.com/influxdata/influxdb-client-python/pull/132): Use microseconds resolutions for data points
 
 ### Bug Fixes
-1. [#117](https://github.com/influxdata/influxdb-client-python/pull/117): Fixed appending default tags for single Point
-1. [#115](https://github.com/influxdata/influxdb-client-python/pull/115): Fixed serialization of `\n`, `\r` and `\t` to Line Protocol, `=` is valid sign for measurement name
+1. [#117](https://github.com/influxdata/influxdb-client-python/pull/117): Fixed appending default tags for single Point
+1. [#115](https://github.com/influxdata/influxdb-client-python/pull/115): Fixed serialization of `\n`, `\r` and `\t` to Line Protocol, `=` is valid sign for measurement name
 1. [#118](https://github.com/influxdata/influxdb-client-python/issues/118): Fixed serialization of DataFrame with empty (NaN) values
-1. [#130](https://github.com/influxdata/influxdb-client-python/pull/130): Use `Retry-After` header value for Retryable error codes
+1. [#130](https://github.com/influxdata/influxdb-client-python/pull/130): Use `Retry-After` header value for Retryable error codes
 
 ## 1.8.0 [2020-06-19]
 
@@ -153,13 +156,13 @@
 1. [#27](https://github.com/influxdata/influxdb-client-python/issues/27): Added possibility to write bytes type of data
 1. [#30](https://github.com/influxdata/influxdb-client-python/issues/30): Added support for streaming a query response
 1. [#35](https://github.com/influxdata/influxdb-client-python/pull/35): FluxRecord supports dictionary-style access
-1. [#31](https://github.com/influxdata/influxdb-client-python/issues/31): Added support for delete metrics
+1. [#31](https://github.com/influxdata/influxdb-client-python/issues/31): Added support for delete metrics
 
 ### API
 1. [#28](https://github.com/bonitoo-io/influxdb-client-python/pull/28): Updated swagger to latest version
 
 ### Bug Fixes
-1. [#19](https://github.com/bonitoo-io/influxdb-client-python/pull/19): Removed strict checking of enum values
+1. [#19](https://github.com/bonitoo-io/influxdb-client-python/pull/19): Removed strict checking of enum values
 
 ### Documentation
 1. [#22](https://github.com/bonitoo-io/influxdb-client-python/issues/22): Documented how to connect to InfluxCloud
@@ -168,8 +171,8 @@
 
 ### Features
 1. [#2](https://github.com/bonitoo-io/influxdb-client-python/issues/2): The write client is able to write data in batches (configuration: `batch_size`, `flush_interval`, `jitter_interval`, `retry_interval`)
-1. [#5](https://github.com/bonitoo-io/influxdb-client-python/issues/5): Added support for gzip compression of query response and write body
-
+1. [#5](https://github.com/bonitoo-io/influxdb-client-python/issues/5): Added support for gzip compression of query response and write body
+
 ### API
 1. [#10](https://github.com/bonitoo-io/influxdb-client-python/pull/10): Updated swagger to latest version
 
@@ -178,5 +181,5 @@
 1. [#7](https://github.com/bonitoo-io/influxdb-client-python/issues/7): Drop NaN and infinity values from fields when writing to InfluxDB
 
 ### CI
-1. [#11](https://github.com/bonitoo-io/influxdb-client-python/pull/11): Switch CI to CircleCI
-1. [#12](https://github.com/bonitoo-io/influxdb-client-python/pull/12): CI generate code coverage report on CircleCI
+1. [#11](https://github.com/bonitoo-io/influxdb-client-python/pull/11): Switch CI to CircleCI
+1. [#12](https://github.com/bonitoo-io/influxdb-client-python/pull/12): CI generate code coverage report on CircleCI

influxdb_client/client/write/dataframe_serializer.py (+144 -60)

@@ -5,115 +5,199 @@
 """
 
 import re
-from functools import reduce
-from itertools import chain
+import math
 
 from influxdb_client.client.write.point import _ESCAPE_KEY, _ESCAPE_STRING, _ESCAPE_MEASUREMENT
 
 
-def _replace(data_frame):
-    from ...extras import np
-
-    # string columns
-    obj_cols = {k for k, v in dict(data_frame.dtypes).items() if v is np.dtype('O')}
-
-    # number columns
-    other_cols = set(data_frame.columns) - obj_cols
-
-    obj_nans = (f'{k}=nan' for k in obj_cols)
-    other_nans = (f'{k}=nani?' for k in other_cols)
-
-    replacements = [
-        ('|'.join(chain(obj_nans, other_nans)), ''),
-        (',{2,}', ','),
-        ('|'.join([', ,', ', ', ' ,']), ' '),
-    ]
-
-    return replacements
-
-
 def _itertuples(data_frame):
     cols = [data_frame.iloc[:, k] for k in range(len(data_frame.columns))]
     return zip(data_frame.index, *cols)
 
 
-def _is_nan(x):
-    return x != x
+def _not_nan(x):
+    return x == x
 
 
 def _any_not_nan(p, indexes):
-    return any(map(lambda inx: not _is_nan(p[inx]), indexes))
+    return any(map(lambda x: _not_nan(p[x]), indexes))
 
 
 def data_frame_to_list_of_points(data_frame, point_settings, **kwargs):
     """Serialize DataFrame into LineProtocols."""
+    # This function is hard to understand but for good reason:
+    # the approach used here is considerably more efficient
+    # than the alternatives.
+    #
+    # We build up a Python expression that efficiently converts a data point
+    # tuple into a line-protocol entry, and then evaluate the expression
+    # as a lambda so that we can call it. This avoids the overhead of
+    # invoking a function on every data value - we only have one function
+    # call per row instead. The expression consists of exactly
+    # one f-string, so we build up the parts of it as segments
+    # that are concatenated together to make the full f-string inside
+    # the lambda.
+    #
+    # Things are made a little more complex because fields and tags with NaN
+    # values and empty tags are omitted from the generated line-protocol
+    # output.
+    #
+    # As an example, say we have a data frame with two value columns:
+    #     a float
+    #     b int
+    #
+    # This will generate a lambda expression to be evaluated that looks like
+    # this:
+    #
+    #     lambda p: f"""{measurement_name} {keys[0]}={p[1]},{keys[1]}={p[2]}i {p[0].value}"""
+    #
+    # This lambda is then executed for each row p.
+    #
+    # When NaNs are present, the expression looks like this (split
+    # across two lines to satisfy the code-style checker):
+    #
+    #     lambda p: f"""{measurement_name} {"" if math.isnan(p[1])
+    #         else f"{keys[0]}={p[1]}"},{keys[1]}={p[2]}i {p[0].value}"""
+    #
+    # When there's a NaN value in column a, we'll end up with a comma at the start of the
+    # fields, so we run a regexp substitution after generating the line-protocol entries
+    # to remove this.
+    #
+    # We're careful to run these potentially costly extra steps only when NaN values actually
+    # exist in the data.
+
     from ...extras import pd, np
     if not isinstance(data_frame, pd.DataFrame):
         raise TypeError('Must be DataFrame, but type was: {0}.'
                         .format(type(data_frame)))
 
-    if 'data_frame_measurement_name' not in kwargs:
+    data_frame_measurement_name = kwargs.get('data_frame_measurement_name')
+    if data_frame_measurement_name is None:
         raise TypeError('"data_frame_measurement_name" is a Required Argument')
 
+    data_frame = data_frame.copy(deep=False)
     if isinstance(data_frame.index, pd.PeriodIndex):
         data_frame.index = data_frame.index.to_timestamp()
     else:
+        # TODO: this is almost certainly not what you want
+        # when the index is the default RangeIndex.
+        # Instead, it would probably be better to leave
+        # out the timestamp unless a time column is explicitly
+        # enabled.
        data_frame.index = pd.to_datetime(data_frame.index)
 
     if data_frame.index.tzinfo is None:
         data_frame.index = data_frame.index.tz_localize('UTC')
 
-    measurement_name = str(kwargs.get('data_frame_measurement_name')).translate(_ESCAPE_MEASUREMENT)
     data_frame_tag_columns = kwargs.get('data_frame_tag_columns')
     data_frame_tag_columns = set(data_frame_tag_columns or [])
 
+    # keys holds a list of string keys.
+    keys = []
+    # tags holds a list of tag f-string segments ordered alphabetically by tag key.
     tags = []
+    # fields holds a list of field f-string segments ordered alphabetically by field key.
     fields = []
-    fields_indexes = []
-    keys = []
+    # field_indexes holds the index into each row of all the fields.
+    field_indexes = []
 
     if point_settings.defaultTags:
         for key, value in point_settings.defaultTags.items():
-            data_frame[key] = value
-            data_frame_tag_columns.add(key)
-
-    for index, (key, value) in enumerate(data_frame.dtypes.items()):
+            # Avoid overwriting existing data if there's a column
+            # that already exists with the default tag's name.
+            # Note: when a new column is added, the old DataFrame
+            # that we've made a shallow copy of is unaffected.
+            # TODO: when there are NaN or empty values in
+            # the column, we could make a deep copy of the
+            # data and fill in those values with the default tag value.
+            if key not in data_frame.columns:
+                data_frame[key] = value
+                data_frame_tag_columns.add(key)
+
+    # Get a list of all the columns sorted by field/tag key.
+    # We want to iterate through the columns in sorted order
+    # so that we know when we're on the first field so we
+    # can know whether a comma is needed for that
+    # field.
+    columns = sorted(enumerate(data_frame.dtypes.items()), key=lambda col: col[1][0])
+
+    # null_columns has a bool value for each column holding
+    # whether that column contains any null (NaN or None) values.
+    null_columns = data_frame.isnull().any()
+
+    # Iterate through the columns building up the expression for each column.
+    for index, (key, value) in columns:
         key = str(key)
+        key_format = f'{{keys[{len(keys)}]}}'
         keys.append(key.translate(_ESCAPE_KEY))
-        key_format = f'{{keys[{index}]}}'
+        # The field index is one more than the column index because the
+        # time index is at column zero in the finally zipped-together
+        # result columns.
+        field_index = index + 1
+        val_format = f'p[{field_index}]'
 
-        index_value = index + 1
         if key in data_frame_tag_columns:
-            tags.append({'key': key, 'value': f"{key_format}={{str(p[{index_value}]).translate(_ESCAPE_KEY)}}"})
-        elif issubclass(value.type, np.integer):
-            fields.append(f"{key_format}={{p[{index_value}]}}i")
-            fields_indexes.append(index_value)
-        elif issubclass(value.type, (np.float, np.bool_)):
-            fields.append(f"{key_format}={{p[{index_value}]}}")
-            fields_indexes.append(index_value)
+            # This column is a tag column.
+            if null_columns[index]:
+                key_value = f"""{{
+                        '' if {val_format} == '' or type({val_format}) == float and math.isnan({val_format}) else
+                        f',{key_format}={{str({val_format}).translate(_ESCAPE_STRING)}}'
+                    }}"""
+            else:
+                key_value = f',{key_format}={{str({val_format}).translate(_ESCAPE_KEY)}}'
+            tags.append(key_value)
+            continue
+
+        # This column is a field column.
+        # Note: no comma separator is needed for the first field.
+        # It's important to omit it because when the first
+        # field column has no nulls, we don't run the comma-removal
+        # regexp substitution step.
+        sep = '' if len(field_indexes) == 0 else ','
+        if issubclass(value.type, np.integer):
+            field_value = f"{sep}{key_format}={{{val_format}}}i"
+        elif issubclass(value.type, np.bool_):
+            field_value = f'{sep}{key_format}={{{val_format}}}'
+        elif issubclass(value.type, np.float):
+            if null_columns[index]:
+                field_value = f"""{{"" if math.isnan({val_format}) else f"{sep}{key_format}={{{val_format}}}"}}"""
+            else:
+                field_value = f'{sep}{key_format}={{{val_format}}}'
         else:
-            fields.append(f"{key_format}=\"{{str(p[{index_value}]).translate(_ESCAPE_STRING)}}\"")
-            fields_indexes.append(index_value)
-
-    tags.sort(key=lambda x: x['key'])
-    tags = ','.join(map(lambda y: y['value'], tags))
-
-    fmt = ('{measurement_name}', f'{"," if tags else ""}', tags,
-           ' ', ','.join(fields), ' {p[0].value}')
-    f = eval("lambda p: f'{}'".format(''.join(fmt)),
-             {'measurement_name': measurement_name, '_ESCAPE_KEY': _ESCAPE_KEY, '_ESCAPE_STRING': _ESCAPE_STRING,
-              'keys': keys})
+            if null_columns[index]:
+                field_value = f"""{{
+                        '' if type({val_format}) == float and math.isnan({val_format}) else
+                        f'{sep}{key_format}="{{str({val_format}).translate(_ESCAPE_STRING)}}"'
+                    }}"""
+            else:
+                field_value = f'''{sep}{key_format}="{{str({val_format}).translate(_ESCAPE_STRING)}}"'''
        field_indexes.append(field_index)
        fields.append(field_value)
 
+    measurement_name = str(data_frame_measurement_name).translate(_ESCAPE_MEASUREMENT)
+
+    tags = ''.join(tags)
+    fields = ''.join(fields)
+    timestamp = '{p[0].value}'
+
+    f = eval(f'lambda p: f"""{{measurement_name}}{tags} {fields} {timestamp}"""', {
+        'measurement_name': measurement_name,
+        '_ESCAPE_KEY': _ESCAPE_KEY,
+        '_ESCAPE_STRING': _ESCAPE_STRING,
+        'keys': keys,
+        'math': math,
+    })
 
     for k, v in dict(data_frame.dtypes).items():
         if k in data_frame_tag_columns:
             data_frame[k].replace('', np.nan, inplace=True)
 
-    isnull = data_frame.isnull().any(axis=1)
-
-    if isnull.any():
-        rep = _replace(data_frame)
-        lp = (reduce(lambda a, b: re.sub(*b, a), rep, f(p))
-              for p in filter(lambda x: _any_not_nan(x, fields_indexes), _itertuples(data_frame)))
+    first_field_maybe_null = null_columns[field_indexes[0] - 1]
+    if first_field_maybe_null:
+        # When the first field is null (None/NaN), we'll have
+        # a spurious leading comma which needs to be removed.
+        lp = (re.sub('^((\\ |[^ ])* ),', '\\1', f(p))
+              for p in filter(lambda x: _any_not_nan(x, field_indexes), _itertuples(data_frame)))
         return list(lp)
     else:
         return list(map(f, _itertuples(data_frame)))
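
To make the NaN handling and the simpler cleanup regexp concrete, here is a hedged usage sketch against the code above (the column names are invented for illustration). It shows why the substitution only needs to run when the first field column may contain nulls.

    import pandas as pd

    from influxdb_client.client.write.dataframe_serializer import data_frame_to_list_of_points
    from influxdb_client.client.write_api import PointSettings

    df = pd.DataFrame(
        {"a": [float("nan"), 1.5], "b": [2, 3]},  # "a" sorts first and contains a NaN
        index=pd.to_datetime(["2020-12-06", "2020-12-07"], utc=True),
    )

    lines = data_frame_to_list_of_points(
        df, PointSettings(), data_frame_measurement_name="m"
    )

    # Row 1: field "a" is NaN, so its f-string segment renders as "" and the raw
    # result is "m ,b=2i <ts>"; re.sub('^((\\ |[^ ])* ),', '\\1', line) then strips
    # the spurious comma after the first unescaped space, giving "m b=2i <ts>".
    # Row 2 needs no cleanup: "m a=1.5,b=3i <ts>".
    print(lines)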

influxdb_client/client/write/point.py (+25 -3)

@@ -14,10 +14,32 @@
 from influxdb_client.domain.write_precision import WritePrecision
 
 EPOCH = UTC.localize(datetime.utcfromtimestamp(0))
+
 DEFAULT_WRITE_PRECISION = WritePrecision.NS
-_ESCAPE_MEASUREMENT = str.maketrans({'\\': '\\\\', ',': r'\,', ' ': r'\ ', '\n': '\\n', '\t': '\\t', '\r': '\\r'})
-_ESCAPE_KEY = str.maketrans({'\\': '\\\\', ',': r'\,', ' ': r'\ ', '=': r'\=', '\n': '\\n', '\t': '\\t', '\r': '\\r'})
-_ESCAPE_STRING = str.maketrans({'\"': r"\"", "\\": r"\\"})
+
+_ESCAPE_MEASUREMENT = str.maketrans({
+    '\\': r'\\',  # Note: this is wrong. Backslashes are not escaped like this in measurements.
+    ',': r'\,',
+    ' ': r'\ ',
+    '\n': r'\n',
+    '\t': r'\t',
+    '\r': r'\r',
+})
+
+_ESCAPE_KEY = str.maketrans({
+    '\\': r'\\',  # Note: this is wrong. Backslashes are not escaped like this in keys.
+    ',': r'\,',
+    '=': r'\=',
+    ' ': r'\ ',
+    '\n': r'\n',
+    '\t': r'\t',
+    '\r': r'\r',
+})
+
+_ESCAPE_STRING = str.maketrans({
+    '"': r'\"',
+    '\\': r'\\',
+})
 
 
 class Point(object):
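
As a standalone illustration of how these translation tables act via `str.translate` (the sample string is invented, not from the commit):

    # Reproduces the reformatted _ESCAPE_MEASUREMENT table above.
    _ESCAPE_MEASUREMENT = str.maketrans({
        '\\': r'\\',
        ',': r'\,',
        ' ': r'\ ',
        '\n': r'\n',
        '\t': r'\t',
        '\r': r'\r',
    })

    print('my measurement, v2'.translate(_ESCAPE_MEASUREMENT))
    # -> my\ measurement\,\ v2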
