Skip to content

RST-13741 Adding service client timeouts #48

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: locus-noetic-devel
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions clients/roscpp/include/ros/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,11 @@ class ROSCPP_DECL Connection : public boost::enable_shared_from_this<Connection>
std::string getCallerId();
std::string getRemoteString();

/**
* \brief Cancels any active read operation
*/
void stopRead();

private:
/**
* \brief Called by the Transport when there is data available to be read
Expand Down
8 changes: 4 additions & 4 deletions clients/roscpp/include/ros/node_handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -1236,10 +1236,10 @@ if (service) // Enter if advertised service is valid
*/
template<class MReq, class MRes>
ServiceClient serviceClient(const std::string& service_name, bool persistent = false,
const M_string& header_values = M_string())
const M_string& header_values = M_string(), ros::Duration timeout = ros::Duration(-1.0))
{
ServiceClientOptions ops;
ops.template init<MReq, MRes>(service_name, persistent, header_values);
ops.template init<MReq, MRes>(service_name, persistent, header_values, timeout);
return serviceClient(ops);
}

Expand All @@ -1256,10 +1256,10 @@ if (service) // Enter if advertised service is valid
*/
template<class Service>
ServiceClient serviceClient(const std::string& service_name, bool persistent = false,
const M_string& header_values = M_string())
const M_string& header_values = M_string(), ros::Duration timeout = ros::Duration(-1.0))
{
ServiceClientOptions ops;
ops.template init<Service>(service_name, persistent, header_values);
ops.template init<Service>(service_name, persistent, header_values, timeout);
return serviceClient(ops);
}

Expand Down
17 changes: 9 additions & 8 deletions clients/roscpp/include/ros/service.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

#include <string>
#include "ros/common.h"
#include "ros/duration.h"
#include "ros/message.h"
#include "ros/forwards.h"
#include "ros/node_handle.h"
Expand Down Expand Up @@ -62,11 +63,11 @@ namespace service
* @return true on success, false otherwise.
*/
template<class MReq, class MRes>
bool call(const std::string& service_name, MReq& req, MRes& res)
bool call(const std::string& service_name, MReq& req, MRes& res, const ros::Duration& timeout = ros::Duration(-1.0))
{
namespace st = service_traits;
NodeHandle nh;
ServiceClientOptions ops(ros::names::resolve(service_name), st::md5sum(req), false, M_string());
ServiceClientOptions ops(ros::names::resolve(service_name), st::md5sum(req), false, M_string(), timeout);
ServiceClient client = nh.serviceClient(ops);
return client.call(req, res);
}
Expand All @@ -82,12 +83,12 @@ bool call(const std::string& service_name, MReq& req, MRes& res)
* @return true on success, false otherwise.
*/
template<class Service>
bool call(const std::string& service_name, Service& service)
bool call(const std::string& service_name, Service& service, const ros::Duration& timeout = ros::Duration(-1.0))
{
namespace st = service_traits;

NodeHandle nh;
ServiceClientOptions ops(ros::names::resolve(service_name), st::md5sum(service), false, M_string());
ServiceClientOptions ops(ros::names::resolve(service_name), st::md5sum(service), false, M_string(), timeout);
ServiceClient client = nh.serviceClient(ops);
return client.call(service.request, service.response);
}
Expand Down Expand Up @@ -131,10 +132,10 @@ ROSCPP_DECL bool exists(const std::string& service_name, bool print_failure_reas
* @param header_values Key/value pairs you'd like to send along in the connection handshake
*/
template<class MReq, class MRes>
ServiceClient createClient(const std::string& service_name, bool persistent = false, const M_string& header_values = M_string())
ServiceClient createClient(const std::string& service_name, bool persistent = false, const M_string& header_values = M_string(), const ros::Duration& timeout = ros::Duration(-1.0))
{
NodeHandle nh;
ServiceClient client = nh.template serviceClient<MReq, MRes>(ros::names::resolve(service_name), persistent, header_values);
ServiceClient client = nh.template serviceClient<MReq, MRes>(ros::names::resolve(service_name), persistent, header_values, timeout);
return client;
}

Expand All @@ -149,10 +150,10 @@ ServiceClient createClient(const std::string& service_name, bool persistent = fa
* @param header_values Key/value pairs you'd like to send along in the connection handshake
*/
template<class Service>
ServiceClient createClient(const std::string& service_name, bool persistent = false, const M_string& header_values = M_string())
ServiceClient createClient(const std::string& service_name, bool persistent = false, const M_string& header_values = M_string(), const ros::Duration& timeout = ros::Duration(-1.0))
{
NodeHandle nh;
ServiceClient client = nh.template serviceClient<Service>(ros::names::resolve(service_name), persistent, header_values);
ServiceClient client = nh.template serviceClient<Service>(ros::names::resolve(service_name), persistent, header_values, timeout);
return client;
}

Expand Down
3 changes: 2 additions & 1 deletion clients/roscpp/include/ros/service_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class ROSCPP_DECL ServiceClient
{
public:
ServiceClient() {}
ServiceClient(const std::string& service_name, bool persistent, const M_string& header_values, const std::string& service_md5sum);
ServiceClient(const std::string& service_name, bool persistent, const M_string& header_values, const std::string& service_md5sum, const ros::Duration& timeout = ros::Duration(-1.0));
ServiceClient(const ServiceClient& rhs);
~ServiceClient();
ServiceClient& operator=(const ServiceClient& other) = default;
Expand Down Expand Up @@ -200,6 +200,7 @@ class ROSCPP_DECL ServiceClient
M_string header_values_;
std::string service_md5sum_;
bool is_shutdown_;
double timeout_sec_ { -1.0 };
};
typedef boost::shared_ptr<Impl> ImplPtr;
typedef boost::weak_ptr<Impl> ImplWPtr;
Expand Down
13 changes: 9 additions & 4 deletions clients/roscpp/include/ros/service_client_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ namespace ros
struct ROSCPP_DECL ServiceClientOptions
{
ServiceClientOptions()
: persistent(false)
: persistent(false),
timeout(-1.0)
{
}

Expand All @@ -52,11 +53,12 @@ struct ROSCPP_DECL ServiceClientOptions
* \param _persistent Whether or not to keep the connection open to the service for future calls
* \param _header Any extra values to be passed along in the connection header
*/
ServiceClientOptions(const std::string& _service, const std::string& _md5sum, bool _persistent, const M_string& _header)
ServiceClientOptions(const std::string& _service, const std::string& _md5sum, bool _persistent, const M_string& _header, const ros::Duration& _timeout = ros::Duration(-1.0))
: service(_service)
, md5sum(_md5sum)
, persistent(_persistent)
, header(_header)
, timeout(_timeout)
{
}

Expand All @@ -69,14 +71,15 @@ struct ROSCPP_DECL ServiceClientOptions
* \param _header Any extra values to be passed along in the connection header
*/
template <class MReq, class MRes>
void init(const std::string& _service, bool _persistent, const M_string& _header)
void init(const std::string& _service, bool _persistent, const M_string& _header, const ros::Duration& _timeout = ros::Duration(-1.0))
{
namespace st = service_traits;

service = _service;
md5sum = st::md5sum<MReq>();
persistent = _persistent;
header = _header;
timeout = _timeout;
}

/*
Expand All @@ -87,20 +90,22 @@ struct ROSCPP_DECL ServiceClientOptions
* \param _header Any extra values to be passed along in the connection header
*/
template <class Service>
void init(const std::string& _service, bool _persistent, const M_string& _header)
void init(const std::string& _service, bool _persistent, const M_string& _header, const ros::Duration& _timeout = ros::Duration(-1.0))
{
namespace st = service_traits;

service = _service;
md5sum = st::md5sum<Service>();
persistent = _persistent;
header = _header;
timeout = _timeout;
}

std::string service; ///< Service to connect to
std::string md5sum; ///< Service md5sum
bool persistent; ///< Whether or not the connection should persist
M_string header; ///< Extra key/value pairs to add to the connection header
ros::Duration timeout; ///< Timeout for the service call. -1 (< 0) means no timeout
};


Expand Down
4 changes: 3 additions & 1 deletion clients/roscpp/include/ros/service_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,15 @@ class ROSCPP_DECL ServiceManager
* @param persistent Whether to keep this connection alive for more than one service call
* @param request_md5sum The md5sum of the request message
* @param response_md5sum The md5sum of the response message
* @param timeout The time, in seconds, until the service times out (-1 to never time out)
*
* @returns Shared pointer to the ServiceServerLink, empty shared pointer if none is found.
*/
ServiceServerLinkPtr createServiceServerLink(const std::string& service,
bool persistent,
const std::string& request_md5sum, const std::string& response_md5sum,
const M_string& header_values);
const M_string& header_values,
double timeout_sec = -1.0);

/** @brief Remove the specified service client from our list
*
Expand Down
17 changes: 12 additions & 5 deletions clients/roscpp/include/ros/service_server_link.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class ROSCPP_DECL ServiceServerLink : public boost::enable_shared_from_this<Serv

bool finished_;
boost::condition_variable finished_condition_;
boost::mutex finished_mutex_;
boost::mutex mutex_;
boost::thread::id caller_thread_id_;

bool success_;
Expand All @@ -79,7 +79,7 @@ class ROSCPP_DECL ServiceServerLink : public boost::enable_shared_from_this<Serv

public:
typedef std::map<std::string, std::string> M_string;
ServiceServerLink(const std::string& service_name, bool persistent, const std::string& request_md5sum, const std::string& response_md5sum, const M_string& header_values);
ServiceServerLink(const std::string& service_name, bool persistent, const std::string& request_md5sum, const std::string& response_md5sum, const M_string& header_values, double timeout_sec = -1.0);
virtual ~ServiceServerLink();

//
Expand Down Expand Up @@ -116,7 +116,7 @@ class ROSCPP_DECL ServiceServerLink : public boost::enable_shared_from_this<Serv
* \brief Called when the currently queued call has finished. Clears out the current call, notifying it that it
* has finished, then calls processNextCall()
*/
void callFinished();
void callFinished(CallInfoPtr info);
/**
* \brief Pops the next call off the queue if one is available. If this is a non-persistent connection and the queue is empty
* it will also drop the connection.
Expand All @@ -131,10 +131,15 @@ class ROSCPP_DECL ServiceServerLink : public boost::enable_shared_from_this<Serv
*/
void cancelCall(const CallInfoPtr& info);

/**
* \brief Utility method for handling cases where our most recent service call has timed out
*/
void handleTimeout(const CallInfoPtr& info);

void onHeaderWritten(const ConnectionPtr& conn);
void onRequestWritten(const ConnectionPtr& conn);
void onResponseOkAndLength(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success);
void onResponse(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success);
void onResponseOkAndLength(CallInfoPtr info, const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success);
void onResponse(CallInfoPtr info, const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success);

ConnectionPtr connection_;
std::string service_name_;
Expand All @@ -152,6 +157,8 @@ class ROSCPP_DECL ServiceServerLink : public boost::enable_shared_from_this<Serv
CallInfoPtr current_call_;

bool dropped_;

double timeout_sec_;
};
typedef boost::shared_ptr<ServiceServerLink> ServiceServerLinkPtr;

Expand Down
16 changes: 16 additions & 0 deletions clients/roscpp/src/libros/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -479,4 +479,20 @@ std::string Connection::getRemoteString()
return ss.str();
}

void Connection::stopRead()
{
boost::recursive_mutex::scoped_lock lock(read_mutex_);

if (has_read_callback_)
{
read_callback_.clear();
read_buffer_.reset();
read_size_ = 0;
read_filled_ = 0;
has_read_callback_ = 0;
}

transport_->disableRead();
}

}
2 changes: 1 addition & 1 deletion clients/roscpp/src/libros/node_handle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ ServiceServer NodeHandle::advertiseService(AdvertiseServiceOptions& ops)
ServiceClient NodeHandle::serviceClient(ServiceClientOptions& ops)
{
ops.service = resolveName(ops.service);
ServiceClient client(ops.service, ops.persistent, ops.header, ops.md5sum);
ServiceClient client(ops.service, ops.persistent, ops.header, ops.md5sum, ops.timeout);

if (client)
{
Expand Down
9 changes: 5 additions & 4 deletions clients/roscpp/src/libros/service_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,17 +81,18 @@ bool ServiceClient::Impl::isValid() const
return server_link_->isValid();
}

ServiceClient::ServiceClient(const std::string& service_name, bool persistent, const M_string& header_values, const std::string& service_md5sum)
ServiceClient::ServiceClient(const std::string& service_name, bool persistent, const M_string& header_values, const std::string& service_md5sum, const ros::Duration& timeout)
: impl_(new Impl)
{
impl_->name_ = service_name;
impl_->persistent_ = persistent;
impl_->header_values_ = header_values;
impl_->service_md5sum_ = service_md5sum;
impl_->timeout_sec_ = timeout.toSec();

if (persistent)
{
impl_->server_link_ = ServiceManager::instance()->createServiceServerLink(impl_->name_, impl_->persistent_, impl_->service_md5sum_, impl_->service_md5sum_, impl_->header_values_);
impl_->server_link_ = ServiceManager::instance()->createServiceServerLink(impl_->name_, impl_->persistent_, impl_->service_md5sum_, impl_->service_md5sum_, impl_->header_values_, impl_->timeout_sec_);
}
}

Expand Down Expand Up @@ -120,7 +121,7 @@ bool ServiceClient::call(const SerializedMessage& req, SerializedMessage& resp,
{
if (!impl_->server_link_)
{
impl_->server_link_ = ServiceManager::instance()->createServiceServerLink(impl_->name_, impl_->persistent_, service_md5sum, service_md5sum, impl_->header_values_);
impl_->server_link_ = ServiceManager::instance()->createServiceServerLink(impl_->name_, impl_->persistent_, service_md5sum, service_md5sum, impl_->header_values_, impl_->timeout_sec_);

if (!impl_->server_link_)
{
Expand All @@ -132,7 +133,7 @@ bool ServiceClient::call(const SerializedMessage& req, SerializedMessage& resp,
}
else
{
link = ServiceManager::instance()->createServiceServerLink(impl_->name_, impl_->persistent_, service_md5sum, service_md5sum, impl_->header_values_);
link = ServiceManager::instance()->createServiceServerLink(impl_->name_, impl_->persistent_, service_md5sum, service_md5sum, impl_->header_values_, impl_->timeout_sec_);

if (!link)
{
Expand Down
4 changes: 2 additions & 2 deletions clients/roscpp/src/libros/service_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ ServicePublicationPtr ServiceManager::lookupServicePublication(const std::string

ServiceServerLinkPtr ServiceManager::createServiceServerLink(const std::string& service, bool persistent,
const std::string& request_md5sum, const std::string& response_md5sum,
const M_string& header_values)
const M_string& header_values, double timeout_sec)
{

boost::recursive_mutex::scoped_lock shutdown_lock(shutting_down_mutex_);
Expand All @@ -260,7 +260,7 @@ ServiceServerLinkPtr ServiceManager::createServiceServerLink(const std::string&

if (transport->connect(serv_host, serv_port))
{
ServiceServerLinkPtr client(boost::make_shared<ServiceServerLink>(service, persistent, request_md5sum, response_md5sum, header_values));
ServiceServerLinkPtr client(boost::make_shared<ServiceServerLink>(service, persistent, request_md5sum, response_md5sum, header_values, timeout_sec));

{
boost::mutex::scoped_lock lock(service_server_links_mutex_);
Expand Down
Loading