Skip to content

Commit aebc95f

Browse files
author
Igor Egorov
authored
Add timeout parameter to newStream and dial methods (#98)
Signed-off-by: Igor Egorov <[email protected]>
1 parent f075370 commit aebc95f

File tree

16 files changed

+201
-46
lines changed

16 files changed

+201
-46
lines changed

include/libp2p/host/basic_host/basic_host.hpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ namespace libp2p::host {
5454
const std::function<bool(const peer::Protocol &)> &predicate) override;
5555

5656
void newStream(const peer::PeerInfo &p, const peer::Protocol &protocol,
57-
const StreamResultHandler &handler) override;
57+
const StreamResultHandler &handler,
58+
std::chrono::milliseconds timeout) override;
5859

5960
outcome::result<void> listen(const multi::Multiaddress &ma) override;
6061

@@ -75,7 +76,8 @@ namespace libp2p::host {
7576

7677
event::Bus &getBus() override;
7778

78-
event::Handle setOnNewConnectionHandler(const NewConnectionHandler &h) const override;
79+
event::Handle setOnNewConnectionHandler(
80+
const NewConnectionHandler &h) const override;
7981

8082
private:
8183
std::shared_ptr<peer::IdentityManager> idmgr_;

include/libp2p/host/host.hpp

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#ifndef LIBP2P_HOST_HPP
77
#define LIBP2P_HOST_HPP
88

9+
#include <chrono>
910
#include <functional>
1011
#include <string_view>
1112

@@ -14,12 +15,12 @@
1415
#include <libp2p/multi/multiaddress.hpp>
1516
#include <libp2p/network/network.hpp>
1617
#include <libp2p/network/router.hpp>
18+
#include <libp2p/outcome/outcome.hpp>
1719
#include <libp2p/peer/peer_id.hpp>
1820
#include <libp2p/peer/peer_info.hpp>
1921
#include <libp2p/peer/peer_repository.hpp>
2022
#include <libp2p/peer/protocol.hpp>
2123
#include <libp2p/protocol/base_protocol.hpp>
22-
#include <libp2p/outcome/outcome.hpp>
2324

2425
namespace libp2p {
2526
/**
@@ -50,7 +51,8 @@ namespace libp2p {
5051
* @brief Stores OnNewConnectionHandler.
5152
* @param h handler function to store
5253
*/
53-
virtual event::Handle setOnNewConnectionHandler(const NewConnectionHandler &h) const = 0;
54+
virtual event::Handle setOnNewConnectionHandler(
55+
const NewConnectionHandler &h) const = 0;
5456

5557
/**
5658
* @brief Get a version of this Libp2p client
@@ -124,7 +126,22 @@ namespace libp2p {
124126
*/
125127
virtual void newStream(const peer::PeerInfo &p,
126128
const peer::Protocol &protocol,
127-
const StreamResultHandler &handler) = 0;
129+
const StreamResultHandler &handler) {
130+
newStream(p, protocol, handler, std::chrono::milliseconds(0));
131+
}
132+
133+
/**
134+
* @brief Open new stream to the peer {@param p} with protocol {@param
135+
* protocol} with a specific timeout.
136+
* @param p stream will be opened to this peer
137+
* @param protocol "speak" using this protocol
138+
* @param handler callback, will be executed on success or fail
139+
* @param timeout in milliseconds
140+
*/
141+
virtual void newStream(const peer::PeerInfo &p,
142+
const peer::Protocol &protocol,
143+
const StreamResultHandler &handler,
144+
std::chrono::milliseconds timeout) = 0;
128145

129146
/**
130147
* @brief Create listener on given multiaddress.

include/libp2p/network/dialer.hpp

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
#ifndef LIBP2P_NETWORK_DIALER_HPP
77
#define LIBP2P_NETWORK_DIALER_HPP
88

9+
#include <chrono>
10+
911
#include <libp2p/connection/capable_connection.hpp>
1012
#include <libp2p/peer/peer_info.hpp>
1113
#include <libp2p/peer/protocol.hpp>
@@ -27,13 +29,28 @@ namespace libp2p::network {
2729
using StreamResultFunc = std::function<void(StreamResult)>;
2830

2931
// Establishes a connection or returns existing one to a given peer
30-
virtual void dial(const peer::PeerInfo &p, DialResultFunc cb) = 0;
32+
virtual void dial(const peer::PeerInfo &p, DialResultFunc cb) {
33+
dial(p, std::move(cb), std::chrono::milliseconds(0));
34+
}
35+
36+
// Establishes a connection or returns existing one to a given peer with a
37+
// specific timeout
38+
virtual void dial(const peer::PeerInfo &p, DialResultFunc cb,
39+
std::chrono::milliseconds timeout) = 0;
3140

3241
// NewStream returns a new stream to given peer p.
3342
// If there is no connection to p, attempts to create one.
3443
virtual void newStream(const peer::PeerInfo &p,
3544
const peer::Protocol &protocol,
36-
StreamResultFunc cb) = 0;
45+
StreamResultFunc cb) {
46+
newStream(p, protocol, std::move(cb), std::chrono::milliseconds(0));
47+
}
48+
49+
// NewStream returns a new stream to given peer p with a specific timeout.
50+
// If there is no connection to p, attempts to create one.
51+
virtual void newStream(const peer::PeerInfo &p,
52+
const peer::Protocol &protocol, StreamResultFunc cb,
53+
std::chrono::milliseconds timeout) = 0;
3754
};
3855

3956
} // namespace libp2p::network

include/libp2p/network/impl/dialer_impl.hpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,14 @@ namespace libp2p::network {
2424
std::shared_ptr<ListenerManager> listener);
2525

2626
// Establishes a connection to a given peer
27-
void dial(const peer::PeerInfo &p, DialResultFunc cb) override;
27+
void dial(const peer::PeerInfo &p, DialResultFunc cb,
28+
std::chrono::milliseconds timeout) override;
2829

2930
// NewStream returns a new stream to given peer p.
3031
// If there is no connection to p, attempts to create one.
3132
void newStream(const peer::PeerInfo &p, const peer::Protocol &protocol,
32-
StreamResultFunc cb) override;
33+
StreamResultFunc cb,
34+
std::chrono::milliseconds timeout) override;
3335

3436
private:
3537
std::shared_ptr<protocol_muxer::ProtocolMuxer> multiselect_;

include/libp2p/transport/tcp/tcp_connection.hpp

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,17 @@
88

99
#define BOOST_ASIO_NO_DEPRECATED
1010

11+
#include <atomic>
12+
#include <chrono>
13+
1114
#include <boost/asio.hpp>
1215
#include <boost/noncopyable.hpp>
1316
#include <libp2p/connection/raw_connection.hpp>
1417
#include <libp2p/multi/multiaddress.hpp>
1518

16-
namespace libp2p::security { class TlsAdaptor; }
19+
namespace libp2p::security {
20+
class TlsAdaptor;
21+
}
1722

1823
namespace libp2p::transport {
1924

@@ -70,6 +75,16 @@ namespace libp2p::transport {
7075
*/
7176
void connect(const ResolverResultsType &iterator, ConnectCallbackFunc cb);
7277

78+
/**
79+
* @brief Connect to a remote service with a time limit for connection
80+
* establishing.
81+
* @param iterator list of resolved IP addresses of remote service.
82+
* @param cb callback executed on operation completion.
83+
* @param timeout in milliseconds for connection establishing.
84+
*/
85+
void connect(const ResolverResultsType &iterator, ConnectCallbackFunc cb,
86+
std::chrono::milliseconds timeout);
87+
7388
void read(gsl::span<uint8_t> out, size_t bytes,
7489
ReadCallbackFunc cb) override;
7590

@@ -96,6 +111,9 @@ namespace libp2p::transport {
96111
boost::asio::io_context &context_;
97112
Tcp::socket socket_;
98113
bool initiator_ = false;
114+
bool connecting_with_timeout_ = false;
115+
std::atomic_bool connection_phase_done_;
116+
boost::asio::deadline_timer deadline_timer_;
99117

100118
boost::system::error_code handle_errcode(
101119
const boost::system::error_code &e) noexcept;

include/libp2p/transport/tcp/tcp_transport.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ namespace libp2p::transport {
3030
void dial(const peer::PeerId &remoteId, multi::Multiaddress address,
3131
TransportAdaptor::HandlerFunc handler) override;
3232

33+
void dial(const peer::PeerId &remoteId, multi::Multiaddress address,
34+
TransportAdaptor::HandlerFunc handler,
35+
std::chrono::milliseconds timeout) override;
36+
3337
std::shared_ptr<TransportListener> createListener(
3438
TransportListener::HandlerFunc handler) override;
3539

include/libp2p/transport/transport_adaptor.hpp

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@
1515
#include <libp2p/connection/capable_connection.hpp>
1616
#include <libp2p/event/emitter.hpp>
1717
#include <libp2p/multi/multiaddress.hpp>
18+
#include <libp2p/outcome/outcome.hpp> // for outcome::result
1819
#include <libp2p/peer/peer_id.hpp>
1920
#include <libp2p/transport/transport_listener.hpp>
20-
#include <libp2p/outcome/outcome.hpp> // for outcome::result
2121

2222
namespace libp2p::transport {
2323

@@ -34,14 +34,29 @@ namespace libp2p::transport {
3434
~TransportAdaptor() override = default;
3535

3636
/**
37-
* Try to establish connection with a peer
37+
* Try to establish connection with a peer without timeout
38+
* @param remoteId id of remote peer to dial
39+
* @param address of the peer
40+
* @param handler callback that will be executed on connection/error
41+
* @return connection in case of success, error otherwise
42+
*/
43+
virtual void dial(const peer::PeerId &remoteId, multi::Multiaddress address,
44+
HandlerFunc handler) {
45+
dial(remoteId, std::move(address), std::move(handler),
46+
std::chrono::milliseconds(0));
47+
}
48+
49+
/**
50+
* Try to establish connection with a peer with specific timeout
3851
* @param remoteId id of remote peer to dial
3952
* @param address of the peer
4053
* @param handler callback that will be executed on connection/error
54+
* @param timeout in milliseconds for connection establishing
4155
* @return connection in case of success, error otherwise
4256
*/
4357
virtual void dial(const peer::PeerId &remoteId, multi::Multiaddress address,
44-
HandlerFunc handler) = 0;
58+
HandlerFunc handler,
59+
std::chrono::milliseconds timeout) = 0;
4560

4661
/**
4762
* Create a listener for incoming connections of this Transport; in case

src/host/basic_host/basic_host.cpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,9 @@ namespace libp2p::host {
8787

8888
void BasicHost::newStream(const peer::PeerInfo &p,
8989
const peer::Protocol &protocol,
90-
const Host::StreamResultHandler &handler) {
91-
network_->getDialer().newStream(p, protocol, handler);
90+
const Host::StreamResultHandler &handler,
91+
std::chrono::milliseconds timeout) {
92+
network_->getDialer().newStream(p, protocol, handler, timeout);
9293
}
9394

9495
outcome::result<void> BasicHost::listen(const multi::Multiaddress &ma) {
@@ -109,7 +110,8 @@ namespace libp2p::host {
109110
network_->getListener().start();
110111
}
111112

112-
event::Handle BasicHost::setOnNewConnectionHandler(const NewConnectionHandler &h) const {
113+
event::Handle BasicHost::setOnNewConnectionHandler(
114+
const NewConnectionHandler &h) const {
113115
return bus_->getChannel<network::event::OnNewConnectionChannel>().subscribe(
114116
[h{std::move(h)}](auto &&conn) {
115117
if (auto connection = conn.lock()) {

src/network/impl/dialer_impl.cpp

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@
1212

1313
namespace libp2p::network {
1414

15-
void DialerImpl::dial(const peer::PeerInfo &p, DialResultFunc cb) {
15+
void DialerImpl::dial(const peer::PeerInfo &p, DialResultFunc cb,
16+
std::chrono::milliseconds timeout) {
1617
if (auto c = cmgr_->getBestConnectionForPeer(p.id); c != nullptr) {
1718
// we have connection to this peer
1819

@@ -50,7 +51,8 @@ namespace libp2p::network {
5051

5152
// return connection to the user
5253
cb(rconn.value());
53-
});
54+
},
55+
timeout);
5456
return;
5557
}
5658
}
@@ -61,7 +63,8 @@ namespace libp2p::network {
6163

6264
void DialerImpl::newStream(const peer::PeerInfo &p,
6365
const peer::Protocol &protocol,
64-
StreamResultFunc cb) {
66+
StreamResultFunc cb,
67+
std::chrono::milliseconds timeout) {
6568
// 1. make new connection or reuse existing
6669
this->dial(
6770
p,
@@ -105,7 +108,8 @@ namespace libp2p::network {
105108
cb(std::move(stream));
106109
});
107110
});
108-
});
111+
},
112+
timeout);
109113
}
110114

111115
DialerImpl::DialerImpl(

src/transport/tcp/tcp_connection.cpp

Lines changed: 59 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,16 @@ namespace libp2p::transport {
1111

1212
TcpConnection::TcpConnection(boost::asio::io_context &ctx,
1313
boost::asio::ip::tcp::socket &&socket)
14-
: context_(ctx), socket_(std::move(socket)) {}
14+
: context_(ctx),
15+
socket_(std::move(socket)),
16+
connection_phase_done_{false},
17+
deadline_timer_(context_) {}
1518

1619
TcpConnection::TcpConnection(boost::asio::io_context &ctx)
17-
: context_(ctx), socket_(context_) {}
20+
: context_(ctx),
21+
socket_(context_),
22+
connection_phase_done_{false},
23+
deadline_timer_(context_) {}
1824

1925
outcome::result<void> TcpConnection::close() {
2026
boost::system::error_code ec;
@@ -90,13 +96,57 @@ namespace libp2p::transport {
9096
void TcpConnection::connect(
9197
const TcpConnection::ResolverResultsType &iterator,
9298
TcpConnection::ConnectCallbackFunc cb) {
93-
boost::asio::async_connect(socket_, iterator,
94-
[self{shared_from_this()}, cb{std::move(cb)}](
95-
auto &&ec, auto &&endpoint) {
96-
self->initiator_ = true;
97-
cb(std::forward<decltype(ec)>(ec),
98-
std::forward<decltype(endpoint)>(endpoint));
99-
});
99+
connect(iterator, std::move(cb), std::chrono::milliseconds::zero());
100+
}
101+
102+
void TcpConnection::connect(
103+
const TcpConnection::ResolverResultsType &iterator,
104+
ConnectCallbackFunc cb, std::chrono::milliseconds timeout) {
105+
if (timeout > std::chrono::milliseconds::zero()) {
106+
connecting_with_timeout_ = true;
107+
deadline_timer_.expires_from_now(
108+
boost::posix_time::milliseconds(timeout.count()));
109+
deadline_timer_.async_wait([self{shared_from_this()},
110+
cb](const boost::system::error_code &error) {
111+
bool expected = false;
112+
if (self->connection_phase_done_.compare_exchange_strong(expected,
113+
true)) {
114+
if (not error) {
115+
// timeout happened, timer expired before connection was
116+
// established
117+
cb(boost::system::error_code{boost::system::errc::timed_out,
118+
boost::system::generic_category()},
119+
Tcp::endpoint{});
120+
}
121+
// Another case is: boost::asio::error::operation_aborted == error
122+
// connection was established before timeout and timer has been
123+
// cancelled
124+
}
125+
});
126+
}
127+
boost::asio::async_connect(
128+
socket_, iterator,
129+
[self{shared_from_this()}, cb{std::move(cb)}](auto &&ec,
130+
auto &&endpoint) {
131+
bool expected = false;
132+
if (not self->connection_phase_done_.compare_exchange_strong(expected,
133+
true)) {
134+
BOOST_ASSERT(expected);
135+
// connection phase already done - means that user's callback was
136+
// already called by timer expiration so we are closing socket if it
137+
// was actually connected
138+
if (not ec) {
139+
self->socket_.close();
140+
}
141+
return;
142+
}
143+
if (self->connecting_with_timeout_) {
144+
self->deadline_timer_.cancel();
145+
}
146+
self->initiator_ = true;
147+
cb(std::forward<decltype(ec)>(ec),
148+
std::forward<decltype(endpoint)>(endpoint));
149+
});
100150
}
101151

102152
template <typename Callback>

0 commit comments

Comments
 (0)