C++ client: add thread-safe SessionPool, enable RPC compression, and harden buffers#17800
C++ client: add thread-safe SessionPool, enable RPC compression, and harden buffers#17800SpriCoder wants to merge 9 commits into
Conversation
The enableRPCCompression option set via Session::open(bool) or the session builder was never propagated to SessionConnection, whose flag was hardcoded to false, so the compact Thrift protocol never took effect. Thread the flag from the builder/open() into both the data SessionConnection and the node-discovery NodesSupplier client so compression actually applies.
Tablet::addValue and the OBJECT-value overload formatted out-of-range diagnostics with sprintf into a fixed 100-byte stack buffer, risking an overflow. Switch to snprintf bounded by sizeof(buffer) and cast the size_t arguments to long to match the %ld format.
On big-endian hosts MyStringBuffer::putOrderedByte used str.assign, which replaced the whole buffer with each numeric write and corrupted previously serialized content. Use str.append so bytes accumulate, matching the little-endian path.
Introduce SessionPool and SessionPoolBuilder so multiple threads can share a bounded set of connections without external locking. A single Session is not safe to use concurrently, so the pool lends each Session to one borrower at a time via an RAII PooledSession handle and reclaims it on scope exit. Sessions are created outside the lock to avoid blocking other borrowers during the handshake, and getSession() blocks up to a configurable timeout when the pool is exhausted. Query results are returned as a PooledSessionDataSet that keeps the Session leased until the result set is fully read, since SessionDataSet lazily fetches further blocks over the same connection. Connections that raise IoTDBConnectionException are evicted rather than recycled. Add integration tests covering basic borrow/insert/query, concurrent writers, and exhaustion-timeout behavior.
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## master #17800 +/- ##
============================================
+ Coverage 40.82% 40.85% +0.03%
+ Complexity 2611 2610 -1
============================================
Files 5184 5186 +2
Lines 350965 351140 +175
Branches 44895 44929 +34
============================================
+ Hits 143276 143460 +184
+ Misses 207689 207680 -9 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
| SessionPool::SessionPool(std::string host, int rpcPort, std::string username, std::string password, | ||
| size_t maxSize) | ||
| : host_(std::move(host)), rpcPort_(rpcPort), username_(std::move(username)), | ||
| password_(std::move(password)), maxSize_(maxSize == 0 ? 1 : maxSize) {} |
There was a problem hiding this comment.
Good catch. Since maxSize is size_t (unsigned), <= 0 reduces to == 0 and would trigger a tautological-comparison warning under -Wall. Rather than silently clamping an invalid 0 to 1, I changed the constructors to fail fast and throw IoTDBException when maxSize == 0, so the misuse surfaces at construction time. Done in 8ca5a42.
Address review feedback: maxSize is size_t, so a non-positive check reduces to == 0 (and "<= 0" would be a tautological-comparison warning under -Wall). Rather than silently clamping an invalid 0 to 1, fail fast by throwing IoTDBException so the misuse surfaces at construction time.
The pre-test cleanup deleted root.test.pool.* timeseries unconditionally, which threw 508 (does not exist) on a fresh database and failed the new [sessionPool] cases. Ignore that error since the cleanup is best-effort.
This reverts commit 2f35cc5. Honoring the compression flag makes the client negotiate the compact Thrift protocol, which the binary-only IoTDB server used by the C++ integration tests cannot speak, breaking the pre-existing ts_session_open_with_compression smoke test (it had only passed because the flag was a no-op). Compression needs a compact-protocol-enabled test server, so it will be reintroduced in a dedicated PR with the matching server-side test support. SessionPool keeps its compression option for forward compatibility; it is currently a no-op, as the rest of the client has always been.
| lock.unlock(); | ||
| std::shared_ptr<Session> session; | ||
| try { | ||
| session = constructNewSession(); |
There was a problem hiding this comment.
If another thread calls close() while that connection is being built, closed_ becomes true, but the waiter still hands out a brand-new session after open() completes.
There was a problem hiding this comment.
Good catch, that was a real race. Fixed in 7d5fa7e: after constructNewSession() returns I now re-acquire the lock and re-check closed_. If the pool was closed while the connection was being built, the slot is released (--size_), the session is torn down outside the lock, and acquire() throws IoTDBException("SessionPool is closed.") instead of handing it out. The reserved slot is also accounted for correctly: close() only subtracts the idle count, and this in-flight slot is decremented on the closed path.
Address review feedback: acquire() releases the lock while building a new connection, so a concurrent close() could set closed_ after the slot was reserved, and the freshly opened session would still be handed out from a closed pool. Re-check closed_ under the lock after construction; if the pool was closed meanwhile, release the slot, tear the session down outside the lock, and throw instead of returning it.
|



Summary
Improvements to the C++ client (
iotdb-client/client-cpp) along two axes: concurrency and safety/correctness.1. Thread-safe
SessionPool(new feature)A single
Sessionis not safe to use from multiple threads concurrently (it owns one Thrift transport/client). Previously the C++ client had no pooling, forcing callers to either serialize all requests through one session or hand-roll connection management.SessionPool+SessionPoolBuilderlend eachSessionto exactly one borrower at a time and reclaim it afterwards, so many application threads can share a bounded set of connections without external locking.getSession()returns an RAIIPooledSessionhandle that returns the session to the pool on scope exit and works with anySessionmethod; a genericexecute([](Session&){...})plus convenience wrappers (insertTablet,insertRecord,executeQueryStatement, …) cover common operations.getSession()blocks up to a configurable timeout when the pool is exhausted, andmaxSize == 0is rejected at construction.PooledSessionDataSetthat keeps the session leased until the result set is fully read, sinceSessionDataSetlazily fetches further blocks over the same connection. Sessions that raiseIoTDBConnectionExceptionare evicted instead of recycled.2. Safety / correctness hardening
Tablet::addValueformatted out-of-range diagnostics withsprintfinto a fixed 100-byte stack buffer; switched tosnprintfbounded bysizeof(buffer)to remove the overflow risk.MyStringBuffer::putOrderedByteusedstr.assignon big-endian hosts, which overwrote the whole buffer on every numeric write and corrupted previously serialized content; changed tostr.appendto match the little-endian path.Note on RPC compression
An earlier commit wired the (previously ignored)
enableRPCCompressionflag through toSessionConnection. Honoring it makes the client negotiate the compact Thrift protocol, which the binary-only IoTDB server used by the C++ integration tests cannot speak — so it broke the pre-existingts_session_open_with_compressionsmoke test (which had only passed because the flag was a no-op). That change is reverted here and will be reintroduced in a dedicated PR with matching compact-protocol test-server support.SessionPoolkeeps its compression option for forward compatibility (currently a no-op, as the rest of the client has always been).Test plan
g++ -std=c++11 -fsyntax-only, 0 errors); new code conforms to the repo.clang-format(2-space, 100-col).[sessionPool]cleanup threw508on a fresh DB (now tolerated), and the reverted compression change brokets_session_open_with_compression(protocol mismatch). Awaiting the re-run.