feat: HTTP SenderPool with asyncio support #66

Draft · wants to merge 6 commits into base: main

247 changes: 247 additions & 0 deletions src/questdb/ingress.pyx
@@ -40,6 +40,8 @@ __all__ = [
'TimestampMicros',
'TimestampNanos',
'TlsCa',
'SenderPool',
'AsyncTransaction',
]

# For prototypes: https://github.com/cython/cython/tree/master/Cython/Includes
@@ -82,6 +84,11 @@ import pathlib
import sys
import os

import concurrent.futures
import asyncio
from queue import Queue, Full, Empty
import threading


# This value is automatically updated by the `bump2version` tool.
# If you need to update it, also update the search definition in
@@ -2531,3 +2538,243 @@ cdef class Sender:
self._close()
free(self._last_flush_ms)


class AsyncTransaction:
"""
A :class:`buffer <questdb.ingress.Buffer>` restricted to a single table,
ensuring it can be flushed transactionally.

Use in conjunction with :class:`SenderPool` to send data to QuestDB
asynchronously.
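
    A sketch of the intended flow, assuming ``pool`` is an open
    :class:`SenderPool` (table and column names are illustrative):

    .. code-block:: python

        txn = pool.transaction('trades')
        txn.row(
            symbols={'pair': 'USDGBP'},
            columns={'price': 0.83},
            at=TimestampNanos.now())
        await txn.commit()  # flushed transactionally on a worker thread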
"""
def __init__(self, pool: SenderPool, buffer: Buffer, table_name: str):
self._pool = pool # TODO: weakref
self._table_name = table_name
self._buffer = buffer
if self._buffer is None:
raise ValueError('buffer cannot be None')
if len(self._buffer) > 0:
raise ValueError('buffer must be cleared')
self._entered = False

def dataframe(
self,
df,
*,
symbols: str | bool | List[int] | List[str] = 'auto',
at: ServerTimestamp | int | str | TimestampNanos | datetime):
if self._buffer is None:
raise ValueError('buffer has already been flushed, obtain a new one from the pool')
self._buffer.dataframe(df, table_name=self._table_name, symbols=symbols, at=at)
return self

def row(
self,
*,
symbols: Dict[str, str] | None = None,
columns: Dict[str, bool | int | float | str | TimestampMicros | datetime] | None = None,
at: TimestampNanos | datetime | ServerTimestamp) -> 'AsyncTransaction':
if self._buffer is None:
raise ValueError('buffer has already been flushed, obtain a new one from the pool')
self._buffer.row(self._table_name, symbols=symbols, columns=columns, at=at)
return self

def __str__(self) -> str:
return str(self._buffer)

def __len__(self) -> int:
return len(self._buffer)

    def commit_fut(self) -> concurrent.futures.Future[None]:
pool = self._pool
self._pool = None
buffer = self._buffer
self._buffer = None
if pool is None:
raise ValueError('transaction has already been committed')
return pool._thread_pool.submit(pool._flush, buffer)

    # Awaitable even though the signature lacks `async`: returns an
    # asyncio.Future that callers can await directly.
def commit(self) -> asyncio.Future[None]:
return asyncio.wrap_future(
self.commit_fut(),
loop=asyncio.get_event_loop())

def rollback(self):
pool = self._pool
self._pool = None
buffer = self._buffer
self._buffer = None
if pool is not None:
pool._add_buffer_to_free_list(buffer)

async def __aenter__(self):
if self._entered:
raise IngressError(
IngressErrorCode.InvalidApiCall,
'transaction already entered')
self._entered = True
return self

def __aexit__(self, exc_type, exc_val, _exc_tb):
if not self._entered:
raise IngressError(
IngressErrorCode.InvalidApiCall,
'transaction not entered')
if exc_type is None:
return self.commit()
else:
self.rollback()
loop = asyncio.get_event_loop()
future = loop.create_future()
future.set_exception(exc_val)
return future



class SenderPool:
"""
A pool of Senders that can be used asynchronously to send data to QuestDB.

.. code-block:: python

import pandas as pd
        from questdb.ingress import IngressError, SenderPool, TimestampNanos

with SenderPool('http::addr=localhost:9000;') as pool:
txn1 = pool.transaction('my_table')
txn1.row(columns={'a': 1, 'b': 2}, at=TimestampNanos.now())
txn1.row(columns={'a': 3, 'b': 4}, at=TimestampNanos.now())

df = pd.DataFrame({
'timestamp': pd.to_datetime([
'2021-01-01T00:00:00', '2021-01-01T00:00:01']),
'a': [1, 3],
'b': [2, 4]})
txn2 = pool.transaction('another_table')
        txn2.dataframe(df, at='timestamp')

# Send the buffers asynchronously in parallel
f1 = txn1.commit()
f2 = txn2.commit()

# Wait for both to complete, raising any exceptions on error
try:
await f1
await f2
except IngressError as e:
...

    If you don't have an async context, use ``txn.commit_fut()`` to get a
    ``concurrent.futures.Future`` instead of an ``asyncio.Future``.

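    A sketch without ``await``, reusing the ``pool`` from above:

    .. code-block:: python

        txn = pool.transaction('my_table')
        txn.row(columns={'a': 1, 'b': 2}, at=TimestampNanos.now())
        fut = txn.commit_fut()
        fut.result()  # blocks until the flush completes, raising on error
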
Alternatively, the transaction itself can be an async context manager:

.. code-block:: python

with SenderPool('http::addr=localhost:9000;') as pool:
async with pool.transaction('my_table') as txn:
txn.row(columns={'a': 1, 'b': 2}, at=TimestampNanos.now())
txn.row(columns={'a': 3, 'b': 4}, at=TimestampNanos.now())
"""
def __init__(
self,
conf: str,
max_workers: Optional[int] = None,
max_free_buffers: Optional[int] = None):
"""
Create a pool of Senders that can be used asynchronously to send data to QuestDB.

:param conf: the configuration string for each Sender in the pool
:param max_workers: the maximum number of workers in the pool, if None defaults to min(32, os.cpu_count() + 4)
:param max_free_buffers: the maximum number of buffers to keep in the pool for reuse, if None defaults to 2 * max_workers
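
        For instance, a construction sketch with explicit (illustrative) limits:

        .. code-block:: python

            pool = SenderPool(
                'http::addr=localhost:9000;',
                max_workers=4,        # up to 4 flushing threads
                max_free_buffers=8)   # keep up to 8 cleared buffers for reuse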
"""
self._conf = conf
if max_workers is None:
# Same logic as for ThreadPoolExecutor
self._max_workers = min(32, (os.cpu_count() or 1) + 4)
else:
self._max_workers = int(max_workers)
if self._max_workers < 1:
raise ValueError(
'SenderPool requires at least one worker')
if max_free_buffers is None:
self._max_free_buffers = 2 * self._max_workers
else:
self._max_free_buffers = int(max_free_buffers)
if self._max_free_buffers < 0:
raise ValueError(
'SenderPool max_free_buffers can\'t be negative')

if not conf.startswith("http"):
raise IngressError(
IngressErrorCode.ConfigError,
'SenderPool only supports "http" and "https" protocols')
self._thread_pool = None
self._buffer_provisioner_sender = None
self._buffer_free_list = None
self._executor_thread_local = None

def create(self):
"""
Create the pool of Senders.
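
        Called automatically by ``__enter__``; a manual lifecycle sketch,
        equivalent to using the pool as a context manager:

        .. code-block:: python

            pool = SenderPool('http::addr=localhost:9000;')
            pool.create()
            try:
                txn = pool.transaction('my_table')
                txn.row(columns={'a': 1}, at=TimestampNanos.now())
                txn.commit_fut().result()
            finally:
                pool.close()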
"""
self._thread_pool = concurrent.futures.ThreadPoolExecutor(
max_workers=self._max_workers)
self._buffer_provisioner_sender = Sender.from_conf(self._conf)
try:
self._buffer_provisioner_sender.establish()
except:
self._buffer_provisioner_sender.close()
self._buffer_provisioner_sender = None
raise
self._buffer_free_list = Queue(self._max_free_buffers)
self._executor_thread_local = threading.local()

def __enter__(self):
self.create()
return self

def close(self):
if self._thread_pool is not None:
self._thread_pool.shutdown()
self._thread_pool = None
if self._buffer_provisioner_sender is not None:
self._buffer_provisioner_sender.close()
self._buffer_provisioner_sender = None
self._buffer_free_list = None

def __exit__(self, _exc_type, _exc_val, _exc_tb):
self.close()

def transaction(self, table_name: str):
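        """
        Start a new :class:`AsyncTransaction` against ``table_name``,
        reusing a cleared buffer from the free list when one is available.
        """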
# TODO: Work out the thread safety details of this method.
try:
buf = self._buffer_free_list.get_nowait()
except Empty:
buf = self._buffer_provisioner_sender.new_buffer()
return AsyncTransaction(self, buf, table_name)

def _add_buffer_to_free_list(self, buffer):
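        # Recycle cleared buffers for reuse across transactions; if the free
        # list is already full, the buffer is simply dropped.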
if buffer is None:
return
buffer.clear()
free_list = self._buffer_free_list
if free_list is None:
return
try:
free_list.put_nowait(buffer)
except Full:
pass # drop the buffer, too many in free list

def _flush(self, buffer):
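        # Each worker thread lazily creates its own Sender and caches it in
        # thread-local storage, so the sender (and its connection) is reused
        # across flushes.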
try:
sender = self._executor_thread_local.sender
except AttributeError:
sender = Sender.from_conf(self._conf)
sender.establish() # will be closed by __del__
self._executor_thread_local.sender = sender
try:
sender.flush(buffer, clear=False, transactional=True)
finally:
self._add_buffer_to_free_list(buffer)