Skip to content

Commit 45294f8

Browse files
committed
Fix reply batching.
1 parent 6c5e4da commit 45294f8

8 files changed

+46
-17
lines changed

README.md

-2
Original file line numberDiff line numberDiff line change
@@ -26,5 +26,3 @@ cd build-opt && ninja midi-redis
2626
```
2727

2828
for more options, run `./midi-redis --help`
29-
30-

server/dragonfly_connection.cc

+1-6
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@ namespace fibers = boost::fibers;
2626
namespace dfly {
2727
namespace {
2828

29-
using CmdArgVec = std::vector<MutableStrSpan>;
30-
3129
void SendProtocolError(RedisParser::Result pres, FiberSocketBase* peer) {
3230
string res("-ERR Protocol error: ");
3331
if (pres == RedisParser::BAD_BULKLEN) {
@@ -43,10 +41,6 @@ void SendProtocolError(RedisParser::Result pres, FiberSocketBase* peer) {
4341
}
4442
}
4543

46-
inline MutableStrSpan ToMSS(absl::Span<uint8_t> span) {
47-
return MutableStrSpan{reinterpret_cast<char*>(span.data()), span.size()};
48-
}
49-
5044
void RespToArgList(const RespVec& src, CmdArgVec* dest) {
5145
dest->resize(src.size());
5246
for (size_t i = 0; i < src.size(); ++i) {
@@ -311,6 +305,7 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {
311305
std::unique_ptr<Request> req{dispatch_q_.front()};
312306
dispatch_q_.pop_front();
313307

308+
cc_->SetBatchMode(!dispatch_q_.empty());
314309
cc_->conn_state.mask |= ConnectionState::ASYNC_DISPATCH;
315310
service_->DispatchCommand(CmdArgList{req->args.data(), req->args.size()}, cc_.get());
316311
cc_->conn_state.mask &= ~ConnectionState::ASYNC_DISPATCH;

server/dragonfly_listener.cc

+3-2
Original file line numberDiff line numberDiff line change
@@ -119,11 +119,12 @@ ProactorBase* Listener::PickConnectionProactor(LinuxSocketBase* sock) {
119119
if (FLAGS_conn_use_incoming_cpu) {
120120
int fd = sock->native_handle();
121121

122-
int cpu;
122+
int cpu, napi_id;
123123
socklen_t len = sizeof(cpu);
124124

125125
CHECK_EQ(0, getsockopt(fd, SOL_SOCKET, SO_INCOMING_CPU, &cpu, &len));
126-
VLOG(1) << "CPU for connection " << fd << " is " << cpu;
126+
CHECK_EQ(0, getsockopt(fd, SOL_SOCKET, SO_INCOMING_NAPI_ID, &napi_id, &len));
127+
VLOG(1) << "CPU/NAPI for connection " << fd << " is " << cpu << "/" << napi_id;
127128

128129
vector<unsigned> ids = pool()->MapCpuToThreads(cpu);
129130
if (!ids.empty()) {

server/engine_shard_set.cc

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ void EngineShard::DestroyThreadLocal() {
4343
delete shard_;
4444
shard_ = nullptr;
4545

46-
DVLOG(1) << "Shard reset " << index;
46+
VLOG(1) << "Shard reset " << index;
4747
}
4848

4949
void EngineShardSet::Init(uint32_t sz) {

server/main_service.cc

+2
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ void Service::Init(util::AcceptServer* acceptor) {
6565
}
6666

6767
void Service::Shutdown() {
68+
VLOG(1) << "Service::Shutdown";
69+
6870
engine_varz.reset();
6971
request_latency_usec.Shutdown();
7072
ping_qps.Shutdown();

server/reply_builder.cc

+24-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,30 @@ BaseSerializer::BaseSerializer(io::Sink* sink) : sink_(sink) {
3030
}
3131

3232
void BaseSerializer::Send(const iovec* v, uint32_t len) {
33-
error_code ec = sink_->Write(v, len);
33+
if (should_batch_) {
34+
// TODO: to introduce flushing when too much data is batched.
35+
for (unsigned i = 0; i < len; ++i) {
36+
std::string_view src((char*)v[i].iov_base, v[i].iov_len);
37+
DVLOG(2) << "Appending to stream " << sink_ << " " << src;
38+
batch_.append(src.data(), src.size());
39+
}
40+
return;
41+
}
42+
43+
error_code ec;
44+
if (batch_.empty()) {
45+
ec = sink_->Write(v, len);
46+
} else {
47+
DVLOG(1) << "Sending batch to stream " << sink_ << "\n" << batch_;
48+
49+
iovec tmp[len + 1];
50+
tmp[0].iov_base = batch_.data();
51+
tmp[0].iov_len = batch_.size();
52+
copy(v, v + len, tmp + 1);
53+
ec = sink_->Write(tmp, len + 1);
54+
batch_.clear();
55+
}
56+
3457
if (ec) {
3558
ec_ = ec;
3659
}

server/reply_builder.h

+14-4
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,24 @@ class BaseSerializer {
2222
ec_ = std::make_error_code(std::errc::connection_aborted);
2323
}
2424

25+
// In order to reduce interrupt rate we allow coalescing responses together using
26+
// Batch mode. It is controlled by Connection state machine because it makes sense only
27+
// when pipelined requests are arriving.
28+
void SetBatchMode(bool batch) {
29+
should_batch_ = batch;
30+
}
31+
2532
//! Sends a string as is without any formatting. raw should be encoded according to the protocol.
2633
void SendDirect(std::string_view str);
2734

28-
::io::Sink* sink() {
29-
return sink_;
30-
}
31-
3235
void Send(const iovec* v, uint32_t len);
3336

3437
private:
3538
::io::Sink* sink_;
3639
std::error_code ec_;
40+
std::string batch_;
41+
42+
bool should_batch_ = false;
3743
};
3844

3945
class RespSerializer : public BaseSerializer {
@@ -89,6 +95,10 @@ class ReplyBuilder {
8995
void SendGetReply(std::string_view key, uint32_t flags, std::string_view value);
9096
void SendGetNotFound();
9197

98+
void SetBatchMode(bool mode) {
99+
serializer_->SetBatchMode(mode);
100+
}
101+
92102
private:
93103
RespSerializer* as_resp() {
94104
return static_cast<RespSerializer*>(serializer_.get());

0 commit comments

Comments
 (0)