Skip to content

Commit 0120415

Browse files
committedFeb 18, 2023
Initialization of DB connections, running DB worker threads
1 parent 43c30d9 commit 0120415

13 files changed

+379
-34
lines changed
 

‎etc/default_cfg.json

+12
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,22 @@
77
},
88
"control": {
99
"threads": 1
10+
},
11+
"database": {
1012
}
1113
},
1214
"http_server": {
1315
"keepalive_requests": 100,
1416
"port": 8080
17+
},
18+
"databases": {
19+
"sqlite3": {
20+
"db1": {
21+
"file": "db1.sqlite",
22+
"queries": {
23+
"trivial": "select 1"
24+
}
25+
}
26+
}
1527
}
1628
}

‎src/CMakeLists.txt

+6-1
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,23 @@ protobuf_generate_cpp(
77
message("Protobuf generated headers: ${PROTO_HPP}")
88
message("Protobuf generated sources: ${PROTO_CPP}")
99

10+
add_library(proto_obj OBJECT ${PROTO_CPP})
11+
target_compile_options(proto_obj PRIVATE -Wno-sign-conversion)
12+
1013
# The main ACppSrv executable
1114
add_executable(
1215
acppsrv
1316
acppsrv.cpp
1417
application.cpp
1518
configuration.cpp
19+
db_server.cpp
1620
http_server.cpp
1721
http_hnd_echo.cpp
1822
http_hnd_stat.cpp
1923
log.cpp
24+
sqlite3.cpp
2025
worker.cpp
21-
${PROTO_CPP}
26+
$<TARGET_OBJECTS:proto_obj>
2227
)
2328
target_include_directories(acppsrv PRIVATE ${CMAKE_CURRENT_BINARY_DIR})
2429
target_link_libraries(

‎src/application.cpp

+21-2
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,32 @@
11
#include "application.hpp"
22
#include "configuration.hpp"
3+
#include "db_server.hpp"
34
#include "http_server.hpp"
45
#include "worker.hpp"
56
#include <boost/asio/signal_set.hpp>
7+
#include <tuple>
8+
#include <utility>
69

710
namespace acppsrv {
811

912
bool application::run()
1013
{
14+
// Create threads pools
1115
auto tp = cfg.data().has_thread_pools() ?
1216
&cfg.data().thread_pools() : nullptr;
13-
// Create threads pools
1417
thread_pool control_pool(tp && tp->has_control() ? &tp->control() : nullptr,
1518
std::string{pool_control_name});
1619
thread_pool main_pool(tp && tp->has_main() ? &tp->main() : nullptr,
1720
std::string{pool_main_name});
21+
thread_pool database_pool(tp && tp->has_database() ?
22+
&tp->database() : nullptr,
23+
std::string{pool_database_name});
1824
// Set up the control pool
1925
boost::asio::signal_set termsig{control_pool.ctx, SIGINT, SIGTERM};
2026
termsig.async_wait(
21-
[&main_pool](const boost::system::error_code& ec, int sig) {
27+
[&main_pool, &database_pool](const boost::system::error_code& ec,
28+
int sig)
29+
{
2230
if (ec)
2331
log_msg(log_level::warning) <<
2432
"Waiting for termination signal failed: " << ec.message();
@@ -38,16 +46,27 @@ bool application::run()
3846
}
3947
}
4048
main_pool.stop();
49+
database_pool.stop();
4150
});
4251
// Set up the main pool
4352
http_server http_srv(cfg, main_pool, name, version);
4453
if (!http_srv.run()) {
4554
log_msg(log_level::crit) << "Cannot initialize HTTP server";
4655
return false;
4756
}
57+
// Set up the database pool
58+
db_server db_srv(cfg.data().has_databases() ?
59+
&cfg.data().databases() : nullptr,
60+
database_pool);
61+
if (!db_srv.run()) {
62+
log_msg(log_level::crit) << "Cannot initialize database server";
63+
return false;
64+
}
4865
// Start processing
66+
log_msg(log_level::notice) << "Initialized, starting worker threads";
4967
control_pool.run(false);
5068
main_pool.run(true);
69+
database_pool.run(true);
5170
// We need pool's ctx to register an async op, so any async object
5271
// belonging to the pool must be created after the pool. But this causes
5372
// destruction of async objects before destruction of the pool, causing

‎src/application.hpp

+5
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,22 @@
11
#pragma once
22

3+
#include "sqlite3.hpp"
4+
5+
#include <map>
36
#include <string_view>
47

58
namespace acppsrv {
69

710
class configuration;
11+
class thread_pool;
812

913
class application {
1014
public:
1115
static constexpr std::string_view name = "ACppSrv";
1216
static constexpr std::string_view version = GIT_VERSION;
1317
static constexpr std::string_view pool_main_name{"main"};
1418
static constexpr std::string_view pool_control_name{"control"};
19+
static constexpr std::string_view pool_database_name{"database"};
1520
explicit application(const configuration& cfg): cfg(cfg) {}
1621
application(const application&) = delete;
1722
application(application&&) = delete;

‎src/configuration.proto

+14-1
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,10 @@ message ThreadPool {
4646
message ThreadPools {
4747
// The main server client handling pool
4848
ThreadPool main = 1;
49-
//! The administrative control pool
49+
// The administrative control pool
5050
ThreadPool control = 2;
51+
// The database handling pool
52+
ThreadPool database = 3;
5153
}
5254

5355
// HTTP server
@@ -62,10 +64,21 @@ message HttpServer {
6264
uint64 max_request_body = 8; // 0 = unlimited
6365
}
6466

67+
// SQLite3 database
68+
message SQLite3Db {
69+
string file = 1;
70+
map<string, string> queries = 2;
71+
}
72+
73+
message SQLite3 {
74+
map<string, SQLite3Db> sqlite3 = 1;
75+
}
76+
6577
/*** Top level configuration node ********************************************/
6678

6779
message Configuration {
6880
Log log = 1;
6981
ThreadPools thread_pools = 2;
7082
HttpServer http_server = 3;
83+
SQLite3 databases = 4;
7184
}

‎src/db_server.cpp

+53
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
#include "db_server.hpp"
2+
#include "configuration.hpp"
3+
#include "log.hpp"
4+
#include "worker.hpp"
5+
#include <tuple>
6+
#include <utility>
7+
8+
namespace acppsrv {
9+
10+
db_server::db_server(const proto::SQLite3* cfg, thread_pool& workers):
11+
cfg(cfg), workers(workers)
12+
{
13+
}
14+
15+
bool db_server::run()
16+
{
17+
try {
18+
for (size_t tidx = 0; tidx < workers.size(); ++tidx) {
19+
log_msg(log_level::notice) << "Initializing database thread " <<
20+
(tidx + 1) << '/' << workers.size();
21+
databases.emplace_back();
22+
auto& current = databases.back();
23+
auto level = tidx == 0 ? log_level::notice : log_level::debug;
24+
if (cfg) {
25+
for (auto&& [d_name, d_def]: cfg->sqlite3()) {
26+
log_msg(level) << "Open database=\"" << d_name <<
27+
"\" file=" << d_def.file() << '"';
28+
auto& db = current.
29+
emplace(std::piecewise_construct,
30+
std::forward_as_tuple(d_name),
31+
std::forward_as_tuple(d_def.file())).
32+
first->second;
33+
for (auto&& [q_name, q_def]: d_def.queries()) {
34+
log_msg(level) << "Preparing database=\"" <<
35+
d_name << "\" query=\"" << q_name <<
36+
"\" sql=\"" << q_def << '"';
37+
db.queries.emplace(std::piecewise_construct,
38+
std::forward_as_tuple(q_name),
39+
std::forward_as_tuple(db.db,
40+
q_def,
41+
q_name));
42+
}
43+
}
44+
}
45+
}
46+
} catch (const sqlite::error& e) {
47+
log_msg(log_level::crit) << "Opening databases: " << e.what();
48+
return false;
49+
}
50+
return true;
51+
}
52+
53+
} // namespace acppsrv

‎src/db_server.hpp

+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
#pragma once
2+
3+
#include "sqlite3.hpp"
4+
5+
#include <map>
6+
#include <vector>
7+
8+
namespace acppsrv {
9+
10+
class thread_pool;
11+
12+
namespace proto {
13+
14+
class SQLite3;
15+
16+
} // namespace proto
17+
18+
class db_server {
19+
public:
20+
db_server(const proto::SQLite3* cfg, thread_pool& workers);
21+
bool run();
22+
private:
23+
struct db_def_t {
24+
explicit db_def_t(const std::string& file): db(file) {}
25+
sqlite::connection db;
26+
std::map<std::string, sqlite::query> queries;
27+
};
28+
const proto::SQLite3* cfg;
29+
thread_pool& workers;
30+
std::vector<std::map<std::string, db_def_t>> databases;
31+
};
32+
33+
} // namespace acppsrv

‎src/log.cpp

+30-24
Original file line numberDiff line numberDiff line change
@@ -82,38 +82,44 @@ std::optional<log_level> from_string(std::string_view str)
8282

8383
/*** log_msg *****************************************************************/
8484

85-
log_msg::log_msg(logger& obj, log_level level, session_type session):
85+
log_msg::log_msg(logger& obj, log_level level, session_type session) noexcept:
8686
_logger(obj), _level(level), _session(session)
8787
{
8888
namespace sc = std::chrono;
89-
if (_level > log_level::off && _level <= _logger.level()) {
90-
_os.emplace(std::cerr);
91-
auto now = sc::system_clock::now().time_since_epoch();
92-
std::array<char, 64> buf{};
93-
time_t time = sc::duration_cast<sc::seconds>(now).count();
94-
auto usec =
95-
std::to_string(sc::duration_cast<sc::microseconds>(now).count() %
96-
1'000'000);
97-
tm t{};
98-
localtime_r(&time, &t);
99-
assert(strftime(buf.data(), buf.size(), "%F %T.000000%z", &t) != 0);
100-
static constexpr size_t time_end = 26; // after .000000
101-
std::copy(usec.begin(), usec.end(),
102-
buf.data() + time_end - usec.size());
103-
*this << buf.data();
104-
*this << " [" << getpid() << '.' << std::this_thread::get_id();
105-
for (auto&& s: session)
106-
if (s)
107-
*this << '.' << *s;
108-
*this << ']';
109-
*this << ' ' << to_string(_level) << ' ';
110-
}
89+
if (_level > log_level::off && _level <= _logger.level())
90+
try {
91+
_os.emplace(std::cerr);
92+
auto now = sc::system_clock::now().time_since_epoch();
93+
std::array<char, 64> buf{};
94+
time_t time = sc::duration_cast<sc::seconds>(now).count();
95+
auto usec =
96+
std::to_string(sc::duration_cast<sc::microseconds>(now).count() %
97+
1'000'000);
98+
tm t{};
99+
localtime_r(&time, &t);
100+
assert(strftime(buf.data(), buf.size(), "%F %T.000000%z", &t) != 0);
101+
static constexpr size_t time_end = 26; // after .000000
102+
std::copy(usec.begin(), usec.end(),
103+
buf.data() + time_end - usec.size());
104+
*this << buf.data();
105+
*this << " [" << getpid() << '.' << std::this_thread::get_id();
106+
for (auto&& s: session)
107+
if (s)
108+
*this << '.' << *s;
109+
*this << ']';
110+
*this << ' ' << to_string(_level) << ' ';
111+
} catch (...) {
112+
kill();
113+
}
111114
}
112115

113116
log_msg::~log_msg()
114117
{
115118
if (_os && _os->get_wrapped())
116-
*_os << std::endl;
119+
try {
120+
*_os << std::endl;
121+
} catch (...) {
122+
}
117123
}
118124

119125
/*** logger ******************************************************************/

‎src/log.hpp

+15-6
Original file line numberDiff line numberDiff line change
@@ -33,19 +33,25 @@ enum class log_level: int {
3333

3434
[[nodiscard]] std::optional<log_level> from_string(std::string_view str);
3535

36+
// It may be called in destructor, so member functions are generally noexcept
37+
// and exceptions are silently ignored
3638
class log_msg {
3739
public:
3840
using session_type = std::array<std::optional<uint64_t>, 2>;
39-
explicit log_msg(log_level level, session_type session = {});
40-
log_msg(logger& obj, log_level level, session_type = {});
41+
explicit log_msg(log_level level, session_type session = {}) noexcept;
42+
log_msg(logger& obj, log_level level, session_type = {}) noexcept;
4143
log_msg(const log_msg&) = delete;
42-
log_msg(log_msg&&) = default;
44+
log_msg(log_msg&&) noexcept = default;
4345
~log_msg();
4446
log_msg& operator=(const log_msg&) = delete;
4547
log_msg& operator=(log_msg&&) = delete;
46-
template <class T> log_msg& operator<<(T&& v) & {
48+
template <class T> log_msg& operator<<(T&& v) & noexcept {
4749
if (_os)
48-
*_os << v;
50+
try {
51+
*_os << v;
52+
} catch (...) {
53+
kill();
54+
}
4955
return *this;
5056
}
5157
template <class ...T> log_msg& operator<<(std::variant<T...>&& v) & {
@@ -60,6 +66,9 @@ class log_msg {
6066
return std::move(*this);
6167
}
6268
private:
69+
void kill() {
70+
_os.reset();
71+
}
6372
logger& _logger;
6473
log_level _level;
6574
std::optional<std::osyncstream> _os;
@@ -92,7 +101,7 @@ class logger {
92101

93102
/*** log_msg *****************************************************************/
94103

95-
inline log_msg::log_msg(log_level level, session_type session):
104+
inline log_msg::log_msg(log_level level, session_type session) noexcept:
96105
log_msg(logger::global(), level, session)
97106
{
98107
}

‎src/sqlite3.cpp

+125
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
#include "sqlite3.hpp"
2+
#include "log.hpp"
3+
4+
#include <cassert>
5+
#include <sqlite3.h>
6+
7+
namespace acppsrv::sqlite {
8+
9+
/*** connection::impl ********************************************************/
10+
11+
class connection::impl {
12+
public:
13+
explicit impl(connection& conn);
14+
impl(const impl&) = delete;
15+
impl(impl&&) = delete;
16+
~impl();
17+
impl& operator=(const impl&) = delete;
18+
impl& operator=(impl&&) = delete;
19+
connection& conn;
20+
sqlite3* db = nullptr;
21+
};
22+
23+
connection::impl::impl(connection& conn): conn(conn)
24+
{
25+
if (int status = sqlite3_open_v2(conn._file.c_str(), &db,
26+
SQLITE_OPEN_READWRITE |
27+
SQLITE_OPEN_URI |
28+
SQLITE_OPEN_NOMUTEX |
29+
SQLITE_OPEN_EXRESCODE,
30+
nullptr);
31+
status != SQLITE_OK)
32+
{
33+
if (!db)
34+
throw error(conn._file);
35+
throw error(conn, *this);
36+
}
37+
assert(db);
38+
}
39+
40+
connection::impl::~impl()
41+
{
42+
if (sqlite3_close_v2(db) != SQLITE_OK) {
43+
assert(db);
44+
log_msg(log_level::emerg) << "Cannot close database \"" << conn._file <<
45+
"\" handle: " << sqlite3_errmsg(db);
46+
} else
47+
log_msg(log_level::debug) << "Closed database \"" << conn._file << '"';
48+
}
49+
50+
/*** connection **************************************************************/
51+
52+
connection::connection(std::string file):
53+
_file(std::move(file)), _impl(std::make_unique<impl>(*this))
54+
{
55+
}
56+
57+
connection::~connection() = default;
58+
59+
/*** query::impl *************************************************************/
60+
61+
class query::impl {
62+
public:
63+
explicit impl(query& q);
64+
impl(const impl&) = delete;
65+
impl(impl&&) = delete;
66+
~impl();
67+
impl& operator=(const impl&) = delete;
68+
impl& operator=(impl&&) = delete;
69+
private:
70+
query& q;
71+
sqlite3_stmt* stmt = nullptr;
72+
};
73+
74+
query::impl::impl(query& q): q(q)
75+
{
76+
// sqlite3 allows passing size incl. terminating NUL
77+
if (sqlite3_prepare_v3(q._db._impl->db, q._sql.c_str(),
78+
int(q._sql.size() + 1),
79+
SQLITE_PREPARE_PERSISTENT, &stmt,
80+
nullptr) != SQLITE_OK)
81+
{
82+
assert(!stmt);
83+
// this->q to silence compiler warning about unused q
84+
throw error(this->q._db, this->q._sql_id);
85+
}
86+
assert(stmt);
87+
}
88+
89+
query::impl::~impl()
90+
{
91+
sqlite3_finalize(stmt);
92+
}
93+
94+
/*** query *******************************************************************/
95+
96+
query::query(connection& db, std::string sql, std::string sql_id):
97+
_db(db), _sql(std::move(sql)), _sql_id(std::move(sql_id)),
98+
_impl(std::make_unique<impl>(*this))
99+
{
100+
}
101+
102+
query::~query() = default;
103+
104+
/*** error *******************************************************************/
105+
106+
error::error(const std::string& file):
107+
runtime_error("sqlite3 error in db \"" + file + "\"" +
108+
": Cannot allocate database handle")
109+
{
110+
}
111+
112+
error::error(connection& db, const std::string& sql_id):
113+
runtime_error("sqlite3 error in db \"" + db._file + "\"" +
114+
(sql_id.empty() ? "" : (" (" + sql_id + ")")) +
115+
": " + sqlite3_errmsg(db._impl->db))
116+
{
117+
}
118+
119+
error::error(connection& db, connection::impl& impl):
120+
runtime_error("sqlite3 error in db \"" + db._file + "\"" +
121+
": " + sqlite3_errmsg(impl.db))
122+
{
123+
}
124+
125+
} // namespace acppsrv::sqlite

‎src/sqlite3.hpp

+54
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
#pragma once
2+
3+
#include <memory>
4+
#include <stdexcept>
5+
#include <string_view>
6+
7+
namespace acppsrv::sqlite {
8+
9+
class connection;
10+
class transaction;
11+
class query;
12+
13+
class connection {
14+
public:
15+
// not using string_view, because sqlite3 requires null-terminated strings
16+
explicit connection(std::string file);
17+
connection(const connection&) = delete;
18+
connection(connection&&) = delete;
19+
~connection();
20+
connection& operator=(const connection&) = delete;
21+
connection& operator=(connection&&) = delete;
22+
private:
23+
class impl;
24+
std::string _file;
25+
std::unique_ptr<impl> _impl;
26+
friend class error;
27+
friend class query;
28+
};
29+
30+
class query {
31+
public:
32+
explicit query(connection& db, std::string sql, std::string sql_id = {});
33+
query(const query&) = delete;
34+
query(query&&) = delete;
35+
~query();
36+
query& operator=(const query&) = delete;
37+
query& operator=(query&&) = delete;
38+
private:
39+
class impl;
40+
connection& _db;
41+
std::string _sql;
42+
std::string _sql_id;
43+
std::unique_ptr<impl> _impl;
44+
};
45+
46+
class error: public std::runtime_error {
47+
public:
48+
explicit error(const std::string& file);
49+
explicit error(connection& db, const std::string& sql_id = {});
50+
// To be used before db._impl is initialized
51+
error(connection& db, connection::impl& impl);
52+
};
53+
54+
} // namespace acppsrv::sqlite

‎src/worker.cpp

+3
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
namespace acppsrv {
77

8+
thread_local size_t thread_pool::this_thread_idx;
9+
810
thread_pool::thread_pool(const proto::ThreadPool* cfg, std::string name):
911
ctx(configuration::num_threads(cfg)), name(std::move(name)),
1012
threads(size_t(configuration::num_threads(cfg)))
@@ -28,6 +30,7 @@ void thread_pool::run(bool persistent)
2830
for (size_t t = 0; t < threads.size(); ++t)
2931
threads[t] = std::thread(
3032
[this, t, num_threads = threads.size()]() {
33+
this_thread_idx = t;
3134
log_msg(log_level::debug) << "Starting thread " << (t + 1) <<
3235
'/' << num_threads << " in pool \"" << name << '"';
3336
while (!ctx.stopped())

‎src/worker.hpp

+8
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,17 @@ class thread_pool {
2626
void run(bool persistent);
2727
void stop();
2828
void wait();
29+
size_t size() {
30+
return threads.size();
31+
}
32+
// To be called from a handler running in this thread pool
33+
[[nodiscard]] static size_t this_thread() {
34+
return this_thread_idx;
35+
}
2936
boost::asio::io_context ctx;
3037
private:
3138
const std::string name;
39+
static thread_local size_t this_thread_idx;
3240
std::vector<std::thread> threads;
3341
boost::asio::executor_work_guard<boost::asio::io_context::executor_type>
3442
work{ctx.get_executor()};

0 commit comments

Comments
 (0)
Please sign in to comment.