Skip to content

Commit

Permalink
create a function to get parameter names in humble, handel no response
Browse files Browse the repository at this point in the history
  • Loading branch information
ipa-rwu committed Feb 15, 2024
1 parent 33b2f75 commit b8c20c1
Showing 1 changed file with 46 additions and 30 deletions.
76 changes: 46 additions & 30 deletions ros2model/api/runtime_parser/rosmodel_runtime_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,23 +257,7 @@ def get_parameters(self, node, include_hidden_nodes=False):

sorted_names = []
if version_str == "humble":
from rcl_interfaces.srv import ListParameters
from ros2service.api import get_service_names

service_names = get_service_names(
node=node, include_hidden_services=include_hidden_nodes
)
service_name = f"{self.name.full_name}/list_parameters"

if service_name in service_names:
client = node.create_client(ListParameters, service_name)
if client.service_is_ready():
request = ListParameters.Request()
future = client.call_async(request)
rclpy.spin_until_future_complete(node, future, timeout_sec=1.0)
if future.result() != None:
response = future.result()
sorted_names = sorted(response.result.names)
sorted_names = self.get_parameter_names_humble(node, include_hidden_nodes)

if version_str == "rolling":
response = call_list_parameters(node=node, node_name=self.name.full_name)
Expand All @@ -282,20 +266,52 @@ def get_parameters(self, node, include_hidden_nodes=False):
sorted_names = sorted(param_names)

regex_filter = re.compile(ParameterReg)
sorted_names = [name for name in sorted_names if not regex_filter.match(name)]
des_resp = call_describe_parameters(
node=node,
node_name=self.name.full_name,
parameter_names=sorted_names,
)
for descriptor in des_resp.descriptors:
self.parameter.append(
ROSModel.Parameter(
name=descriptor.name,
namespace=self.name.namespace,
type=get_parameter_type_string(descriptor.type),
)
if len(sorted_names) != 0:
sorted_names = [
name for name in sorted_names if not regex_filter.match(name)
]
des_resp = call_describe_parameters(
node=node,
node_name=self.name.full_name,
parameter_names=sorted_names,
)
for descriptor in des_resp.descriptors:
self.parameter.append(
ROSModel.Parameter(
name=descriptor.name,
namespace=self.name.namespace,
type=get_parameter_type_string(descriptor.type),
)
)

def get_parameter_names_humble(self, node, include_hidden_nodes=False):
from rcl_interfaces.srv import ListParameters
from ros2service.api import get_service_names

sorted_names = []
service_names = get_service_names(
node=node, include_hidden_services=include_hidden_nodes
)
service_name = f"{self.name.full_name}/list_parameters"

if service_name in service_names:
client = node.create_client(ListParameters, service_name)
if client.service_is_ready():
request = ListParameters.Request()
future = client.call_async(request)
rclpy.spin_until_future_complete(node, future, timeout_sec=1.0)
if future.result() != None:
response = future.result()
if response is None:
try:
e = future.exception()
except:
error = RuntimeError(
f"Exception while calling service of node '{self.name.full_name}': {e}"
)
else:
sorted_names = sorted(response.result.names)
return sorted_names


def get_node_graph_names():
Expand Down

0 comments on commit b8c20c1

Please sign in to comment.