Skip to content

Coroutine revamp #311

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 19 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
b090727
feat: add coroutine-based read and write methods to LoopbackStream an…
kamilsa Jun 4, 2025
1826f2a
feat: implement coroutine-based read and write methods for QuicConnec…
kamilsa Jun 4, 2025
b64d82d
feat: add coroutine-based read and write methods to InsecureReadWrite…
kamilsa Jun 4, 2025
6a60616
feat: add coroutine-based read and write methods to YamuxStream and Y…
kamilsa Jun 4, 2025
ac0915d
feat: add coroutine-based muxConnection method to Yamux and MuxerAdaptor
kamilsa Jun 5, 2025
bce210b
feat: add coroutine-based selectOneOf method for protocol negotiation…
kamilsa Jun 5, 2025
b215f4a
feat: implement coroutine-based selectOneOf method in Multiselect for…
kamilsa Jun 5, 2025
2a41bf0
feat: add coroutine-based HandshakeCoro for Noise protocol
kamilsa Jun 5, 2025
804b94a
feat: clang-format
kamilsa Jun 5, 2025
f42ef25
feat: add coroutine-based upgradeToSecureInbound and upgradeToSecureO…
kamilsa Jun 5, 2025
05b380e
feat: add coroutine-based secureInboundCoro and secureOutboundCoro me…
kamilsa Jun 5, 2025
8f30d17
feat: implement coroutine-based read, readSome, and writeSome methods…
kamilsa Jun 5, 2025
eab2d64
feat: add coroutine-based upgradeInbound and upgradeOutbound methods …
kamilsa Jun 5, 2025
6ac5947
feat: add coroutine-based upgradeLayersInboundCoro, upgradeLayersOutb…
kamilsa Jun 5, 2025
a3793ed
feat: add coroutine-based upgradeInboundCoro and upgradeOutboundCoro …
kamilsa Jun 5, 2025
8b42e70
feat: add coroutine-based connect methods in TcpConnection
kamilsa Jun 5, 2025
4738bd2
feat: add coroutine-based newStreamCoroutine method in YamuxedConnection
kamilsa Jun 5, 2025
bb54f93
feat: add coroutine-based listenCoroutine and asyncAccept methods in …
kamilsa Jun 5, 2025
103f962
feat: add coroutine-based onConnectionCoro method and update listenCo…
kamilsa Jun 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions include/libp2p/basic/message_read_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include <memory>

#include <boost/asio/awaitable.hpp>
#include <libp2p/basic/readwriter.hpp>

namespace libp2p::basic {
Expand Down Expand Up @@ -37,5 +38,18 @@ namespace libp2p::basic {
* Quantity of bytes written is passed as an argument in case of success
*/
virtual void write(BytesIn buffer, Writer::WriteCallbackFunc cb) = 0;

/**
* Reads a message that is prepended with its length (coroutine version)
* @return awaitable with result containing read bytes or an error
*/
virtual boost::asio::awaitable<ReadCallback> read() = 0;

/**
* Writes a message and preprends its length (coroutine version)
* @param buffer - bytes to be written
* @return awaitable with result containing number of bytes written or an error
*/
virtual boost::asio::awaitable<outcome::result<size_t>> write(BytesIn buffer) = 0;
};
} // namespace libp2p::basic
8 changes: 8 additions & 0 deletions include/libp2p/basic/reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <functional>
#include <vector>

#include <boost/asio/awaitable.hpp>
#include <libp2p/common/types.hpp>
#include <libp2p/outcome/outcome.hpp>

Expand Down Expand Up @@ -54,6 +55,13 @@ namespace libp2p::basic {
* @param res read result
* @param cb callback
*/

virtual boost::asio::awaitable<outcome::result<size_t>> read(
BytesOut out, size_t bytes) = 0;

virtual boost::asio::awaitable<outcome::result<size_t>> readSome(
BytesOut out, size_t bytes) = 0;

virtual void deferReadCallback(outcome::result<size_t> res,
ReadCallbackFunc cb) = 0;
};
Expand Down
4 changes: 4 additions & 0 deletions include/libp2p/basic/writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#pragma once

#include <boost/asio/awaitable.hpp>
#include <functional>

#include <libp2p/common/types.hpp>
Expand Down Expand Up @@ -34,6 +35,9 @@ namespace libp2p::basic {
*/
virtual void writeSome(BytesIn in, size_t bytes, WriteCallbackFunc cb) = 0;

virtual boost::asio::awaitable<std::error_code> writeSome(BytesIn in,
size_t bytes) = 0;

/**
* @brief Defers reporting error state to callback to avoid reentrancy
* (i.e. callback will not be called before initiator function returns)
Expand Down
7 changes: 7 additions & 0 deletions include/libp2p/connection/capable_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ namespace libp2p::connection {
*/
virtual void newStream(StreamHandlerFunc cb) = 0;

/**
* @brief Opens new stream in a coroutine manner
* @return Awaitable result of a new Stream or error
*/
virtual boost::asio::awaitable<outcome::result<std::shared_ptr<Stream>>>
newStreamCoroutine() = 0;

/**
* @brief Set a handler, which is called, when a new stream arrives from the
* other side
Expand Down
5 changes: 5 additions & 0 deletions include/libp2p/connection/loopback_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ namespace libp2p::connection {
outcome::result<libp2p::multi::Multiaddress> remoteMultiaddr()
const override;

// Coroutine-based methods
boost::asio::awaitable<outcome::result<size_t>> read(BytesOut out, size_t bytes) override;
boost::asio::awaitable<outcome::result<size_t>> readSome(BytesOut out, size_t bytes) override;
boost::asio::awaitable<std::error_code> writeSome(BytesIn in, size_t bytes) override;

protected:
void read(BytesOut out, size_t bytes, ReadCallbackFunc cb) override;

Expand Down
18 changes: 18 additions & 0 deletions include/libp2p/layer/layer_adaptor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ namespace libp2p::layer {
std::shared_ptr<connection::LayerConnection> conn,
LayerConnCallbackFunc cb) const = 0;

/**
* Coroutine version of upgradeInbound
* @param conn - connection to be upgraded
* @return result with upgraded connection or error
*/
virtual boost::asio::awaitable<outcome::result<std::shared_ptr<connection::LayerConnection>>> upgradeInbound(
std::shared_ptr<connection::LayerConnection> conn) const = 0;

/**
* Make a next-layer connection from the current-layer one, using this
* adaptor
Expand All @@ -42,5 +50,15 @@ namespace libp2p::layer {
const multi::Multiaddress &address,
std::shared_ptr<connection::LayerConnection> conn,
LayerConnCallbackFunc cb) const = 0;

/**
* Coroutine version of upgradeOutbound
* @param address - multiaddress of the remote peer
* @param conn - connection to be upgraded
* @return result with upgraded connection or error
*/
virtual boost::asio::awaitable<outcome::result<std::shared_ptr<connection::LayerConnection>>> upgradeOutbound(
const multi::Multiaddress &address,
std::shared_ptr<connection::LayerConnection> conn) const = 0;
};
} // namespace libp2p::layer
7 changes: 7 additions & 0 deletions include/libp2p/layer/websocket/ssl_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ namespace libp2p::connection {
void writeSome(BytesIn in, size_t bytes, WriteCallbackFunc cb) override;
void deferWriteCallback(std::error_code ec, WriteCallbackFunc cb) override;

boost::asio::awaitable<outcome::result<size_t>> read(BytesOut out,
size_t bytes) override;
boost::asio::awaitable<outcome::result<size_t>> readSome(
BytesOut out, size_t bytes) override;
boost::asio::awaitable<std::error_code> writeSome(BytesIn in,
size_t bytes) override;

private:
std::shared_ptr<LayerConnection> connection_;
std::shared_ptr<boost::asio::ssl::context> ssl_context_;
Expand Down
8 changes: 8 additions & 0 deletions include/libp2p/layer/websocket/ws_adaptor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <libp2p/layer/websocket/ws_connection.hpp>
#include <libp2p/layer/websocket/ws_connection_config.hpp>
#include <libp2p/network/connection_manager.hpp>
#include <boost/asio/awaitable.hpp>

namespace libp2p::layer {

Expand All @@ -25,10 +26,17 @@ namespace libp2p::layer {
void upgradeInbound(std::shared_ptr<connection::LayerConnection> conn,
LayerConnCallbackFunc cb) const override;

boost::asio::awaitable<outcome::result<std::shared_ptr<connection::LayerConnection>>>
upgradeInbound(std::shared_ptr<connection::LayerConnection> conn) const override;

void upgradeOutbound(const multi::Multiaddress &address,
std::shared_ptr<connection::LayerConnection> conn,
LayerConnCallbackFunc cb) const override;

boost::asio::awaitable<outcome::result<std::shared_ptr<connection::LayerConnection>>>
upgradeOutbound(const multi::Multiaddress &address,
std::shared_ptr<connection::LayerConnection> conn) const override;

private:
std::shared_ptr<basic::Scheduler> scheduler_;
std::shared_ptr<boost::asio::io_context> io_context_;
Expand Down
8 changes: 8 additions & 0 deletions include/libp2p/layer/websocket/ws_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#pragma once

#include <boost/beast/websocket.hpp>
#include <boost/asio/awaitable.hpp>

#include <libp2p/basic/scheduler.hpp>
#include <libp2p/common/metrics/instance_count.hpp>
Expand Down Expand Up @@ -79,6 +80,13 @@ namespace libp2p::connection {

void deferWriteCallback(std::error_code ec, WriteCallbackFunc cb) override;

boost::asio::awaitable<outcome::result<size_t>> read(BytesOut out,
size_t bytes) override;
boost::asio::awaitable<outcome::result<size_t>> readSome(
BytesOut out, size_t bytes) override;
boost::asio::awaitable<std::error_code> writeSome(BytesIn in,
size_t bytes) override;

private:
void setTimerPing();
void onPong(BytesIn payload);
Expand Down
8 changes: 8 additions & 0 deletions include/libp2p/layer/websocket/wss_adaptor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#pragma once

#include <libp2p/layer/layer_adaptor.hpp>
#include <boost/asio/awaitable.hpp>

namespace boost::asio {
class io_context;
Expand Down Expand Up @@ -36,10 +37,17 @@ namespace libp2p::layer {
void upgradeInbound(std::shared_ptr<connection::LayerConnection> conn,
LayerConnCallbackFunc cb) const override;

boost::asio::awaitable<outcome::result<std::shared_ptr<connection::LayerConnection>>>
upgradeInbound(std::shared_ptr<connection::LayerConnection> conn) const override;

void upgradeOutbound(const multi::Multiaddress &address,
std::shared_ptr<connection::LayerConnection> conn,
LayerConnCallbackFunc cb) const override;

boost::asio::awaitable<outcome::result<std::shared_ptr<connection::LayerConnection>>>
upgradeOutbound(const multi::Multiaddress &address,
std::shared_ptr<connection::LayerConnection> conn) const override;

private:
std::shared_ptr<boost::asio::io_context> io_context_;
WssCertificate server_certificate_;
Expand Down
5 changes: 5 additions & 0 deletions include/libp2p/muxer/mplex/mplex_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <mutex>

#include <boost/asio/streambuf.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/noncopyable.hpp>
#include <libp2p/connection/stream.hpp>
#include <libp2p/log/logger.hpp>
Expand Down Expand Up @@ -61,6 +62,10 @@ namespace libp2p::connection {

void deferWriteCallback(std::error_code ec, WriteCallbackFunc cb) override;

boost::asio::awaitable<outcome::result<size_t>> read(BytesOut out, size_t bytes) override;
boost::asio::awaitable<outcome::result<size_t>> readSome(BytesOut out, size_t bytes) override;
boost::asio::awaitable<std::error_code> writeSome(BytesIn in, size_t bytes) override;

bool isClosed() const override;

void close(VoidResultHandlerFunc cb) override;
Expand Down
4 changes: 4 additions & 0 deletions include/libp2p/muxer/mplex/mplexed_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ namespace libp2p::connection {
void readSome(BytesOut out, size_t bytes, ReadCallbackFunc cb) override;
void writeSome(BytesIn in, size_t bytes, WriteCallbackFunc cb) override;

boost::asio::awaitable<outcome::result<size_t>> read(BytesOut out, size_t bytes) override;
boost::asio::awaitable<outcome::result<size_t>> readSome(BytesOut out, size_t bytes) override;
boost::asio::awaitable<std::error_code> writeSome(BytesIn in, size_t bytes) override;

void deferReadCallback(outcome::result<size_t> res,
ReadCallbackFunc cb) override;
void deferWriteCallback(std::error_code ec, WriteCallbackFunc cb) override;
Expand Down
10 changes: 10 additions & 0 deletions include/libp2p/muxer/muxer_adaptor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,15 @@ namespace libp2p::muxer {
virtual void muxConnection(
std::shared_ptr<connection::SecureConnection> conn,
CapConnCallbackFunc cb) const = 0;

/**
* Make a muxed connection from the secure one, using this adaptor
* (coroutine version)
* @param conn - connection to be upgraded
* @return awaitable with result containing upgraded connection or error
*/
virtual boost::asio::awaitable<
outcome::result<std::shared_ptr<connection::CapableConnection>>>
muxConnection(std::shared_ptr<connection::SecureConnection> conn) const = 0;
};
} // namespace libp2p::muxer
11 changes: 11 additions & 0 deletions include/libp2p/muxer/yamux/yamux.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,17 @@ namespace libp2p::muxer {
void muxConnection(std::shared_ptr<connection::SecureConnection> conn,
CapConnCallbackFunc cb) const override;

/**
* Make a muxed connection from the secure one, using this adaptor
* (coroutine version)
* @param conn - connection to be upgraded
* @return awaitable with result containing upgraded connection or error
*/
boost::asio::awaitable<
outcome::result<std::shared_ptr<connection::CapableConnection>>>
muxConnection(
std::shared_ptr<connection::SecureConnection> conn) const override;

private:
MuxedConnectionConfig config_;
std::shared_ptr<basic::Scheduler> scheduler_;
Expand Down
8 changes: 8 additions & 0 deletions include/libp2p/muxer/yamux/yamux_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,14 @@ namespace libp2p::connection {

outcome::result<multi::Multiaddress> remoteMultiaddr() const override;

// Coroutine-based methods
boost::asio::awaitable<outcome::result<size_t>> read(BytesOut out,
size_t bytes) override;
boost::asio::awaitable<outcome::result<size_t>> readSome(
BytesOut out, size_t bytes) override;
boost::asio::awaitable<std::error_code> writeSome(BytesIn in,
size_t bytes) override;

/// Increases send window. Called from Connection
void increaseSendWindow(size_t delta);

Expand Down
11 changes: 11 additions & 0 deletions include/libp2p/muxer/yamux/yamuxed_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ namespace libp2p::connection {

void newStream(StreamHandlerFunc cb) override;

boost::asio::awaitable<outcome::result<std::shared_ptr<Stream>>>
newStreamCoroutine();

void onStream(NewStreamHandlerFunc cb) override;

outcome::result<peer::PeerId> localPeer() const override;
Expand All @@ -78,6 +81,14 @@ namespace libp2p::connection {
ReadCallbackFunc cb) override;
void deferWriteCallback(std::error_code ec, WriteCallbackFunc cb) override;

// Coroutine-based methods
boost::asio::awaitable<outcome::result<size_t>> read(BytesOut out,
size_t bytes) override;
boost::asio::awaitable<outcome::result<size_t>> readSome(
BytesOut out, size_t bytes) override;
boost::asio::awaitable<std::error_code> writeSome(BytesIn in,
size_t bytes) override;

private:
using Streams = std::unordered_map<StreamId, std::shared_ptr<YamuxStream>>;

Expand Down
7 changes: 7 additions & 0 deletions include/libp2p/network/impl/listener_manager_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ namespace libp2p::network {

outcome::result<void> listen(const multi::Multiaddress &ma) override;

boost::asio::awaitable<outcome::result<void>> listenCoroutine(
const multi::Multiaddress &ma) override;

std::vector<multi::Multiaddress> getListenAddresses() const override;

std::vector<multi::Multiaddress> getListenAddressesInterfaces()
Expand All @@ -49,6 +52,10 @@ namespace libp2p::network {
outcome::result<std::shared_ptr<connection::CapableConnection>> rconn)
override;

void onConnectionCoro(
outcome::result<std::shared_ptr<connection::CapableConnection>> rconn)
override;

private:
bool started = false;

Expand Down
6 changes: 6 additions & 0 deletions include/libp2p/network/listener_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ namespace libp2p::network {
*/
virtual outcome::result<void> listen(const multi::Multiaddress &ma) = 0;

virtual boost::asio::awaitable<outcome::result<void>> listenCoroutine(
const multi::Multiaddress &ma) = 0;
/**
* @brief Returns an unmodified list of addresses, added by user.
*/
Expand All @@ -98,6 +100,10 @@ namespace libp2p::network {
virtual void onConnection(
outcome::result<std::shared_ptr<connection::CapableConnection>>
rconn) = 0;

virtual void onConnectionCoro(
outcome::result<std::shared_ptr<connection::CapableConnection>>
rconn) = 0;
};

} // namespace libp2p::network
7 changes: 7 additions & 0 deletions include/libp2p/protocol_muxer/multiselect.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ namespace libp2p::protocol_muxer::multiselect {
bool negotiate_multiselect,
ProtocolHandlerFunc cb) override;

/// Implements coroutine version of ProtocolMuxer API
boost::asio::awaitable<outcome::result<peer::ProtocolName>> selectOneOf(
std::span<const peer::ProtocolName> protocols,
std::shared_ptr<basic::ReadWriter> connection,
bool is_initiator,
bool negotiate_multistream) override;

/// Simple single stream negotiate procedure
void simpleStreamNegotiate(
const std::shared_ptr<connection::Stream> &stream,
Expand Down
Loading
Loading