Skip to content

BREAKING CHANGE: buffer expose as bytes instead of str #100

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion c-questdb-client
2 changes: 1 addition & 1 deletion ci/cibuildwheel.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ stages:
inputs: {pathtoPublish: 'wheelhouse'}

- job: macos_x64
pool: {vmImage: 'macOS-12'}
pool: {vmImage: 'macOS-13'}
timeoutInMinutes: 90
steps:
- task: UsePythonVersion@0
Expand Down
22 changes: 11 additions & 11 deletions src/questdb/ingress.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -345,11 +345,11 @@ class Buffer:
"""
The current number of bytes currently in the buffer.

Equivalent (but cheaper) to ``len(str(sender))``.
Equivalent (but cheaper) to ``len(bytes(buffer))``.
"""

def __str__(self) -> str:
"""Return the constructed buffer as a string. Use for debugging."""
def __bytes__(self) -> bytes:
"""Return the constructed buffer as bytes. Use for debugging."""

def row(
self,
Expand Down Expand Up @@ -941,20 +941,20 @@ class Sender:
def __enter__(self) -> Sender:
"""Call :func:`Sender.establish` at the start of a ``with`` block."""

def __str__(self) -> str:
def __len__(self) -> int:
"""
Inspect the contents of the internal buffer.

The ``str`` value returned represents the unsent data.
Number of bytes of unsent data in the internal buffer.

Also see :func:`Sender.__len__`.
Equivalent (but cheaper) to ``len(bytes(sender))``.
"""

def __len__(self) -> int:
def __bytes__(self) -> bytes:
"""
Number of bytes of unsent data in the internal buffer.
Inspect the contents of the internal buffer.

Equivalent (but cheaper) to ``len(str(sender))``.
The ``bytes`` value returned represents the unsent data.

Also see :func:`Sender.__len__`.
"""

def transaction(self, table_name: str) -> SenderTransaction:
Expand Down
24 changes: 12 additions & 12 deletions src/questdb/ingress.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ from enum import Enum
from typing import List, Tuple, Dict, Union, Any, Optional, Callable, \
Iterable
import pathlib
from cpython.bytes cimport PyBytes_FromStringAndSize

import sys
import os
Expand Down Expand Up @@ -825,18 +826,17 @@ cdef class Buffer:
"""
The current number of bytes currently in the buffer.

Equivalent (but cheaper) to ``len(str(sender))``.
Equivalent (but cheaper) to ``len(bytes(buffer))``.
"""
return line_sender_buffer_size(self._impl)

def __str__(self) -> str:
"""Return the constructed buffer as a string. Use for debugging."""
return self._to_str()
def __bytes__(self) -> bytes:
"""Return the constructed buffer as bytes. Use for debugging."""
return self._to_bytes()

cdef inline object _to_str(self):
cdef size_t size = 0
cdef const char* utf8 = line_sender_buffer_peek(self._impl, &size)
return PyUnicode_FromStringAndSize(utf8, <Py_ssize_t>size)
cdef inline object _to_bytes(self):
cdef line_sender_buffer_view view = line_sender_buffer_peek(self._impl)
return PyBytes_FromStringAndSize(<const char *> view.buf, <Py_ssize_t> view.len)

cdef inline void_int _set_marker(self) except -1:
cdef line_sender_error* err = NULL
Expand Down Expand Up @@ -2281,21 +2281,21 @@ cdef class Sender:
self.establish()
return self

def __str__(self) -> str:
def __bytes__(self) -> bytes:
"""
Inspect the contents of the internal buffer.

The ``str`` value returned represents the unsent data.
The ``bytes`` value returned represents the unsent data.

Also see :func:`Sender.__len__`.
"""
return str(self._buffer)
return bytes(self._buffer)

def __len__(self) -> int:
"""
Number of bytes of unsent data in the internal buffer.

Equivalent (but cheaper) to ``len(str(sender))``.
Equivalent (but cheaper) to ``len(bytes(sender))``.
"""
return len(self._buffer)

Expand Down
11 changes: 7 additions & 4 deletions src/questdb/line_sender.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
##
################################################################################

from libc.stdint cimport int64_t, uint16_t, uint64_t
from libc.stdint cimport int64_t, uint16_t, uint64_t, uint8_t

cdef extern from "questdb/ingress/line_sender.h":
cdef struct line_sender_error:
Expand Down Expand Up @@ -102,6 +102,10 @@ cdef extern from "questdb/ingress/line_sender.h":
size_t len
const char* buf

cdef struct line_sender_buffer_view:
size_t len
const uint8_t* buf

bint line_sender_column_name_init(
line_sender_column_name* name,
size_t len,
Expand Down Expand Up @@ -171,9 +175,8 @@ cdef extern from "questdb/ingress/line_sender.h":
const line_sender_buffer* buffer
) noexcept nogil

const char* line_sender_buffer_peek(
const line_sender_buffer* buffer,
size_t* len_out
line_sender_buffer_view line_sender_buffer_peek(
const line_sender_buffer* buffer
) noexcept nogil

bint line_sender_buffer_table(
Expand Down
71 changes: 35 additions & 36 deletions test/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def test_basic(self):
buf = qi.Buffer()
buf.row('tbl1', symbols={'sym1': 'val1', 'sym2': 'val2'}, at=qi.ServerTimestamp)
self.assertEqual(len(buf), 25)
self.assertEqual(str(buf), 'tbl1,sym1=val1,sym2=val2\n')
self.assertEqual(bytes(buf), b'tbl1,sym1=val1,sym2=val2\n')

def test_bad_table(self):
buf = qi.Buffer()
Expand All @@ -92,7 +92,7 @@ def test_bad_table(self):
def test_symbol(self):
buf = qi.Buffer()
buf.row('tbl1', symbols={'sym1': 'val1', 'sym2': 'val2'}, at=qi.ServerTimestamp)
self.assertEqual(str(buf), 'tbl1,sym1=val1,sym2=val2\n')
self.assertEqual(bytes(buf), b'tbl1,sym1=val1,sym2=val2\n')

def test_bad_symbol_column_name(self):
buf = qi.Buffer()
Expand Down Expand Up @@ -121,38 +121,38 @@ def test_column(self):
'col7': two_h_after_epoch,
'col8': None}, at=qi.ServerTimestamp)
exp = (
'tbl1 col1=t,col2=f,col3=-1i,col4=0.5,'
'col5="val",col6=12345t,col7=7200000000t\n')
self.assertEqual(str(buf), exp)
b'tbl1 col1=t,col2=f,col3=-1i,col4=0.5,'
b'col5="val",col6=12345t,col7=7200000000t\n')
self.assertEqual(bytes(buf), exp)

def test_none_symbol(self):
buf = qi.Buffer()
buf.row('tbl1', symbols={'sym1': 'val1', 'sym2': None}, at=qi.ServerTimestamp)
exp = 'tbl1,sym1=val1\n'
self.assertEqual(str(buf), exp)
exp = b'tbl1,sym1=val1\n'
self.assertEqual(bytes(buf), exp)
self.assertEqual(len(buf), len(exp))

# No fields to write, no fields written, therefore a no-op.
buf.row('tbl1', symbols={'sym1': None, 'sym2': None}, at=qi.ServerTimestamp)
self.assertEqual(str(buf), exp)
self.assertEqual(bytes(buf), exp)
self.assertEqual(len(buf), len(exp))

def test_none_column(self):
buf = qi.Buffer()
buf.row('tbl1', columns={'col1': 1}, at=qi.ServerTimestamp)
exp = 'tbl1 col1=1i\n'
self.assertEqual(str(buf), exp)
exp = b'tbl1 col1=1i\n'
self.assertEqual(bytes(buf), exp)
self.assertEqual(len(buf), len(exp))

# No fields to write, no fields written, therefore a no-op.
buf.row('tbl1', columns={'col1': None, 'col2': None}, at=qi.ServerTimestamp)
self.assertEqual(str(buf), exp)
self.assertEqual(bytes(buf), exp)
self.assertEqual(len(buf), len(exp))

def test_no_symbol_or_col_args(self):
buf = qi.Buffer()
buf.row('table_name', at=qi.ServerTimestamp)
self.assertEqual(str(buf), '')
self.assertEqual(bytes(buf), b'')

def test_unicode(self):
buf = qi.Buffer()
Expand All @@ -171,15 +171,15 @@ def test_unicode(self):
'questdb2': '嚜꓂', # UCS-2, 3 bytes for UTF-8.
'questdb3': '💩🦞'},
at=qi.ServerTimestamp) # UCS-4, 4 bytes for UTF-8.
self.assertEqual(str(buf),
f'tbl1,questdb1=q❤️p questdb2="{"❤️" * 1200}"\n' +
self.assertEqual(bytes(buf),
(f'tbl1,questdb1=q❤️p questdb2="{"❤️" * 1200}"\n' +
'tbl1,Questo\\ è\\ il\\ nome\\ di\\ una\\ colonna=' +
'Це\\ символьне\\ значення ' +
'questdb1="",questdb2="嚜꓂",questdb3="💩🦞"\n')
'questdb1="",questdb2="嚜꓂",questdb3="💩🦞"\n').encode('utf-8'))

buf.clear()
buf.row('tbl1', symbols={'questdb1': 'q❤️p'}, at=qi.ServerTimestamp)
self.assertEqual(str(buf), 'tbl1,questdb1=q❤️p\n')
self.assertEqual(bytes(buf), 'tbl1,questdb1=q❤️p\n'.encode('utf-8'))

# A bad char in Python.
with self.assertRaisesRegex(
Expand All @@ -191,30 +191,30 @@ def test_unicode(self):
# Ensure we can continue using the buffer after an error.
buf.row('tbl1', symbols={'questdb1': 'another line of input'}, at=qi.ServerTimestamp)
self.assertEqual(
str(buf),
'tbl1,questdb1=q❤️p\n' +
bytes(buf),
('tbl1,questdb1=q❤️p\n' +
# Note: No partially written failed line here.
'tbl1,questdb1=another\\ line\\ of\\ input\n')
'tbl1,questdb1=another\\ line\\ of\\ input\n').encode('utf-8'))

def test_float(self):
buf = qi.Buffer()
buf.row('tbl1', columns={'num': 1.2345678901234567}, at=qi.ServerTimestamp)
self.assertEqual(str(buf), f'tbl1 num=1.2345678901234567\n')
self.assertEqual(bytes(buf), f'tbl1 num=1.2345678901234567\n'.encode('utf-8'))

def test_int_range(self):
buf = qi.Buffer()
buf.row('tbl1', columns={'num': 0}, at=qi.ServerTimestamp)
self.assertEqual(str(buf), f'tbl1 num=0i\n')
self.assertEqual(bytes(buf), f'tbl1 num=0i\n'.encode('utf-8'))
buf.clear()

# 32-bit int range.
buf.row('tbl1', columns={'min': -2 ** 31, 'max': 2 ** 31 - 1}, at=qi.ServerTimestamp)
self.assertEqual(str(buf), f'tbl1 min=-2147483648i,max=2147483647i\n')
self.assertEqual(bytes(buf), f'tbl1 min=-2147483648i,max=2147483647i\n'.encode('utf-8'))
buf.clear()

# 64-bit int range.
buf.row('tbl1', columns={'min': -2 ** 63, 'max': 2 ** 63 - 1}, at=qi.ServerTimestamp)
self.assertEqual(str(buf), f'tbl1 min=-9223372036854775808i,max=9223372036854775807i\n')
self.assertEqual(bytes(buf), f'tbl1 min=-9223372036854775808i,max=9223372036854775807i\n'.encode('utf-8'))
buf.clear()

# Overflow.
Expand Down Expand Up @@ -356,9 +356,9 @@ def test_flush_1(self):
server.accept()
with self.assertRaisesRegex(qi.IngressError, 'Column names'):
sender.row('tbl1', symbols={'...bad name..': 'val1'}, at=qi.ServerTimestamp)
self.assertEqual(str(sender), '')
self.assertEqual(bytes(sender), b'')
sender.flush()
self.assertEqual(str(sender), '')
self.assertEqual(bytes(sender), b'')
msgs = server.recv()
self.assertEqual(msgs, [])

Expand Down Expand Up @@ -423,7 +423,7 @@ def test_two_rows_explicit_buffer(self):
exp = (
'line_sender_buffer_example2,id=Hola price="111222233333i",qty=3.5 111222233333\n'
'line_sender_example,id=Adios price="111222233343i",qty=2.5 111222233343\n')
self.assertEqual(str(buffer), exp)
self.assertEqual(bytes(buffer), exp.encode('utf-8'))
sender.flush(buffer)
msgs = server.recv()
bexp = [msg.encode('utf-8') for msg in exp.rstrip().split('\n')]
Expand All @@ -432,9 +432,8 @@ def test_two_rows_explicit_buffer(self):
def test_independent_buffer(self):
buf = qi.Buffer()
buf.row('tbl1', symbols={'sym1': 'val1'}, at=qi.ServerTimestamp)
exp = 'tbl1,sym1=val1\n'
bexp = exp[:-1].encode('utf-8')
self.assertEqual(str(buf), exp)
exp = b'tbl1,sym1=val1\n'
self.assertEqual(bytes(buf), exp)

with Server() as server1, Server() as server2:
with self.builder('tcp', 'localhost', server1.port) as sender1, \
Expand All @@ -443,21 +442,21 @@ def test_independent_buffer(self):
server2.accept()

sender1.flush(buf, clear=False)
self.assertEqual(str(buf), exp)
self.assertEqual(bytes(buf), exp)

sender2.flush(buf, clear=False)
self.assertEqual(str(buf), exp)
self.assertEqual(bytes(buf), exp)

msgs1 = server1.recv()
msgs2 = server2.recv()
self.assertEqual(msgs1, [bexp])
self.assertEqual(msgs2, [bexp])
self.assertEqual(msgs1, [exp[:-1]])
self.assertEqual(msgs2, [exp[:-1]])

sender1.flush(buf)
self.assertEqual(server1.recv(), [bexp])
self.assertEqual(server1.recv(), [exp[:-1]])

# The buffer is now auto-cleared.
self.assertEqual(str(buf), '')
self.assertEqual(bytes(buf), b'')

def test_auto_flush_settings_defaults(self):
for protocol in ('tcp', 'tcps', 'http', 'https'):
Expand Down Expand Up @@ -560,7 +559,7 @@ def test_dont_flush_on_exception(self):
with self.builder('tcp', 'localhost', server.port) as sender:
server.accept()
sender.row('tbl1', symbols={'sym1': 'val1'}, at=qi.ServerTimestamp)
self.assertEqual(str(sender), 'tbl1,sym1=val1\n')
self.assertEqual(bytes(sender), b'tbl1,sym1=val1\n')
raise RuntimeError('Test exception')
msgs = server.recv()
self.assertEqual(msgs, [])
Expand Down
Loading