Skip to content

Generic flat response implementation #278

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: develop
Choose a base branch
from
3 changes: 3 additions & 0 deletions example/cpp20_subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ namespace asio = boost::asio;
using namespace std::chrono_literals;
using boost::redis::request;
using boost::redis::generic_response;
using boost::redis::generic_flat_response;
using boost::redis::consume_one;
using boost::redis::logger;
using boost::redis::config;
Expand Down Expand Up @@ -54,6 +55,8 @@ auto receiver(std::shared_ptr<connection> conn) -> asio::awaitable<void>
request req;
req.push("SUBSCRIBE", "channel");

// Alternatively, you can use generic_flat_response here, but keep in mind
// that to access elements you need to call .view() on resp.value()
generic_response resp;
conn->set_receive_response(resp);

Expand Down
9 changes: 6 additions & 3 deletions include/boost/redis/adapter/any_adapter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,13 @@ namespace boost::redis {
*/
class any_adapter {
public:
using fn_type = std::function<void(std::size_t, resp3::node_view const&, system::error_code&)>;
using adapt_fn_type = std::function<
void(std::size_t, resp3::node_view const&, system::error_code&)>;
using done_fn_type = std::function<void()>;

struct impl_t {
fn_type adapt_fn;
adapt_fn_type adapt_fn;
done_fn_type prepare_done_fn;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to avoid the creation of two std::function for each request (i.e. async_exec calls) since each one represents a potential dynamic allocation. In this case however I think we can't do much better because inside async_exec we only have access to the type erased adapter i.e. std::function and therefore we can't call adapter.set_done() for example. I think the prepare_done_fn callback is not critical though since it is small and the dynamic allocation is likely to be optimized away with SOO (small object optimization). @anarthal What do you think?

Copy link
Author

@D0zee D0zee Jul 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sizeof(generic_flat_response*)=8
sizeof(generic_flat_response&)=64
sizeof([](){})=1
  • If response type is generic_flat_response we will return lambda with captured reference ([res]()mutable{}). The size of this lambda is 64 bytes, std::function will definitely keep it on heap. I suggest to capture and pass the pointergeneric_flat_response*. Its size is 8 bytes and in this case SOO will take place during creation of std::function in any_adapter. As a result we have no heap allocations in this case.

  • If the response type is any other SOO will take a place by default due to the fact that size of empty lambda is 1 byte.

Source: https://stackoverflow.com/a/57049013

Copy link
Collaborator

@mzimbres mzimbres Jul 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have been thinking about this and there seems to be a simpler way of avoiding the extra std::function that I haven't considered earlier. First we define a special node value in node.hpp

constexpr resp3::node_view done_node{resp3::type::invalid, -1, -1, {}};

then call the adapter with this node when the parser is done here

template <class Adapter>
bool parse(resp3::parser& p, std::string_view const& msg, Adapter& adapter, system::error_code& ec)
{
   while (!p.done()) {
      ...
   }

   // ---> New adapter call here to inform parsing is finished.
   adapter(std::make_optional<resp3::parser::result>(done_node), system::error_code());

   return true;
}

Then call set_views() on the adapter when this node is detected here

template <>
class general_aggregate<result<flat_response_value>> {
private:
   result<flat_response_value>* result_;

public:
   template <class String>
   void operator()(resp3::basic_node<String> const& nd, system::error_code&)
   {
      // ---> Check whether done here.
      if (nd == done_node) {
         result_->value().set_views();
         return;
      }

      ...
   }
};

I totally missed this possibility earlier but it looks cleaner than adding a new std::function? Do you mind trying this?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it looks much better and we don't have to care about heap allocations. I will try it!

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Am I correct that this case must be handled by others adapters as well?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @D0zee, yes other adapters will have to handle this as well, for example

if (nd == done_node)
    return;

I forgot to say this in my previous comment.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi again, there is still one problem to solve. The way I suggested above will cause the set_view to be called multiple times, once for each request in the pipeline. That is because the same flat response can be used to store the response of multiple requests. I am still not sure what is the simplest way to avoid that, it can be solved by adding state to set_views() so that it does not traverse what has already been traversed.

Or perhaps there is a way to wrap the adapters at other location such as here

   template <class T>
   static auto create_impl(T& resp) -> impl_t
   {
      using namespace boost::redis::adapter;
      auto adapter = boost_redis_adapt(resp);
      std::size_t size = adapter.get_supported_response_size();
      return {std::move(adapter), size};
   }

or here

   adapter_ = [this, adapter](resp3::node_view const& nd, system::error_code& ec) {
      auto const i = req_->get_expected_responses() - remaining_responses_;
      adapter(i, nd, ec);
   };

so that only the last call to adapter(done_node, ec) triggers a set_view(). I think however this might be too complicated and perhaps it just simpler to let each call to set_view to traverse only what has not been set yet, as suggested above.

If I have time I will review all of this to see if there is any simplification possible.

std::size_t supported_response_size;
} impl_;

Expand All @@ -47,7 +50,7 @@ class any_adapter {
using namespace boost::redis::adapter;
auto adapter = boost_redis_adapt(resp);
std::size_t size = adapter.get_supported_response_size();
return {std::move(adapter), size};
return {std::move(adapter), detail::prepare_done(&resp), size};
}

template <class Executor>
Expand Down
61 changes: 61 additions & 0 deletions include/boost/redis/adapter/detail/adapters.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@
#include <boost/redis/resp3/node.hpp>
#include <boost/redis/resp3/serialization.hpp>
#include <boost/redis/resp3/type.hpp>
#include <boost/redis/response.hpp>

#include <boost/assert.hpp>

#include <array>
#include <charconv>
#include <deque>
#include <forward_list>
#include <iostream>
#include <list>
#include <map>
#include <optional>
Expand Down Expand Up @@ -138,6 +140,24 @@ void boost_redis_from_bulk(T& t, resp3::basic_node<String> const& node, system::

//================================================

template <typename T>
inline auto prepare_done(T*) noexcept
{
return [] { };
}

template <>
inline auto prepare_done(generic_flat_response* resp) noexcept
{
return [resp] {
if (resp->has_value()) {
resp->value().set_view();
}
};
}

//================================================

template <class Result>
class general_aggregate {
private:
Expand Down Expand Up @@ -170,6 +190,47 @@ class general_aggregate {
}
};

template <>
class general_aggregate<result<flat_response_value>> {
private:
result<flat_response_value>* result_;

public:
explicit general_aggregate(result<flat_response_value>* c = nullptr)
: result_(c)
{ }
template <class String>
void operator()(resp3::basic_node<String> const& nd, system::error_code&)
{
BOOST_ASSERT_MSG(!!result_, "Unexpected null pointer");
switch (nd.data_type) {
case resp3::type::blob_error:
case resp3::type::simple_error:
*result_ = error{
nd.data_type,
std::string{std::cbegin(nd.value), std::cend(nd.value)}
};
break;
default:
auto& data = result_->value().data_;

resp3::offset_string offset_string;
offset_string.offset = data.size();
offset_string.size = nd.value.size();

data.append(nd.value.data(), nd.value.size());

resp3::offset_node new_node;
new_node.data_type = nd.data_type;
new_node.aggregate_size = nd.aggregate_size;
new_node.depth = nd.depth;
new_node.value = std::move(offset_string);

result_->value().view_.push_back(std::move(new_node));
}
}
};

template <class Node>
class general_simple {
private:
Expand Down
8 changes: 8 additions & 0 deletions include/boost/redis/adapter/detail/response_traits.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,14 @@ struct response_traits<response<Ts...>> {
static auto adapt(response_type& r) noexcept { return adapter_type{r}; }
};

template <>
struct response_traits<generic_flat_response> {
using response_type = generic_flat_response;
using adapter_type = vector_adapter<response_type>;

static auto adapt(response_type& v) noexcept { return adapter_type{v}; }
};

template <class Adapter>
class wrapper {
public:
Expand Down
8 changes: 8 additions & 0 deletions include/boost/redis/adapter/detail/result_traits.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <boost/redis/error.hpp>
#include <boost/redis/ignore.hpp>
#include <boost/redis/resp3/type.hpp>
#include <boost/redis/response.hpp>

#include <boost/mp11.hpp>

Expand Down Expand Up @@ -62,6 +63,13 @@ struct result_traits<result<std::vector<resp3::basic_node<String>, Allocator>>>
static auto adapt(response_type& v) noexcept { return adapter_type{&v}; }
};

template <>
struct result_traits<generic_flat_response> {
using response_type = generic_flat_response;
using adapter_type = adapter::detail::general_aggregate<response_type>;
static auto adapt(response_type& v) noexcept { return adapter_type{&v}; }
};

template <class T>
using adapter_t = typename result_traits<std::decay_t<T>>::adapter_type;

Expand Down
3 changes: 2 additions & 1 deletion include/boost/redis/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -764,7 +764,8 @@ class basic_connection {
1);
auto info = detail::make_elem(req, std::move(adapter_impl.adapt_fn));

info->set_done_callback([notifier]() {
info->set_done_callback([notifier, prepare_done = std::move(adapter_impl.prepare_done_fn)]() {
prepare_done();
notifier->try_send(std::error_code{}, 0);
});

Expand Down
39 changes: 34 additions & 5 deletions include/boost/redis/impl/response.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,30 @@

namespace boost::redis {

void consume_one(generic_response& r, system::error_code& ec)
namespace {
template <typename Container>
auto& get_value(Container& c)
{
return c;
}

template <>
auto& get_value(flat_response_value& c)
{
return c.view();
}

template <typename Response>
void consume_one_impl(Response& r, system::error_code& ec)
{
if (r.has_error())
return; // Nothing to consume.

if (std::empty(r.value()))
auto& value = get_value(r.value());
if (std::empty(value))
return; // Nothing to consume.

auto const depth = r.value().front().depth;
auto const depth = value.front().depth;

// To simplify we will refuse to consume any data-type that is not
// a root node. I think there is no use for that and it is complex
Expand All @@ -33,11 +48,17 @@ void consume_one(generic_response& r, system::error_code& ec)
return e.depth == depth;
};

auto match = std::find_if(std::next(std::cbegin(r.value())), std::cend(r.value()), f);
auto match = std::find_if(std::next(std::cbegin(value)), std::cend(value), f);

r.value().erase(std::cbegin(r.value()), match);
value.erase(std::cbegin(value), match);
}

} // namespace

void consume_one(generic_response& r, system::error_code& ec) { consume_one_impl(r, ec); }

void consume_one(generic_flat_response& r, system::error_code& ec) { consume_one_impl(r, ec); }

void consume_one(generic_response& r)
{
system::error_code ec;
Expand All @@ -46,4 +67,12 @@ void consume_one(generic_response& r)
throw system::system_error(ec);
}

void consume_one(generic_flat_response& r)
{
system::error_code ec;
consume_one(r, ec);
if (ec)
throw system::system_error(ec);
}

} // namespace boost::redis
12 changes: 12 additions & 0 deletions include/boost/redis/resp3/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,18 @@ using node = basic_node<std::string>;
/// A node in the response tree that does not own its data.
using node_view = basic_node<std::string_view>;

struct offset_string {
std::string_view data;
std::size_t offset{};
std::size_t size{};

operator std::string() const { return std::string{data}; }
};

inline std::ostream& operator<<(std::ostream& os, offset_string const& s) { return os << s.data; }

using offset_node = basic_node<offset_string>;

} // namespace boost::redis::resp3

#endif // BOOST_REDIS_RESP3_NODE_HPP
64 changes: 63 additions & 1 deletion include/boost/redis/response.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,64 @@ using response = std::tuple<adapter::result<Ts>...>;
*/
using generic_response = adapter::result<std::vector<resp3::node>>;

/**
* Forward declaration to allow friendship with the template class
* that manages filling of flat_response_value.
*/
namespace adapter::detail {
template <typename Result>
class general_aggregate;
}

struct flat_response_value {
public:
/// Reserve capacity for nodes and data storage.
void reserve(std::size_t num_nodes, std::size_t string_size)
{
data_.reserve(num_nodes * string_size);
view_.reserve(num_nodes);
}

void clear()
{
data_.clear();
view_.clear();
}

std::size_t size() const noexcept { return view_.size(); }
bool empty() noexcept { return view_.empty(); }

resp3::offset_node& at(std::size_t index) { return view_.at(index); }
resp3::offset_node const& at(std::size_t index) const { return view_.at(index); }

std::vector<resp3::offset_node> const& view() const { return view_; }
std::vector<resp3::offset_node>& view() { return view_; }

void set_view()
{
for (auto& node : view_) {
auto& offset_string = node.value;
offset_string.data = std::string_view{
data_.data() + offset_string.offset,
offset_string.size};
}
}

private:
template <typename T>
friend class adapter::detail::general_aggregate;

std::string data_;
std::vector<resp3::offset_node> view_;
};

/** @brief A memory-efficient generic response to a request.
* @ingroup high-level-api
*
* Uses a compact buffer to store RESP3 data with reduced allocations.
*/
using generic_flat_response = adapter::result<flat_response_value>;

/** @brief Consume on response from a generic response
*
* This function rotates the elements so that the start of the next
Expand Down Expand Up @@ -72,12 +130,16 @@ using generic_response = adapter::result<std::vector<resp3::node>>;
*/
void consume_one(generic_response& r, system::error_code& ec);

/// Consume on response from a generic flat response
void consume_one(generic_flat_response& r, system::error_code& ec);

/**
* @brief Throwing overload of `consume_one`.
* @brief Throwing overloads of `consume_one`.
*
* @param r The response to modify.
*/
void consume_one(generic_response& r);
void consume_one(generic_flat_response& r);

} // namespace boost::redis

Expand Down
3 changes: 3 additions & 0 deletions test/test_any_adapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <boost/test/included/unit_test.hpp>

using boost::redis::generic_response;
using boost::redis::generic_flat_response;
using boost::redis::response;
using boost::redis::ignore;
using boost::redis::any_adapter;
Expand All @@ -23,10 +24,12 @@ BOOST_AUTO_TEST_CASE(any_adapter_response_types)
response<int> r1;
response<int, std::string> r2;
generic_response r3;
generic_flat_response r4;

BOOST_CHECK_NO_THROW(any_adapter{r1});
BOOST_CHECK_NO_THROW(any_adapter{r2});
BOOST_CHECK_NO_THROW(any_adapter{r3});
BOOST_CHECK_NO_THROW(any_adapter{r4});
BOOST_CHECK_NO_THROW(any_adapter{ignore});
}

Expand Down
Loading