Skip to content

Commit 12ab822

Browse files
authored
docs: add an example: How to use RxPY to prepare batches by maximum bytes count. (#397)
1 parent 7a24f63 commit 12ab822

File tree

3 files changed

+120
-1
lines changed

3 files changed

+120
-1
lines changed

CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@
1313
### Others
1414
1. [#472](https://github.com/influxdata/influxdb-client-python/pull/472): Drop supports for Python 3.6
1515

16+
### Documentation
17+
1. [#397](https://github.com/influxdata/influxdb-client-python/pull/397): Add an example: How to use RxPY to prepare batches by maximum bytes count
18+
1619
## 1.31.0 [2022-07-29]
1720

1821
### Features

examples/README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
the example requires:
1515
- manually download [NYC TLC Trip Record Data](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page)
1616
- install Apache Arrow `pip install pyarrow` dependency
17-
17+
- [write_batching_by_bytes_count.py](write_batching_by_bytes_count.py) - How to use RxPY to prepare batches by maximum bytes count.
1818

1919
## Queries
2020
- [query.py](query.py) - How to query data into `FluxTable`s, `Stream` and `CSV`
+116
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
"""
2+
How to use RxPY to prepare batches by maximum bytes count.
3+
"""
4+
5+
from csv import DictReader
6+
from functools import reduce
7+
from typing import Collection
8+
9+
import reactivex as rx
10+
from reactivex import operators as ops, Observable
11+
12+
from influxdb_client import InfluxDBClient, Point
13+
from influxdb_client.client.write.retry import WritesRetry
14+
from influxdb_client.client.write_api import SYNCHRONOUS
15+
16+
17+
def csv_to_generator(csv_file_path):
    """
    Parse a CSV file into a generator of Points.

    :param csv_file_path: path to a CSV file with VIX daily data
        (expects columns: Date, VIX Open, VIX High, VIX Low, VIX Close)
    :return: generator yielding one ``Point`` per CSV row
    """
    # Use a context manager so the file handle is closed even if the
    # consumer stops iterating early or a row fails to parse (the
    # original left the file object open).
    with open(csv_file_path, 'r') as csv_file:
        for row in DictReader(csv_file):
            point = Point('financial-analysis') \
                .tag('type', 'vix-daily') \
                .field('open', float(row['VIX Open'])) \
                .field('high', float(row['VIX High'])) \
                .field('low', float(row['VIX Low'])) \
                .field('close', float(row['VIX Close'])) \
                .time(row['Date'])
            yield point
30+
31+
32+
def _buffer_bytes_size(buffer: Collection['bytes']):
33+
"""
34+
Calculate size of buffer
35+
"""
36+
return reduce(lambda total, actual: total + actual, map(lambda x: len(x), buffer)) + (
37+
len(buffer))
38+
39+
40+
def buffer_by_bytes_count(bytes_count: int = 5120):
    """
    Create an RxPY operator that buffers items until a byte count is reached.

    :param bytes_count: maximum batch size in bytes (default 5120 = 5 KiB)
    :return: an operator function suitable for ``Observable.pipe()``
    """

    def _buffer_by_bytes_count(source: Observable) -> Observable:
        def subscribe(observer, scheduler=None):
            observer.buffer = []

            def on_next(current):
                observer.buffer.append(current)
                # Emit a new batch once the buffered size reaches the
                # boundary. `current` is already inside the buffer, so
                # its length must not be added again (the original
                # double-counted the newest item here).
                if _buffer_bytes_size(observer.buffer) >= bytes_count:
                    # emit batch
                    observer.on_next(observer.buffer)
                    observer.buffer = []

            def on_error(exception):
                # Drop any partially filled batch and propagate the error.
                observer.buffer = []
                observer.on_error(exception)

            def on_completed():
                # Flush only a non-empty remainder. The original tested
                # `len(...) >= 0`, which is always true and emitted an
                # empty trailing batch on completion.
                if observer.buffer:
                    observer.on_next(observer.buffer)
                    observer.buffer = []
                observer.on_completed()

            return source.subscribe(
                on_next,
                on_error,
                on_completed,
                scheduler=scheduler)

        return Observable(subscribe)

    return _buffer_by_bytes_count
77+
78+
79+
# Define the retry strategy - 3 attempts with exponential backoff => 2, 4, 8
retries = WritesRetry(total=3, retry_interval=1, exponential_base=2)
with InfluxDBClient(url='http://localhost:8086', token='my-token', org='my-org', retries=retries) as client:
    # Use the synchronous version of WriteApi: each write blocks until
    # the server acknowledges the batch, so failures surface immediately.
    write_api = client.write_api(write_options=SYNCHRONOUS)

    # Prepare batches from the generator:
    #   1. Map each Point into line protocol
    #   2. Encode each line-protocol string into bytes
    #   3. Group into batches of at most 5120 bytes - 5 KiB
    batches = rx \
        .from_iterable(csv_to_generator('vix-daily.csv')) \
        .pipe(ops.map(lambda point: point.to_line_protocol())) \
        .pipe(ops.map(lambda line_protocol: line_protocol.encode("utf-8"))) \
        .pipe(buffer_by_bytes_count(bytes_count=5120))

    def write_batch(batch):
        """
        Write one batch synchronously and report its byte size.

        :param batch: list of encoded line-protocol records
        """
        # Plain string literal: the original used an f-string with no
        # placeholders.
        print('Writing batch...')
        write_api.write(bucket='my-bucket', record=batch)
        print(f' > {_buffer_bytes_size(batch)} bytes')

    # Write all prepared batches; report errors and completion.
    batches.subscribe(on_next=lambda batch: write_batch(batch),
                      on_error=lambda ex: print(f'Unexpected error: {ex}'),
                      on_completed=lambda: print('Import finished!'))

0 commit comments

Comments
 (0)