Skip to content

Commit 9d7ef9b

Browse files
authored
feat: Accept None as column values in Buffer.row() API. (#5)
1 parent 0485112 commit 9d7ef9b

File tree

4 files changed

+88
-24
lines changed

4 files changed

+88
-24
lines changed

docs/examples.rst

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ Explicit Buffers
3535

3636
For more advanced use cases where the same messages need to be sent to multiple
3737
questdb instances or you want to decouple serialization and sending (as may be
38-
in a multi-threaded application) construct :class:``questdb.ingress.Buffer``
39-
objects explicitly, then pass them to the :func:``questdb.ingress.Sender.flush``
38+
in a multi-threaded application) construct :class:`questdb.ingress.Buffer`
39+
objects explicitly, then pass them to the :func:`questdb.ingress.Sender.flush`
4040
method.
4141

4242
Note that this bypasses ``auto-flush`` logic

proj.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,8 @@ def clean():
136136
_rm(PROJ_ROOT / 'src', '**/*.dylib')
137137
_rm(PROJ_ROOT / 'src', '**/*.c')
138138
_rm(PROJ_ROOT / 'src', '**/*.html')
139+
_rm(PROJ_ROOT, 'rustup-init.exe')
140+
_rm(PROJ_ROOT, 'rustup-init.sh')
139141

140142

141143
@command

src/questdb/ingress.pyx

Lines changed: 59 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -462,15 +462,15 @@ cdef class Buffer:
462462
"""
463463
line_sender_buffer_clear(self._impl)
464464
465-
def __len__(self):
465+
def __len__(self) -> int:
466466
"""
467467
The current number of bytes currently in the buffer.
468468

469469
Equivalent (but cheaper) to ``len(str(sender))``.
470470
"""
471471
return line_sender_buffer_size(self._impl)
472472
473-
def __str__(self):
473+
def __str__(self) -> str:
474474
"""Return the constructed buffer as a string. Use for debugging."""
475475
return self._to_str()
476476
@@ -595,22 +595,19 @@ cdef class Buffer:
595595
cdef line_sender_error* err = NULL
596596
if not line_sender_buffer_at(self._impl, ts.value, &err):
597597
raise c_err_to_py(err)
598-
self._may_trigger_row_complete()
599598
return 0
600599
601600
cdef inline int _at_dt(self, datetime dt) except -1:
602601
cdef int64_t value = datetime_to_nanos(dt)
603602
cdef line_sender_error* err = NULL
604603
if not line_sender_buffer_at(self._impl, value, &err):
605604
raise c_err_to_py(err)
606-
self._may_trigger_row_complete()
607605
return 0
608606
609607
cdef inline int _at_now(self) except -1:
610608
cdef line_sender_error* err = NULL
611609
if not line_sender_buffer_at_now(self._impl, &err):
612610
raise c_err_to_py(err)
613-
self._may_trigger_row_complete()
614611
return 0
615612
616613
cdef inline int _at(self, object ts) except -1:
@@ -634,6 +631,7 @@ cdef class Buffer:
634631
"""
635632
Add a row to the buffer.
636633
"""
634+
cdef bint wrote_fields = False
637635
self._set_marker()
638636
try:
639637
self._table(table_name)
@@ -644,14 +642,22 @@ cdef class Buffer:
644642
if symbols is not None:
645643
for name, value in symbols.items():
646644
self._symbol(name, value)
645+
wrote_fields = True
647646
if columns is not None:
648647
for name, value in columns.items():
649-
self._column(name, value)
650-
self._at(at)
651-
self._clear_marker()
648+
if value is not None:
649+
self._column(name, value)
650+
wrote_fields = True
651+
if wrote_fields:
652+
self._at(at)
653+
self._clear_marker()
654+
else:
655+
self._rewind_to_marker()
652656
except:
653657
self._rewind_to_marker()
654658
raise
659+
if wrote_fields:
660+
self._may_trigger_row_complete()
655661
656662
def row(
657663
self,
@@ -660,7 +666,8 @@ cdef class Buffer:
660666
symbols: Optional[Dict[str, str]]=None,
661667
columns: Optional[Dict[
662668
str,
663-
Union[bool, int, float, str, TimestampMicros, datetime]]]=None,
669+
Union[None, bool, int, float, str, TimestampMicros, datetime]]
670+
]=None,
664671
at: Union[None, TimestampNanos, datetime]=None):
665672
"""
666673
Add a single row (line) to the buffer.
@@ -679,7 +686,8 @@ cdef class Buffer:
679686
'col3': 3.14,
680687
'col4': 'xyz',
681688
'col5': TimestampMicros(123456789),
682-
'col6': datetime(2019, 1, 1, 12, 0, 0)},
689+
'col6': datetime(2019, 1, 1, 12, 0, 0),
690+
'col7': None},
683691
at=TimestampNanos(123456789))
684692

685693
# Only symbols specified. Designated timestamp assigned by the db.
@@ -707,10 +715,38 @@ cdef class Buffer:
707715
understand the difference between the ``SYMBOL`` and ``STRING`` types
708716
(TL;DR: symbols are interned strings).
709717

718+
Column values can be specified with Python types directly and map as so:
719+
720+
.. list-table::
721+
:header-rows: 1
722+
723+
* - Python type
724+
- Serialized as ILP type
725+
* - ``bool``
726+
- `BOOLEAN <https://questdb.io/docs/reference/api/ilp/columnset-types#boolean>`_
727+
* - ``int``
728+
- `INTEGER <https://questdb.io/docs/reference/api/ilp/columnset-types#integer>`_
729+
* - ``float``
730+
- `FLOAT <https://questdb.io/docs/reference/api/ilp/columnset-types#float>`_
731+
* - ``str``
732+
- `STRING <https://questdb.io/docs/reference/api/ilp/columnset-types#string>`_
733+
* - ``datetime.datetime`` and ``TimestampMicros``
734+
- `TIMESTAMP <https://questdb.io/docs/reference/api/ilp/columnset-types#timestamp>`_
735+
* - ``None``
736+
- *Column is skipped and not serialized.*
737+
738+
If the destination table was already created, then the columns types
739+
will be cast to the types of the existing columns whenever possible
740+
(Refer to the QuestDB documentation pages linked above).
741+
710742
:param table_name: The name of the table to which the row belongs.
711743
:param symbols: A dictionary of symbol column names to ``str`` values.
712744
:param columns: A dictionary of column names to ``bool``, ``int``,
713745
``float``, ``str``, ``TimestampMicros`` or ``datetime`` values.
746+
As a convenience, you can also pass a ``None`` value, however - due
747+
to ILP protocol limitations - this will skip the column rather
748+
necessarily writing a ``NULL`` value, so if the column did not exist
749+
yet it will not be created.
714750
:param at: The timestamp of the row. If ``None``, timestamp is assigned
715751
by the server. If ``datetime``, the timestamp is converted to
716752
nanoseconds. A nanosecond unix epoch timestamp can be passed
@@ -764,9 +800,9 @@ cdef class Buffer:
764800

765801
# buffer.tabular(
766802
# 'table_name',
767-
# [[True, 123, 3.14, 'xyz'],
768-
# [False, 456, 6.28, 'abc'],
769-
# [True, 789, 9.87, 'def']],
803+
# [[True, None, 3.14, 'xyz'],
804+
# [False, 123, 6.28, 'abc'],
805+
# [True, 456, 9.87, 'def']],
770806
# header=['col1', 'col2', 'col3', 'col4'],
771807
# at=datetime.datetime.utcnow())
772808

@@ -848,7 +884,7 @@ cdef class Buffer:
848884
# buffer.tabular(
849885
# 'table_name',
850886
# [['abc', 123, 3.14, 'xyz'],
851-
# ['def', 456, 6.28, 'abc'],
887+
# ['def', 456, None, 'abc'],
852888
# ['ghi', 789, 9.87, 'def']],
853889
# header=['col1', 'col2', 'col3', 'col4'],
854890
# symbols=True) # `col1` and `col4` are SYMBOL columns.
@@ -941,7 +977,7 @@ cdef class Sender:
941977
sender.flush()
942978
943979
944-
**Auto-flushing (on by default)**
980+
**Auto-flushing (on by default, watermark at 63KiB)**
945981
946982
To avoid accumulating very large buffers, the sender will flush the buffer
947983
automatically once its buffer reaches a certain byte-size watermark.
@@ -987,14 +1023,14 @@ cdef class Sender:
9871023
* A special ``'insecure_skip_verify'`` string: Dangerously disable all
9881024
TLS certificate verification (do *NOT* use in production environments).
9891025
990-
**Positional constructor arguments for the ``Sender(..)``**
1026+
**Positional constructor arguments for the Sender(..)**
9911027
9921028
* ``host``: Hostname or IP address of the QuestDB server.
9931029
9941030
* ``port``: Port number of the QuestDB server.
9951031
9961032
997-
**Keyword-only constructor arguments for the ``Sender(..)``**
1033+
**Keyword-only constructor arguments for the Sender(..)**
9981034
9991035
* ``interface`` (``str``): Network interface to bind to.
10001036
Set this if you have an accelerated network interface (e.g. Solarflare)
@@ -1011,13 +1047,15 @@ cdef class Sender:
10111047
This field is expressed in milliseconds. The default is 15 seconds.
10121048
10131049
* ``init_capacity`` (``int``): Initial buffer capacity of the internal buffer.
1014-
*See :class:`Buffer`'s constructor for more details.*
1050+
*Default: 65536 (64KiB).*
1051+
*See Buffer's constructor for more details.*
10151052
10161053
* ``max_name_length`` (``int``): Maximum length of a table or column name.
1017-
*See :class:`Buffer`'s constructor for more details.*
1054+
*See Buffer's constructor for more details.*
10181055
10191056
* ``auto_flush`` (``bool`` or ``int``): Whether to automatically flush the
10201057
buffer when it reaches a certain byte-size watermark.
1058+
*Default: 64512 (63KiB).*
10211059
*See above for details.*
10221060
"""
10231061

@@ -1195,7 +1233,7 @@ cdef class Sender:
11951233
if self._buffer is not None:
11961234
self._buffer._row_complete_sender = PyWeakref_NewRef(self, None)
11971235

1198-
def __enter__(self):
1236+
def __enter__(self) -> Sender:
11991237
"""Call :func:`Sender.connect` at the start of a ``with`` block."""
12001238
self.connect()
12011239
return self
@@ -1210,7 +1248,7 @@ cdef class Sender:
12101248
"""
12111249
return str(self._buffer)
12121250

1213-
def __len__(self):
1251+
def __len__(self) -> int:
12141252
"""
12151253
Number of bytes of unsent data in the internal buffer.
12161254

test/test.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,12 +68,25 @@ def test_column(self):
6868
'col4': 0.5,
6969
'col5': 'val',
7070
'col6': qi.TimestampMicros(12345),
71-
'col7': two_h_after_epoch})
71+
'col7': two_h_after_epoch,
72+
'col8': None})
7273
exp = (
7374
'tbl1 col1=t,col2=f,col3=-1i,col4=0.5,'
7475
'col5="val",col6=12345t,col7=7200000000t\n')
7576
self.assertEqual(str(buf), exp)
7677

78+
def test_none_column(self):
79+
buf = qi.Buffer()
80+
buf.row('tbl1', columns={'col1': 1})
81+
exp = 'tbl1 col1=1i\n'
82+
self.assertEqual(str(buf), exp)
83+
self.assertEqual(len(buf), len(exp))
84+
85+
# No fields to write, no fields written, therefore a no-op.
86+
buf.row('tbl1', columns={'col1': None, 'col2': None})
87+
self.assertEqual(str(buf), exp)
88+
self.assertEqual(len(buf), len(exp))
89+
7790
def test_unicode(self):
7891
buf = qi.Buffer()
7992
buf.row('tbl1', symbols={'questdb1': '❤️'}, columns={'questdb2': '❤️'})
@@ -262,6 +275,17 @@ def test_immediate_auto_flush(self):
262275
msgs = server.recv()
263276
self.assertEqual(msgs, [b'tbl1,sym1=val1'])
264277

278+
def test_auto_flush_on_closed_socket(self):
279+
with Server() as server:
280+
with qi.Sender('localhost', server.port, auto_flush=True) as sender:
281+
server.accept()
282+
server.close()
283+
exp_err = 'Could not flush buffer'
284+
with self.assertRaisesRegexp(qi.IngressError, exp_err):
285+
for _ in range(1000):
286+
time.sleep(0.01)
287+
sender.row('tbl1', symbols={'a': 'b'})
288+
265289
def test_dont_auto_flush(self):
266290
with Server() as server:
267291
with qi.Sender('localhost', server.port, auto_flush=0) as sender:

0 commit comments

Comments
 (0)