Skip to content

Async/await and events executor in animation server #704

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 43 additions & 1 deletion bitbots_misc/bitbots_utils/bitbots_utils/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
from typing import Any
from threading import Thread
from typing import Any, Callable

import rclpy
import yaml
Expand All @@ -11,6 +12,7 @@
from rcl_interfaces.srv import GetParameters, SetParameters
from rclpy.node import Node
from rclpy.parameter import PARAMETER_SEPARATOR_STRING, Parameter, parameter_value_to_python
from rclpy.task import Future

# Create a new @nobeartype decorator disabling type-checking.
nobeartype = beartype(conf=BeartypeConf(strategy=BeartypeStrategy.O0))
Expand Down Expand Up @@ -194,3 +196,43 @@ def parse_parameter_dict(*, namespace: str, parameter_dict: dict) -> list[Parame
parameter = Parameter(name=full_param_name, value=param_value)
parameters.append(parameter.to_parameter_msg())
return parameters


async def async_wait_for(node: Node, rel_time: float):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The signature async_wait_for(mynode, 10) can easily be misunderstood as waiting for mynode to become available, or similar. Therefore, I suggest the following solutions:

  • rename to async_wait
  • rename to async_wait_for_seconds
  • change to async_wait_for(rel_time: float, node: Node)
  • change to async_wait_for(seconds: float, node: Node)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe sleep_until?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method sleep_until sounds like it expects a end-time, not a duration...

What do you think about using Duration instead of seconds: float? This might be cleaner and more ROSy, but is not really necessary and a bit overhead.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had it as a duration at one point, but I didn't like the verbosity. But I agree that it would be more like the rclpy API.

What do you think of sleep_for?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

YES! Does that need async as well in the name or is that clear through other means?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would add async, but it should show as a coroutine in the signature as well

"""
ROS 2 does not provide an async sleep function, so we implement our own using a timer.
This function will wait for the specified relative time in seconds.

:param node: The ROS2 node to create the timer on.
:param rel_time: The relative time in seconds to wait.
:return: None
"""
future = Future()
rel_time = max(rel_time, 0.0)

def done_waiting():
future.set_result(None)

timer = node.create_timer(rel_time, done_waiting, clock=node.get_clock())
await future
timer.cancel()
node.destroy_timer(timer)


async def async_run_thread(func: Callable[[], Any]) -> None:
"""
Allows the usage of blocking functions in an async context.

Spawns a thread to run the function and returns a Future that will be set when the function is done.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not seem to return anything, so the comment is wrong. Otherwise, the type annotation is wrong and I have not yet understood async-await in Python...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah it doesn't return. But we should return, I will fix it.

"""
future = Future()

def thread_func():
try:
future.set_result(func())
except Exception as e:
future.set_exception(e)

thread = Thread(target=thread_func)
thread.start()
await future
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
import numpy as np
import rclpy
from bitbots_utils.transforms import quat2fused
from bitbots_utils.utils import async_wait_for
from rclpy.action import ActionServer, GoalResponse
from rclpy.action.server import ServerGoalHandle
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.duration import Duration
from rclpy.executors import ExternalShutdownException, MultiThreadedExecutor
from rclpy.executors import ExternalShutdownException
from rclpy.experimental.events_executor import EventsExecutor
from rclpy.node import Node
from sensor_msgs.msg import Imu, JointState
from simpleeval import simple_eval
Expand Down Expand Up @@ -65,11 +65,9 @@ def __init__(self):
)
traceback.print_exc()

callback_group = ReentrantCallbackGroup()

# Subscribers
self.create_subscription(JointState, "joint_states", self.update_current_pose, 1, callback_group=callback_group)
self.create_subscription(Imu, "imu/data", self.imu_callback, 1, callback_group=callback_group)
self.create_subscription(JointState, "joint_states", self.update_current_pose, 1)
self.create_subscription(Imu, "imu/data", self.imu_callback, 1)

# Service clients
self.hcm_animation_mode = self.create_client(SetBool, "play_animation_mode")
Expand All @@ -79,7 +77,7 @@ def __init__(self):

# Action server for playing animations
self._action_server = ActionServer(
self, PlayAnimation, "animation", self.execute_cb, callback_group=callback_group, goal_callback=self.goal_cb
self, PlayAnimation, "animation", self.execute_cb, goal_callback=self.goal_cb
)

# Service to temporarily add an animation to the cache
Expand Down Expand Up @@ -125,7 +123,7 @@ def goal_cb(self, request: PlayAnimation.Goal) -> GoalResponse:
# Everything is fine we are good to go
return GoalResponse.ACCEPT

def execute_cb(self, goal: ServerGoalHandle) -> PlayAnimation.Result:
async def execute_cb(self, goal: ServerGoalHandle) -> PlayAnimation.Result:
"""This is called, when the action should be executed."""

# Convert goal id uuid to hashable tuple (custom UUID type)
Expand Down Expand Up @@ -165,16 +163,16 @@ def finish(successful: bool) -> PlayAnimation.Result:

# Send request to make the HCM to go into animation play mode
num_tries = 0
while rclpy.ok() and (not self.hcm_animation_mode.call(SetBool.Request(data=True)).success): # type: ignore[attr-defined]
while rclpy.ok() and (not (await self.hcm_animation_mode.call_async(SetBool.Request(data=True))).success): # type: ignore[attr-defined]
if num_tries >= 10:
self.get_logger().error("Failed to request HCM to go into animation play mode")
return finish(successful=False)
num_tries += 1
self.get_clock().sleep_for(Duration(seconds=0.1))
await async_wait_for(self, 0.1)

# Make sure we have our current joint states
while rclpy.ok() and self.current_joint_states is None:
self.get_clock().sleep_for(Duration(seconds=0.1))
await async_wait_for(self, 0.1)

# Create splines
animator = SplineAnimator(
Expand Down Expand Up @@ -263,7 +261,9 @@ def finish(successful: bool) -> PlayAnimation.Result:
if pose is None or (request.bounds and once and t > animator.get_keyframe_times()[request.end - 1]):
# Animation is finished, tell it to the hcm (except if it is from the hcm)
if not request.hcm:
hcm_result: SetBool.Response = self.hcm_animation_mode.call(SetBool.Request(data=False))
hcm_result: SetBool.Response = await self.hcm_animation_mode.call_async(
SetBool.Request(data=False)
)
if not hcm_result.success:
self.get_logger().error(f"Failed to finish animation on HCM. Reason: {hcm_result.message}")

Expand All @@ -283,7 +283,10 @@ def finish(successful: bool) -> PlayAnimation.Result:

once = True

self.get_clock().sleep_until(last_time + Duration(seconds=0.02))
execution_duration = (self.get_clock().now() - last_time).nanoseconds / 1e9

await async_wait_for(self, execution_duration + 0.02) # Wait for the next iteration

except (ExternalShutdownException, KeyboardInterrupt):
sys.exit(0)
return finish(successful=False)
Expand Down Expand Up @@ -339,7 +342,7 @@ def add_animation(self, request: AddAnimation.Request, response: AddAnimation.Re
def main(args=None):
rclpy.init(args=args)
node = AnimationNode()
ex = MultiThreadedExecutor(num_threads=10)
ex = EventsExecutor()
ex.add_node(node)
try:
ex.spin()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,5 @@ def perform(self, reevaluate=False):
if not self.blackboard.visualization_active and not self.blackboard.simulation_active:
req = SetBool.Request()
req.data = False
self.blackboard.motor_switch_service.call(req)
self.blackboard.motor_switch_service.call_async(req)
return self.pop()
Loading