Skip to content

Commit 993d8a5

Browse files
committed
Add initial C++20 coroutine support
1 parent 21c83ca commit 993d8a5

File tree

9 files changed

+965
-1
lines changed

9 files changed

+965
-1
lines changed

.gitmodules

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
[submodule "tests/third_party/corral"]
2+
path = tests/third_party/corral
3+
url = https://github.com/hudson-trading/corral

tests/CMakeLists.txt

+23-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
OPTION (COVERAGE "Enable gcda file generation needed by lcov" OFF)
2+
OPTION (CPPZMQ_TEST_COROUTINE "Enable C++20 coroutine support test cases. This requires Boost.Asio" ON)
3+
14
find_package(Threads)
25

36
find_package(Catch2 QUIET)
@@ -32,6 +35,26 @@ add_executable(
3235
utilities.cpp
3336
)
3437

38+
if (CPPZMQ_TEST_COROUTINE)
39+
target_compile_features(
40+
unit_tests PRIVATE cxx_std_23
41+
)
42+
target_compile_definitions(
43+
unit_tests PRIVATE
44+
CPPZMQ_ENABLE_CORRAL_COROUTINE
45+
)
46+
find_package(Boost CONFIG REQUIRED COMPONENTS asio)
47+
target_include_directories(unit_tests PRIVATE third_party/corral)
48+
target_link_libraries(
49+
unit_tests
50+
PRIVATE Boost::asio
51+
)
52+
target_sources(
53+
unit_tests PRIVATE
54+
async/corral/message.cpp
55+
)
56+
endif()
57+
3558
target_include_directories(unit_tests PUBLIC ${CATCH_MODULE_PATH})
3659
target_link_libraries(
3760
unit_tests
@@ -40,7 +63,6 @@ target_link_libraries(
4063
PRIVATE ${CMAKE_THREAD_LIBS_INIT}
4164
)
4265

43-
OPTION (COVERAGE "Enable gcda file generation needed by lcov" OFF)
4466

4567
if (COVERAGE)
4668
target_compile_options(unit_tests PRIVATE --coverage)

tests/async/corral/common.hpp

+60
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
#pragma once
2+
3+
#include <boost/asio/io_context.hpp>
4+
#include <catch2/catch_all.hpp>
5+
#include <catch2/catch_test_macros.hpp>
6+
7+
#include <chrono>
8+
#include <zmq_async.hpp>
9+
#include <corral/corral.h>
10+
#include <boost/asio/steady_timer.hpp>
11+
12+
template<> struct corral::EventLoopTraits<boost::asio::io_context>
13+
{
14+
using T = boost::asio::io_context;
15+
/// Returns a value identifying the event loop.
16+
/// Traits for any sort of wrappers should return ID
17+
/// of the wrapped event loop.
18+
static EventLoopID eventLoopID(T &ex) { return EventLoopID{std::addressof(ex)}; }
19+
20+
/// Runs the event loop.
21+
static void run(T &ex) { ex.run(); }
22+
23+
/// Tells the event loop to exit.
24+
/// run() should return shortly thereafter.
25+
static void stop(T &ex) { ex.stop(); }
26+
};
27+
28+
29+
struct [[nodiscard]] Timer
30+
{
31+
explicit Timer(std::chrono::milliseconds sec, boost::asio::io_context &io) :
32+
timer{io, sec}
33+
{
34+
}
35+
boost::asio::steady_timer timer;
36+
37+
public /* awaitable */:
38+
inline auto await_ready() const noexcept
39+
{
40+
return timer.expiry() <= std::chrono::steady_clock::now();
41+
}
42+
43+
inline auto await_suspend(std::coroutine_handle<> h) noexcept
44+
{
45+
timer.async_wait([h](boost::system::error_code e) {
46+
if (!e)
47+
h.resume();
48+
});
49+
}
50+
51+
inline auto await_resume() {}
52+
53+
public /* corral extensions */:
54+
inline bool await_cancel(std::coroutine_handle<>) noexcept
55+
{
56+
return timer.cancel();
57+
}
58+
59+
inline std::false_type await_must_resume() const noexcept { return {}; }
60+
};

tests/async/corral/message.cpp

+177
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
#include "common.hpp"
2+
#include "corral/Nursery.h"
3+
#include "corral/wait.h"
4+
#include "zmq.hpp"
5+
#include "zmq_addon.hpp"
6+
#include <catch2/catch_test_macros.hpp>
7+
#include <utility>
8+
9+
using namespace std::string_literals;
10+
using namespace std::string_view_literals;
11+
using namespace std::chrono_literals;
12+
using zmq::async::corral::socket_t, zmq::message_t, zmq::context_t;
13+
14+
15+
TEST_CASE("basic REQ and REP", "[async_corral]")
16+
{
17+
boost::asio::io_context io;
18+
context_t ctx;
19+
20+
constexpr auto req_msg = "Hi"sv;
21+
constexpr auto rep_msg = "There"sv;
22+
constexpr auto inproc_addr = "inproc://async_corral-basic";
23+
24+
corral::run(io, [&] -> corral::Task<> {
25+
co_await corral::allOf(
26+
[&] -> corral::Task<> {
27+
socket_t socket{io, ctx, zmq::socket_type::req};
28+
socket.connect(inproc_addr);
29+
co_await socket.send(message_t{req_msg});
30+
auto msg = co_await socket.recv();
31+
REQUIRE(msg.to_string() == rep_msg);
32+
},
33+
[&] -> corral::Task<> {
34+
socket_t socket{io, ctx, zmq::socket_type::rep};
35+
socket.bind(inproc_addr);
36+
auto r = co_await socket.recv();
37+
REQUIRE(r.to_string() == req_msg);
38+
co_await socket.send(message_t{rep_msg});
39+
});
40+
});
41+
}
42+
43+
TEST_CASE("simple ROUTER and DEALER", "[async_corral]")
44+
{
45+
boost::asio::io_context io;
46+
context_t ctx;
47+
48+
constexpr auto request_msg1 = "Test"sv;
49+
constexpr auto request_msg2 = "ing"sv;
50+
constexpr auto response_msg = "42!"sv;
51+
constexpr auto response_repeat = 2;
52+
constexpr auto inproc_addr = "inproc://async_corral-router_dealer";
53+
54+
auto server = [&] -> corral::Task<> {
55+
auto external = socket_t{io, ctx, zmq::socket_type::router};
56+
external.bind(inproc_addr);
57+
58+
for (;;) {
59+
auto msg = co_await external.recv_multipart();
60+
REQUIRE(msg.size() == 3);
61+
REQUIRE(msg[1].to_string_view() == request_msg1);
62+
REQUIRE(msg[2].to_string_view() == request_msg2);
63+
auto routing_id = msg.pop();
64+
65+
for (auto i = 0; i < response_repeat; ++i) {
66+
zmq::multipart_t response;
67+
response.add(std::move(message_t{routing_id.to_string_view()}));
68+
response.add(message_t{response_msg});
69+
co_await external.send(std::move(response));
70+
co_await Timer{5ms, io};
71+
}
72+
}
73+
};
74+
75+
76+
auto client = [&] -> corral::Task<> {
77+
auto socket = socket_t{io.get_executor(), ctx, zmq::socket_type::dealer};
78+
socket.connect(inproc_addr);
79+
80+
for (auto i = 0; i < 3; ++i) {
81+
zmq::multipart_t msg;
82+
msg.add(message_t{request_msg1});
83+
msg.add(message_t{request_msg2});
84+
co_await socket.send(std::move(msg));
85+
86+
for (auto i = 0; i < response_repeat; ++i) {
87+
auto response = co_await socket.recv_multipart();
88+
REQUIRE(response.size() == 1);
89+
REQUIRE(response[0].to_string_view() == response_msg);
90+
}
91+
}
92+
};
93+
94+
corral::run(io, corral::anyOf(client(), server()));
95+
}
96+
97+
TEST_CASE("ROUTER forwarding", "[async_corral]")
98+
{
99+
// dealer client -> external router
100+
// external router -> work dispatcher (spawn a new worker)
101+
// worker -> internal router
102+
// (forward) internal router -> external router
103+
104+
105+
boost::asio::io_context io;
106+
context_t ctx;
107+
108+
constexpr auto request_msg1 = "Test"sv;
109+
constexpr auto request_msg2 = "ing"sv;
110+
constexpr auto response_msg = "42!"sv;
111+
constexpr auto response_repeat = 2;
112+
constexpr auto inproc_external_addr =
113+
"inproc://async_corral-router_forwarding-router";
114+
constexpr auto inproc_internal_addr =
115+
"inproc://async_corral-router_forwarding-rep";
116+
117+
auto worker = [&](socket_t dealer, zmq::multipart_t msg) -> corral::Task<> {
118+
REQUIRE(msg.size() == 2);
119+
REQUIRE(msg[0].to_string_view() == request_msg1);
120+
REQUIRE(msg[1].to_string_view() == request_msg2);
121+
for (auto i = 0; i < response_repeat; ++i) {
122+
co_await dealer.send(message_t{response_msg});
123+
co_await Timer{50ms, io};
124+
}
125+
};
126+
127+
auto work_dispatcher = [&](socket_t &external) -> corral::Task<> {
128+
CORRAL_WITH_NURSERY(n)
129+
{
130+
for (;;) {
131+
auto msg = co_await external.recv_multipart();
132+
133+
auto worker_socket = socket_t{io, ctx, zmq::socket_type::dealer};
134+
worker_socket.set(zmq::sockopt::routing_id, msg[0].to_string_view());
135+
worker_socket.connect(inproc_internal_addr);
136+
msg.pop();
137+
n.start(worker, std::move(worker_socket), std::move(msg));
138+
}
139+
};
140+
};
141+
142+
auto forward = [&](socket_t &external, socket_t &internal) -> corral::Task<> {
143+
for (;;) {
144+
auto msg_from_internal = co_await internal.recv_multipart();
145+
co_await external.send(std::move(msg_from_internal));
146+
}
147+
};
148+
149+
auto server = [&] -> corral::Task<> {
150+
auto external = socket_t{io, ctx, zmq::socket_type::router};
151+
auto internal = socket_t{io, ctx, zmq::socket_type::router};
152+
153+
external.bind(inproc_external_addr);
154+
internal.bind(inproc_internal_addr);
155+
156+
co_await corral::anyOf(forward(external, internal),
157+
work_dispatcher(external));
158+
};
159+
160+
auto client = [&] -> corral::Task<> {
161+
auto socket = socket_t{io.get_executor(), ctx, zmq::socket_type::dealer};
162+
socket.connect(inproc_external_addr);
163+
164+
zmq::multipart_t msg;
165+
msg.add(message_t{request_msg1});
166+
msg.add(message_t{request_msg2});
167+
co_await socket.send(std::move(msg));
168+
169+
for (auto i = 0; i < response_repeat; ++i) {
170+
auto response = co_await socket.recv_multipart();
171+
REQUIRE(response.size() == 1);
172+
REQUIRE(response[0].to_string_view() == response_msg);
173+
}
174+
};
175+
176+
corral::run(io, corral::anyOf(client(), server()));
177+
}

tests/third_party/corral

Submodule corral added at 2739aef

vcpkg-configuration.json

+14
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
{
2+
"default-registry": {
3+
"kind": "git",
4+
"baseline": "856505bb767458c99d8e3c3ed441f59a058d3687",
5+
"repository": "https://github.com/microsoft/vcpkg"
6+
},
7+
"registries": [
8+
{
9+
"kind": "artifact",
10+
"location": "https://github.com/microsoft/vcpkg-ce-catalog/archive/refs/heads/main.zip",
11+
"name": "microsoft"
12+
}
13+
]
14+
}

vcpkg.json

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
{
2+
"name": "cppzmq",
3+
"version-semver": "4.10.0",
4+
"features": {
5+
"coroutine": {
6+
"description": "Dependencies for enabling C++ 20 coroutine support",
7+
"dependencies": [
8+
"boost-asio"
9+
]
10+
},
11+
"test": {
12+
"description": "Dependencies for testing",
13+
"dependencies": [
14+
{
15+
"name": "catch2",
16+
"version>=": "3.5.3"
17+
}
18+
]
19+
}
20+
}
21+
}

zmq.hpp

+3
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@
6060
#if CPPZMQ_LANG >= 201703L
6161
#define ZMQ_CPP17
6262
#endif
63+
#if CPPZMQ_LANG >= 202002L
64+
#define ZMQ_CPP20
65+
#endif
6366

6467
#if defined(ZMQ_CPP14) && !defined(_MSC_VER)
6568
#define ZMQ_DEPRECATED(msg) [[deprecated(msg)]]

0 commit comments

Comments
 (0)