Skip to content

Commit 4edd74f

Browse files
committed
Splitted server_thread
1 parent 61f8f15 commit 4edd74f

File tree

3 files changed

+182
-144
lines changed

3 files changed

+182
-144
lines changed

Diff for: CMakeLists.txt

+8
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,13 @@ set_target_properties(
6363
CXX_CLANG_TIDY ""
6464
)
6565

66+
add_library(
67+
svr_lib OBJECT
68+
source/server_thread.cpp
69+
${CMAKE_CURRENT_BINARY_DIR}/message.h
70+
)
71+
72+
target_compile_features(svr_lib PUBLIC cxx_std_20)
6673

6774
# ---- Declare executables ----
6875

@@ -80,6 +87,7 @@ target_compile_features(svr PRIVATE cxx_std_20)
8087
target_link_libraries(svr
8188
PRIVATE
8289
clt-svr-proto_lib
90+
svr_lib
8391
${Protobuf_LIBRARIES}
8492
fmt::fmt
8593
Threads::Threads)

Diff for: source/server_thread.cpp

+155
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
#include <cstdint>
2+
#include <optional>
3+
#include <vector>
4+
5+
#include "server_thread.h"
6+
7+
auto ServerThread::operator=(ServerThread && other) noexcept -> ServerThread & {
8+
fmt::print("{}\n", __PRETTY_FUNCTION__);
9+
std::lock_guard lock(mtx);
10+
std::lock_guard lock2(other.mtx);
11+
id = other.id;
12+
max_fd = other.max_fd;
13+
rfds = other.rfds;
14+
listening_sockets = std::move(other.listening_sockets);
15+
other.id = -1;
16+
other.max_fd = -1;
17+
return *this;
18+
}
19+
20+
void ServerThread::update_connections(int new_sock_fd) {
21+
{
22+
std::lock_guard lock(mtx);
23+
listening_sockets.push_back(new_sock_fd);
24+
}
25+
cv.notify_one();
26+
}
27+
28+
auto ServerThread::incomming_requests() -> int {
29+
constexpr auto timeout_s = 10ULL;
30+
timeval timeout {};
31+
timeout.tv_sec = timeout_s;
32+
timeout.tv_usec = 0;
33+
int retval = select((max_fd + 1), &rfds, nullptr, nullptr, &timeout);
34+
if (retval == 0) {
35+
fmt::print("update connections\n");
36+
init();
37+
} else if (retval < 0) {
38+
fmt::print("timeout\n");
39+
}
40+
return retval;
41+
}
42+
43+
void ServerThread::cleanup_connection(int dead_connection) {
44+
// cleanup connection
45+
std::lock_guard lock(mtx);
46+
auto it = std::find(
47+
listening_sockets.begin(), listening_sockets.end(), dead_connection);
48+
listening_sockets.erase(it);
49+
fmt::print("{}: {}\n", __PRETTY_FUNCTION__, dead_connection);
50+
}
51+
52+
auto ServerThread::get_new_requests() -> int {
53+
std::vector<int> lsockets;
54+
{
55+
std::lock_guard lock(mtx);
56+
lsockets = listening_sockets; // this is weird, you create a copy but do
57+
// not delete old vector?
58+
}
59+
for (auto csock : lsockets) {
60+
if (FD_ISSET(csock, &rfds)) { // NOLINT
61+
auto [bytecount, buffer] = secure_recv(csock);
62+
if (bytecount <= 0) {
63+
if (bytecount == 0) {
64+
cleanup_connection(csock);
65+
init();
66+
}
67+
}
68+
// FIXME: We expect the provider of the function to handle error cases!
69+
process_req(bytecount, buffer.get());
70+
}
71+
}
72+
// FIXME What do we actually return here???
73+
return 0;
74+
}
75+
76+
auto ServerThread::init() -> void {
77+
get_new_connections();
78+
reset_fds();
79+
}
80+
81+
void ServerThread::get_new_connections() {
82+
std::unique_lock<std::mutex> lock(mtx);
83+
auto nb_connections = listening_sockets.size();
84+
while (nb_connections == 0) {
85+
fmt::print("{}: no connections\n", __PRETTY_FUNCTION__);
86+
cv.wait(lock);
87+
nb_connections = listening_sockets.size();
88+
}
89+
}
90+
91+
void ServerThread::reset_fds() {
92+
max_fd = -1;
93+
FD_ZERO(&rfds); // NOLINT
94+
for (auto rfd : listening_sockets) {
95+
FD_SET(rfd, &rfds); // NOLINT
96+
max_fd = (max_fd < rfd) ? rfd : max_fd;
97+
}
98+
}
99+
100+
auto ServerThread::destruct_message(char * msg, size_t bytes)
101+
-> std::optional<uint32_t> {
102+
if (bytes < 4) {
103+
return std::nullopt;
104+
}
105+
106+
auto actual_msg_size = convert_byte_array_to_int(msg);
107+
return actual_msg_size;
108+
}
109+
110+
auto ServerThread::read_n(int fd, char * buffer, size_t n) -> size_t {
111+
size_t bytes_read = 0;
112+
while (bytes_read < n) {
113+
auto bytes_left = n - bytes_read;
114+
auto bytes_read_now = recv(fd, buffer + bytes_read, bytes_left, 0);
115+
if (bytes_read_now <= 0) {
116+
return bytes_read_now;
117+
}
118+
bytes_read += bytes_read_now;
119+
}
120+
return bytes_read;
121+
}
122+
123+
auto ServerThread::secure_recv(int fd)
124+
-> std::pair<uint32_t, std::unique_ptr<char[]>> {
125+
char dlen[4];
126+
if (auto byte_read = read_n(fd, dlen, length_size_field);
127+
byte_read != length_size_field) {
128+
return {byte_read, nullptr};
129+
}
130+
131+
auto actual_msg_size_opt = destruct_message(dlen, length_size_field);
132+
if (!actual_msg_size_opt) {
133+
return {-1, nullptr};
134+
}
135+
auto actual_msg_size = *actual_msg_size_opt;
136+
auto buf = std::make_unique<char[]>(static_cast<size_t>(actual_msg_size) + 1);
137+
buf[actual_msg_size] = '\0';
138+
if (auto byte_read = read_n(fd, buf.get(), actual_msg_size);
139+
byte_read != actual_msg_size) {
140+
return {byte_read, nullptr};
141+
}
142+
143+
return {actual_msg_size, std::move(buf)};
144+
}
145+
146+
auto ServerThread::process_req(size_t sz, char * buf) const -> void {
147+
sockets::client_msg msg;
148+
auto payload_sz = sz - 4;
149+
std::string tmp(buf + 4, payload_sz);
150+
msg.ParseFromString(tmp);
151+
for (auto i = 0; i < msg.ops_size(); ++i) {
152+
auto const & op = msg.ops(i);
153+
callbacks[op.op_id()](op);
154+
}
155+
}

Diff for: source/server_thread.h

+19-144
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
#include <variant>
1111

1212
#include <fmt/format.h>
13+
#include <netinet/in.h>
14+
#include <sys/socket.h>
1315
#include <unistd.h>
1416

1517
#include "callback.h"
@@ -60,18 +62,7 @@ class ServerThread {
6062
other.max_fd = -1;
6163
}
6264

63-
inline auto operator=(ServerThread && other) noexcept -> ServerThread & {
64-
fmt::print("{}\n", __PRETTY_FUNCTION__);
65-
std::lock_guard lock(mtx);
66-
std::lock_guard lock2(other.mtx);
67-
id = other.id;
68-
max_fd = other.max_fd;
69-
rfds = other.rfds;
70-
listening_sockets = std::move(other.listening_sockets);
71-
other.id = -1;
72-
other.max_fd = -1;
73-
return *this;
74-
}
65+
auto operator=(ServerThread && other) noexcept -> ServerThread &;
7566

7667
inline ~ServerThread() {
7768
std::lock_guard lock(mtx);
@@ -81,82 +72,20 @@ class ServerThread {
8172
}
8273
}
8374

84-
inline void update_connections(int new_sock_fd) {
85-
{
86-
std::lock_guard lock(mtx);
87-
listening_sockets.push_back(new_sock_fd);
88-
}
89-
cv.notify_one();
90-
};
91-
92-
inline auto incomming_requests() -> int {
93-
constexpr auto timeout_s = 10ULL;
94-
timeval timeout {};
95-
timeout.tv_sec = timeout_s;
96-
timeout.tv_usec = 0;
97-
int retval = select((max_fd + 1), &rfds, nullptr, nullptr, &timeout);
98-
if (retval == 0) {
99-
fmt::print("update connections\n");
100-
init();
101-
} else if (retval < 0) {
102-
fmt::print("timeout\n");
103-
}
104-
return retval;
105-
}
75+
void update_connections(int new_sock_fd);
10676

107-
inline void cleanup_connection(int dead_connection) {
108-
// cleanup connection
109-
std::lock_guard lock(mtx);
110-
auto it = std::find(
111-
listening_sockets.begin(), listening_sockets.end(), dead_connection);
112-
listening_sockets.erase(it);
113-
fmt::print("{}: {}\n", __PRETTY_FUNCTION__, dead_connection);
114-
}
77+
auto incomming_requests() -> int;
11578

116-
inline auto process_req(size_t sz, char * buf) const -> void {
117-
sockets::client_msg msg;
118-
auto payload_sz = sz - 4;
119-
std::string tmp(buf + 4, payload_sz);
120-
msg.ParseFromString(tmp);
121-
for (auto i = 0; i < msg.ops_size(); ++i) {
122-
auto const & op = msg.ops(i);
123-
callbacks[op.op_id()](op);
124-
}
125-
}
79+
void cleanup_connection(int dead_connection);
12680

12781
inline void register_callback(sockets::client_msg::OperationType op,
12882
CallbackT cb) {
12983
callbacks[op] = std::move(cb);
13084
}
13185

132-
inline auto get_new_requests() -> int {
133-
std::vector<int> lsockets;
134-
{
135-
std::lock_guard lock(mtx);
136-
lsockets = listening_sockets; // this is weird, you create a copy but do
137-
// not delete old vector?
138-
}
139-
for (auto csock : lsockets) {
140-
if (FD_ISSET(csock, &rfds)) { // NOLINT
141-
auto [bytecount, buffer] = secure_recv(csock);
142-
if (bytecount <= 0) {
143-
if (bytecount == 0) {
144-
cleanup_connection(csock);
145-
init();
146-
}
147-
}
148-
// FIXME: We expect the provider of the function to handle error cases!
149-
process_req(bytecount, buffer.get());
150-
}
151-
}
152-
// FIXME What do we actually return here???
153-
return 0;
154-
}
86+
auto get_new_requests() -> int;
15587

156-
void init() {
157-
get_new_connections();
158-
reset_fds();
159-
}
88+
void init();
16089

16190
private:
16291
int id;
@@ -168,77 +97,23 @@ class ServerThread {
16897
std::mutex mtx;
16998
std::condition_variable cv;
17099

171-
void get_new_connections() {
172-
std::unique_lock<std::mutex> lock(mtx);
173-
auto nb_connections = listening_sockets.size();
174-
while (nb_connections == 0) {
175-
fmt::print("{}: no connections\n", __PRETTY_FUNCTION__);
176-
cv.wait(lock);
177-
nb_connections = listening_sockets.size();
178-
}
179-
}
100+
void get_new_connections();
180101

181-
void reset_fds() {
182-
max_fd = -1;
183-
FD_ZERO(&rfds); // NOLINT
184-
for (auto rfd : listening_sockets) {
185-
FD_SET(rfd, &rfds); // NOLINT
186-
max_fd = (max_fd < rfd) ? rfd : max_fd;
187-
}
188-
}
102+
void reset_fds();
189103

190104
/**
191-
* * It returns the actual size of msg.
192-
* * Not that msg might not contain all payload data.
193-
* * The function expects at least that the msg contains the first 4 bytes
194-
* that
195-
* * indicate the actual size of the payload.
196-
* */
105+
** It returns the actual size of msg.
106+
** Not that msg might not contain all payload data.
107+
** The function expects at least that the msg contains the first 4 bytes that
108+
** indicate the actual size of the payload.
109+
**/
197110
static auto destruct_message(char * msg, size_t bytes)
198-
-> std::optional<uint32_t> {
199-
if (bytes < 4) {
200-
return std::nullopt;
201-
}
202-
203-
auto actual_msg_size = convert_byte_array_to_int(msg);
111+
-> std::optional<uint32_t>;
204112

205-
return actual_msg_size;
206-
}
207-
208-
static auto read_n(int fd, char * buffer, size_t n) -> size_t {
209-
size_t bytes_read = 0;
210-
while (bytes_read < n) {
211-
auto bytes_left = n - bytes_read;
212-
auto bytes_read_now = recv(fd, buffer + bytes_read, bytes_left, 0);
213-
if (bytes_read_now <= 0) {
214-
return bytes_read_now;
215-
}
216-
bytes_read += bytes_read_now;
217-
}
218-
return bytes_read;
219-
}
113+
static auto read_n(int fd, char * buffer, size_t n) -> size_t;
220114

221115
static auto secure_recv(int fd)
222-
-> std::pair<uint32_t, std::unique_ptr<char[]>> {
223-
char dlen[4];
224-
if (auto byte_read = read_n(fd, dlen, length_size_field);
225-
byte_read != length_size_field) {
226-
return {byte_read, nullptr};
227-
}
116+
-> std::pair<uint32_t, std::unique_ptr<char[]>>;
228117

229-
auto actual_msg_size_opt = destruct_message(dlen, length_size_field);
230-
if (!actual_msg_size_opt) {
231-
return {-1, nullptr};
232-
}
233-
auto actual_msg_size = *actual_msg_size_opt;
234-
auto buf =
235-
std::make_unique<char[]>(static_cast<size_t>(actual_msg_size) + 1);
236-
buf[actual_msg_size] = '\0';
237-
if (auto byte_read = read_n(fd, buf.get(), actual_msg_size);
238-
byte_read != actual_msg_size) {
239-
return {byte_read, nullptr};
240-
}
241-
242-
return {actual_msg_size, std::move(buf)};
243-
}
118+
inline auto process_req(size_t sz, char * buf) const -> void;
244119
};

0 commit comments

Comments
 (0)