Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 38 additions & 4 deletions rosgraph_monitor/include/rosgraph_monitor/monitor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
#ifndef ROSGRAPH_MONITOR__MONITOR_HPP_
#define ROSGRAPH_MONITOR__MONITOR_HPP_

#include <future>
#include <functional>
#include <thread>
#include <map>
#include <memory>
Expand All @@ -31,6 +33,9 @@
#include "rclcpp/logger.hpp"
#include "rclcpp/node_interfaces/node_graph_interface.hpp"
#include "rclcpp/time.hpp"
#include "rcl_interfaces/msg/parameter_descriptor.hpp"
#include "rcl_interfaces/msg/parameter_type.hpp"
#include "rcl_interfaces/msg/list_parameters_result.hpp"
#include "rosgraph_monitor_msgs/msg/topic_statistics.hpp"
#include "rosgraph_monitor_msgs/msg/graph.hpp"
#include "rosgraph_monitor_msgs/msg/qos_profile.hpp"
Expand All @@ -39,6 +44,12 @@

typedef std::array<uint8_t, RMW_GID_STORAGE_SIZE> RosRmwGid;

typedef std::shared_future<void> QueryParamsReturnType;
typedef std::function<QueryParamsReturnType(
const std::string & node_name,
std::function<void (const rcl_interfaces::msg::ListParametersResult &)>
callback)> QueryParams;

/// @brief Provide a std::hash specialization so we can use RMW GID as a map key
template<>
struct std::hash<RosRmwGid>
Expand Down Expand Up @@ -106,15 +117,18 @@ class RosGraphMonitor
{
public:
/// @brief Constructor
/// @param config Includes/excludes the entities to care about in diagnostic reporting
/// @param now_fn Function to fetch the current time as defined in the owning context
/// @param node_graph Interface from owning Node to retrieve information about the ROS graph
/// @param now_fn Function to fetch the current time as defined in the owning context
/// @param logger
/// @param config Includes/excludes the entities to care about in diagnostic reporting
/// @param query_params Function to query parameters of a node by name
RosGraphMonitor(
rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph,
std::function<rclcpp::Time()> now_fn,
rclcpp::Logger logger,
GraphMonitorConfiguration config = GraphMonitorConfiguration{});
QueryParams query_params,
GraphMonitorConfiguration config = GraphMonitorConfiguration{}
);

virtual ~RosGraphMonitor();

Expand Down Expand Up @@ -142,16 +156,29 @@ class RosGraphMonitor

/// @brief Set callback function to be called when graph changes
/// @param callback Function to call when graph updates occur
void set_graph_change_callback(std::function<void()> callback);
void set_graph_change_callback(std::function<void(rosgraph_monitor_msgs::msg::Graph &)> callback);

protected:
/* Types */

struct ParameterTracking
{
std::string name;
uint8_t type;

rcl_interfaces::msg::ParameterDescriptor to_msg() const;
};


/// @brief Keeps flags for tracking observed nodes over time
struct NodeTracking
{
std::string name;
bool missing = false;
bool stale = false;
std::vector<ParameterTracking> params;

explicit NodeTracking(const std::string & name);
};

/// @brief Keeps aggregate info about a topic as a whole over time
Expand Down Expand Up @@ -233,6 +260,10 @@ class RosGraphMonitor
const std::string & message,
const std::string & subname) const;

/// @brief Query parameters for a newly discovered node
/// @param node_name The name of the node to query parameters for
void query_node_parameters(const std::string & node_name);

/* Members */

// Configuration
Expand All @@ -248,6 +279,8 @@ class RosGraphMonitor
Event update_event_;
std::function<void()> graph_change_callback_;

QueryParams query_params_;

// Graph cache
std::unordered_map<std::string, NodeTracking> nodes_;
EndpointTrackingMap publishers_;
Expand All @@ -261,6 +294,7 @@ class RosGraphMonitor
std::unordered_map<std::string, TopicTracking> topic_endpoint_counts_;
std::unordered_set<std::string> pubs_with_no_subs_; // a.k.a. "leaf topics"
std::unordered_set<std::string> subs_with_no_pubs_; // a.k.a. "dead sinks"
std::unordered_map<std::string, std::shared_future<void>> params_futures;
};

} // namespace rosgraph_monitor
Expand Down
8 changes: 7 additions & 1 deletion rosgraph_monitor/include/rosgraph_monitor/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#define ROSGRAPH_MONITOR__NODE_HPP_

#include <memory>
#include <string>
#include <vector>

#include "diagnostic_msgs/msg/diagnostic_array.hpp"
Expand All @@ -30,6 +31,8 @@
namespace rosgraph_monitor
{

constexpr int SERVICE_TIMEOUT_S = 5;

class Node : public rclcpp::Node
{
private:
Expand All @@ -43,7 +46,10 @@ class Node : public rclcpp::Node
void update_params(const rosgraph_monitor::Params & params);
void on_topic_statistics(const rosgraph_monitor_msgs::msg::TopicStatistics::SharedPtr msg);
void publish_diagnostics();
void publish_rosgraph();
void publish_rosgraph(rosgraph_monitor_msgs::msg::Graph rosgraph_msg);
QueryParamsReturnType query_params(
const std::string & node_name,
std::function<void(const rcl_interfaces::msg::ListParametersResult &)> callback);

rosgraph_monitor::ParamListener param_listener_;
rosgraph_monitor::Params params_;
Expand Down
74 changes: 68 additions & 6 deletions rosgraph_monitor/src/monitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <string>
#include <utility>
#include <vector>
#include <memory>

#include "rclcpp/logging.hpp"

Expand Down Expand Up @@ -125,6 +126,19 @@ rosgraph_monitor_msgs::msg::QosProfile to_msg(
return qos_msg;
}

rcl_interfaces::msg::ParameterDescriptor RosGraphMonitor::ParameterTracking::to_msg() const
{
rcl_interfaces::msg::ParameterDescriptor param_msg;
param_msg.name = name;
// TODO(troy): Actual type info will be populated in future PR
param_msg.type = rcl_interfaces::msg::ParameterType::PARAMETER_NOT_SET;
return param_msg;
}


RosGraphMonitor::NodeTracking::NodeTracking(const std::string & name)
: name(name) {}

RosGraphMonitor::EndpointTracking::EndpointTracking(
const std::string & topic_name,
const rclcpp::TopicEndpointInfo & info,
Expand All @@ -151,12 +165,15 @@ RosGraphMonitor::RosGraphMonitor(
rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph,
std::function<rclcpp::Time()> now_fn,
rclcpp::Logger logger,
GraphMonitorConfiguration config)
QueryParams query_params,
GraphMonitorConfiguration config
)
: config_(config),
now_fn_(now_fn),
node_graph_(node_graph),
logger_(logger),
graph_change_event_(node_graph->get_graph_event())
graph_change_event_(node_graph->get_graph_event()),
query_params_(query_params)
{
update_graph();
watch_thread_ = std::thread(std::bind(&RosGraphMonitor::watch_for_updates, this));
Expand All @@ -169,6 +186,9 @@ RosGraphMonitor::~RosGraphMonitor()
graph_change_event_->set();
node_graph_->notify_shutdown();
update_event_.set();

params_futures.clear();

watch_thread_.join();
}

Expand Down Expand Up @@ -209,16 +229,21 @@ void RosGraphMonitor::track_node_updates(
continue;
}

auto [it, inserted] = nodes_.emplace(node_name, NodeTracking{});
NodeTracking tracking{node_name};
auto [it, inserted] = nodes_.emplace(
node_name, tracking);

if (inserted) {
RCLCPP_DEBUG(logger_, "New node: %s", node_name.c_str());
query_node_parameters(node_name);
} else {
NodeTracking & tracking = it->second;
tracking.stale = false;
if (tracking.missing) {
RCLCPP_INFO(logger_, "Node %s came back", node_name.c_str());
tracking.missing = false;
returned_nodes_.insert(node_name);
query_node_parameters(node_name);
}
}
}
Expand Down Expand Up @@ -653,6 +678,10 @@ void RosGraphMonitor::fill_rosgraph_msg(rosgraph_monitor_msgs::msg::Graph & msg)
rosgraph_monitor_msgs::msg::NodeInfo node_msg;
node_msg.name = node_name;

for (const auto & param : node_info.params) {
node_msg.parameters.push_back(param.to_msg());
}

// Add publishers for this node
for (auto & [gid, tracking] : publishers_) {
if (tracking.node_name == node_name) {
Expand All @@ -668,14 +697,47 @@ void RosGraphMonitor::fill_rosgraph_msg(rosgraph_monitor_msgs::msg::Graph & msg)
node_msg.subscriptions.push_back(topic_msg);
}
}

msg.nodes.push_back(node_msg);
}
}

void RosGraphMonitor::set_graph_change_callback(std::function<void()> callback)
void RosGraphMonitor::set_graph_change_callback(
std::function<void(rosgraph_monitor_msgs::msg::Graph &)> callback)
{
graph_change_callback_ = callback;
graph_change_callback_ = [callback, this]() {
rosgraph_monitor_msgs::msg::Graph msg;
fill_rosgraph_msg(msg);
callback(msg);
};
}

void RosGraphMonitor::query_node_parameters(const std::string & node_name)
{
// Non-blocking async call for parameter query. Hold onto the future to track completion.
params_futures[node_name] = query_params_(
node_name,
[this, node_name_copy = std::string(node_name)](
const rcl_interfaces::msg::ListParametersResult & result) {
RCLCPP_INFO(
logger_, "Got parameters for node %s: %zu", node_name_copy.c_str(),
result.names.size());
auto it = nodes_.find(node_name_copy);
if (it == nodes_.end()) {
RCLCPP_WARN(logger_, "Node %s not found in tracking map", node_name_copy.c_str());
return;
}
auto & tracking = it->second;
tracking.params.clear();
tracking.params.reserve(result.names.size());
for (const auto & param_name : result.names) {
tracking.params.push_back(
ParameterTracking{param_name,
rcl_interfaces::msg::ParameterType::PARAMETER_NOT_SET});
}
if (!tracking.params.empty()) {
graph_change_callback_();
}
});
}


Expand Down
62 changes: 55 additions & 7 deletions rosgraph_monitor/src/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
#include <unordered_set>
#include <utility>
#include <vector>
#include <string>

#include "rclcpp/node.hpp"
#include "rclcpp/parameter_client.hpp"
#include "rclcpp_components/register_node_macro.hpp"
#include "rosgraph_monitor_msgs/msg/graph.hpp"
#include "rosgraph_monitor_msgs/msg/topic_statistics.hpp"
Expand Down Expand Up @@ -67,6 +69,8 @@ Node::Node(const rclcpp::NodeOptions & options)
get_node_graph_interface(),
[this]() {return get_clock()->now();},
get_logger().get_child("rosgraph"),
std::bind(
&Node::query_params, this, std::placeholders::_1, std::placeholders::_2),
create_graph_monitor_config(params_)),
sub_topic_statistics_(
create_subscription<rosgraph_monitor_msgs::msg::TopicStatistics>(
Expand All @@ -90,10 +94,10 @@ Node::Node(const rclcpp::NodeOptions & options)
param_listener_.setUserCallback(std::bind(&Node::update_params, this, std::placeholders::_1));

// Set up callback to publish rosgraph when nodes change
graph_monitor_.set_graph_change_callback(std::bind(&Node::publish_rosgraph, this));

// Publish initial rosgraph state
publish_rosgraph();
graph_monitor_.set_graph_change_callback(
std::bind(
&Node::publish_rosgraph, this,
std::placeholders::_1));
}

void Node::update_params(const rosgraph_monitor::Params & params)
Expand All @@ -102,6 +106,52 @@ void Node::update_params(const rosgraph_monitor::Params & params)
graph_monitor_.config() = create_graph_monitor_config(params_);
}

std::shared_future<void> Node::query_params(
const std::string & node_name,
std::function<void(const rcl_interfaces::msg::ListParametersResult &)> callback)
{
auto param_client = std::make_shared<rclcpp::AsyncParametersClient>(
this->get_node_base_interface(),
this->get_node_topics_interface(),
this->get_node_graph_interface(),
this->get_node_services_interface(),
node_name
);

return std::async(
std::launch::async,
[param_client, node_name, callback]() -> void {
bool params_received = false;
while (!params_received && rclcpp::ok()) {
if (!param_client->wait_for_service(std::chrono::seconds(SERVICE_TIMEOUT_S))) {
RCLCPP_WARN(
rclcpp::get_logger("rosgraph_monitor"),
"Parameter service for node %s not available, retrying in %d seconds",
node_name.c_str(), SERVICE_TIMEOUT_S);
rclcpp::sleep_for(std::chrono::seconds(SERVICE_TIMEOUT_S));
continue;
}

auto list_parameters = param_client->list_parameters({}, 0);

if (list_parameters.wait_for(std::chrono::seconds(SERVICE_TIMEOUT_S)) !=
std::future_status::ready)
{
RCLCPP_WARN(
rclcpp::get_logger("rosgraph_monitor"),
"Parameter query for node %s timed out, retrying in %d seconds", node_name.c_str(),
SERVICE_TIMEOUT_S);
rclcpp::sleep_for(std::chrono::seconds(SERVICE_TIMEOUT_S));
continue;
}

params_received = true;
callback(list_parameters.get());
}
});
}


void Node::on_topic_statistics(const rosgraph_monitor_msgs::msg::TopicStatistics::SharedPtr msg)
{
graph_monitor_.on_topic_statistics(*msg);
Expand All @@ -118,10 +168,8 @@ void Node::publish_diagnostics()
pub_diagnostics_->publish(std::move(diagnostic_array));
}

void Node::publish_rosgraph()
void Node::publish_rosgraph(rosgraph_monitor_msgs::msg::Graph rosgraph_msg)
{
rosgraph_monitor_msgs::msg::Graph rosgraph_msg;
graph_monitor_.fill_rosgraph_msg(rosgraph_msg);
pub_rosgraph_->publish(std::move(rosgraph_msg));
}

Expand Down
Loading
Loading