Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,16 @@ python scripts/test_placement_agent.py --task=Isaac-Pack-NoArm-v0 --num_envs 5

---

## Teleoperation and Imitation Learning
**Command:**
```bash
python scripts/teleop_se3_agent.py --task Isaac-Pack-UR5-Teleop-v0 --num_envs 1 --teleop_device mello
```
### Teleop Device
**Mello** is a teleoperation device designed for intuitive robot control, similar in concept to Gello. It mimics the robot's joint structure, allowing users to control the robot by physically moving the device. Joint positions are sent directly to the robot, eliminating the need for inverse kinematics or physics-based computation. Mello is especially useful for imitation learning, where human demonstrations collected via teleoperation are used to train models. Because Mello closely matches the robot’s kinematics, it enables efficient and accurate data collection for learning from demonstration.

---

## Code Formatting

We use a **pre-commit template** to automatically format your code.
Expand Down
24 changes: 24 additions & 0 deletions scripts/record_demos.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""
Script to record demonstrations with Isaac Lab environments using human teleoperation.

This script allows users to record demonstrations operated by human teleoperation for a specified task.
The recorded demonstrations are stored as episodes in an HDF5 file. Users can specify the task, teleoperation
device, dataset directory, and environment stepping rate through command-line arguments.

required arguments:
--task Name of the task.

optional arguments:
-h, --help Show this help message and exit
--teleop_device Device for interacting with environment. (default: keyboard)
--dataset_file File path to export recorded demos. (default: "./datasets/dataset.hdf5")
--step_hz Environment stepping rate in Hz. (default: 30)
--num_demos Number of demonstrations to record. (default: 0)
--num_success_steps Number of continuous steps with task success for concluding a demo as successful. (default: 10)
"""

"""Launch Isaac Sim Simulator first."""
332 changes: 332 additions & 0 deletions scripts/teleop_se3_agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,332 @@
# Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

"""Script to run a keyboard teleoperation with Isaac Lab manipulation environments."""

"""Launch Isaac Sim Simulator first."""

import argparse

from isaaclab.app import AppLauncher

# add argparse arguments
parser = argparse.ArgumentParser(description="Keyboard teleoperation for Isaac Lab environments.")
parser.add_argument("--num_envs", type=int, default=1, help="Number of environments to simulate.")
parser.add_argument("--teleop_device", type=str, default="keyboard", help="Device for interacting with environment")
parser.add_argument("--task", type=str, default=None, help="Name of the task.")
parser.add_argument("--sensitivity", type=float, default=1.0, help="Sensitivity factor.")
parser.add_argument(
    "--enable_pinocchio",
    action="store_true",
    default=False,
    help="Enable Pinocchio.",
)
# append AppLauncher cli args
AppLauncher.add_app_launcher_args(parser)
# parse the arguments
args_cli = parser.parse_args()

# AppLauncher reads its settings from this dict; extra keys (e.g. "xr") are injected below.
app_launcher_args = vars(args_cli)

if args_cli.enable_pinocchio:
    # Import pinocchio before AppLauncher to force the use of the version installed by
    # IsaacLab and not the one installed by Isaac Sim. Pinocchio is required by the
    # Pink IK controllers and the GR1T2 retargeter.
    import pinocchio  # noqa: F401
if "handtracking" in args_cli.teleop_device.lower():
    # Hand-tracking devices go through OpenXR, so the app must be launched in XR mode.
    app_launcher_args["xr"] = True

# launch omniverse app
app_launcher = AppLauncher(app_launcher_args)
simulation_app = app_launcher.app

"""Rest everything follows."""


import gymnasium as gym
import numpy as np
import torch

import omni.log

if "handtracking" in args_cli.teleop_device.lower():
from isaacsim.xr.openxr import OpenXRSpec

from isaaclab.devices import OpenXRDevice, Se3Gamepad, Se3Keyboard, Se3SpaceMouse, Se3Mello

if args_cli.enable_pinocchio:
from isaaclab.devices.openxr.retargeters.humanoid.fourier.gr1t2_retargeter import GR1T2Retargeter
import isaaclab_tasks.manager_based.manipulation.pick_place # noqa: F401
from isaaclab.devices.openxr.retargeters.manipulator import GripperRetargeter, Se3AbsRetargeter, Se3RelRetargeter
from isaaclab.managers import TerminationTermCfg as DoneTerm

import isaaclab_tasks # noqa: F401
import tote_consolidation.tasks # noqa: F401
from isaaclab_tasks.manager_based.manipulation.lift import mdp
from isaaclab_tasks.utils import parse_env_cfg


# Low-pass filter state for the Pack-UR5 gripper command (mutated by pre_process_actions).
smoothed_command = 0.0
# Exponential-smoothing factor: smaller alpha = heavier smoothing of the raw command.
alpha = 0.1


def pre_process_actions(
    teleop_data: tuple[np.ndarray, bool] | tuple[np.ndarray, float] | list[tuple[np.ndarray, np.ndarray, np.ndarray]],
    num_envs: int,
    device: str,
) -> torch.Tensor:
    """Convert teleop data to the format expected by the environment action space.

    The layout of the returned tensor depends on the task name in ``args_cli.task``:

    * ``Reach``: 6-DoF delta pose only (no gripper channel).
    * ``PickPlace-GR1T2``: left/right wrist poses and hand joint angles, one env.
    * ``Pack-UR5``: arm joint positions followed by eight gripper joint targets
      derived from an exponentially smoothed raw gripper command.
    * anything else: delta pose plus a binary gripper velocity channel.

    Args:
        teleop_data: Data from the teleoperation device.
        num_envs: Number of environments.
        device: Device to create tensors on.

    Returns:
        Processed actions as a tensor of shape ``(num_envs, action_dim)``
        (``(1, action_dim)`` for the GR1T2 branch).
    """
    if "Reach" in args_cli.task:
        # note: reach is the only task family without a gripper channel.
        delta_pose, _ = teleop_data
        return torch.tensor(delta_pose, dtype=torch.float, device=device).repeat(num_envs, 1)
    elif "PickPlace-GR1T2" in args_cli.task:
        left_wrist_pose, right_wrist_pose, hand_joints = teleop_data[0]
        # Concatenate arm poses and hand joint angles into a single action row.
        return torch.tensor(
            np.concatenate([left_wrist_pose, right_wrist_pose, hand_joints]),
            device=device,
            dtype=torch.float32,
        ).unsqueeze(0)
    elif "Pack-UR5" in args_cli.task:
        joint_pos, gripper_raw_command = teleop_data
        joint_pos = torch.tensor(joint_pos, dtype=torch.float, device=device).repeat(num_envs, 1)

        global smoothed_command

        # Only non-positive raw values close the gripper; clamp positives to zero.
        gripper_raw_command = min(gripper_raw_command, 0)

        # Exponentially smooth the command; snap to 0 so the gripper fully reopens.
        if gripper_raw_command == 0:
            smoothed_command = 0
        else:
            smoothed_command = alpha * gripper_raw_command + (1 - alpha) * smoothed_command

        max_value = 0.785398  # joint angle at full close (~45 deg) — TODO confirm against gripper USD
        min_gripper = -4045  # raw device value at full close — TODO confirm against Mello firmware

        # Fraction of full close in [0, 1] (both quantities are non-positive).
        close_ratio = smoothed_command / min_gripper

        # BUGFIX: the original wrote `max_value / -1*min_gripper`, which parses as
        # `(max_value / -1) * min_gripper` and yielded values around +3177 rad.
        # The intent — mirroring the other joints — is division by the negated
        # range, i.e. `-close_ratio * max_value`.
        gripper_joints = torch.tensor(
            [
                close_ratio * max_value,  # finger_joint
                close_ratio * max_value,  # right_outer_knuckle_joint
                -close_ratio * max_value + max_value,  # right_outer_finger_joint
                -close_ratio * max_value + max_value,  # left_outer_finger_joint
                close_ratio * max_value,  # left_inner_finger_pad_joint
                close_ratio * max_value,  # right_inner_finger_pad_joint
                -max_value,  # left_inner_finger_joint
                -max_value,  # right_inner_finger_joint
            ],
            dtype=torch.float,
            device=device,
        ).repeat(num_envs, 1)

        # Arm joint targets first, then the eight gripper joint targets.
        return torch.cat([joint_pos, gripper_joints], dim=1)
    else:
        # resolve gripper command: binary open (1) / close (0) velocity channel
        delta_pose, gripper_command = teleop_data
        delta_pose = torch.tensor(delta_pose, dtype=torch.float, device=device).repeat(num_envs, 1)
        gripper_vel = torch.zeros((delta_pose.shape[0], 1), dtype=torch.float, device=device)
        gripper_vel[:] = 1 if gripper_command else 0
        return torch.cat([delta_pose, gripper_vel], dim=1)


def main():
    """Run keyboard/device teleoperation with an Isaac Lab manipulation environment.

    Builds the environment from the CLI arguments, instantiates the selected
    teleoperation device, and steps the simulation in a loop, routing device
    commands through ``pre_process_actions`` while teleoperation is active.
    """
    # parse configuration from the registered task name
    env_cfg = parse_env_cfg(args_cli.task, device=args_cli.device, num_envs=args_cli.num_envs)
    env_cfg.env_name = args_cli.task
    # modify configuration: disable time-out so episodes only end on explicit reset
    env_cfg.terminations.time_out = None
    if "Lift" in args_cli.task:
        # set the resampling time range to large number to avoid resampling
        env_cfg.commands.object_pose.resampling_time_range = (1.0e9, 1.0e9)
        # add termination condition for reaching the goal otherwise the environment won't reset
        env_cfg.terminations.object_reached_goal = DoneTerm(func=mdp.object_reached_goal)
    # create environment (unwrapped so we can reach sim/scene attributes directly)
    env = gym.make(args_cli.task, cfg=env_cfg).unwrapped
    # check environment name (for reach, we don't allow the gripper)
    if "Reach" in args_cli.task:
        omni.log.warn(
            f"The environment '{args_cli.task}' does not support gripper control. The device command will be ignored."
        )

    # Flags for controlling teleoperation flow
    should_reset_recording_instance = False
    teleoperation_active = True

    # Callback handlers
    def reset_recording_instance():
        """Reset the environment to its initial state.

        This callback is triggered when the user presses the reset key (typically 'R').
        It's useful when:
        - The robot gets into an undesirable configuration
        - The user wants to start over with the task
        - Objects in the scene need to be reset to their initial positions

        The environment will be reset on the next simulation step.
        """
        nonlocal should_reset_recording_instance
        should_reset_recording_instance = True

    def start_teleoperation():
        """Activate teleoperation control of the robot.

        This callback enables active control of the robot through the input device.
        It's typically triggered by a specific gesture or button press and is used when:
        - Beginning a new teleoperation session
        - Resuming control after temporarily pausing
        - Switching from observation mode to control mode

        While active, all commands from the device will be applied to the robot.
        """
        nonlocal teleoperation_active
        teleoperation_active = True

    def stop_teleoperation():
        """Deactivate teleoperation control of the robot.

        This callback temporarily suspends control of the robot through the input device.
        It's typically triggered by a specific gesture or button press and is used when:
        - Taking a break from controlling the robot
        - Repositioning the input device without moving the robot
        - Pausing to observe the scene without interference

        While inactive, the simulation continues to render but device commands are ignored.
        """
        nonlocal teleoperation_active
        teleoperation_active = False

    # create controller: dispatch on the --teleop_device string
    if args_cli.teleop_device.lower() == "keyboard":
        teleop_interface = Se3Keyboard(
            pos_sensitivity=0.5 * args_cli.sensitivity, rot_sensitivity=0.5 * args_cli.sensitivity
        )
    elif args_cli.teleop_device.lower() == "spacemouse":
        teleop_interface = Se3SpaceMouse(
            pos_sensitivity=0.05 * args_cli.sensitivity, rot_sensitivity=0.05 * args_cli.sensitivity
        )
    elif args_cli.teleop_device.lower() == "mello":
        # Mello streams joint positions directly; no pos/rot sensitivity scaling applies.
        teleop_interface = Se3Mello()
    elif args_cli.teleop_device.lower() == "gamepad":
        teleop_interface = Se3Gamepad(
            pos_sensitivity=0.1 * args_cli.sensitivity, rot_sensitivity=0.1 * args_cli.sensitivity
        )
    elif "dualhandtracking_abs" in args_cli.teleop_device.lower() and "GR1T2" in args_cli.task:
        # Create GR1T2 retargeter with desired configuration
        gr1t2_retargeter = GR1T2Retargeter(
            enable_visualization=True,
            num_open_xr_hand_joints=2 * (int(OpenXRSpec.HandJointEXT.XR_HAND_JOINT_LITTLE_TIP_EXT) + 1),
            # NOTE(review): env is already unwrapped above, so .unwrapped is redundant here — harmless but inconsistent
            device=env.unwrapped.device,
            # assumes the last 22 robot joints are the hand joints — TODO confirm against the GR1T2 asset
            hand_joint_names=env.scene["robot"].data.joint_names[-22:],
        )

        # Create hand tracking device with retargeter
        teleop_interface = OpenXRDevice(
            env_cfg.xr,
            retargeters=[gr1t2_retargeter],
        )
        teleop_interface.add_callback("RESET", reset_recording_instance)
        teleop_interface.add_callback("START", start_teleoperation)
        teleop_interface.add_callback("STOP", stop_teleoperation)

        # Hand tracking needs explicit start gesture to activate
        teleoperation_active = False

    elif "handtracking" in args_cli.teleop_device.lower():
        # Create EE retargeter with desired configuration (absolute vs relative pose tracking)
        if "_abs" in args_cli.teleop_device.lower():
            retargeter_device = Se3AbsRetargeter(
                bound_hand=OpenXRDevice.TrackingTarget.HAND_RIGHT, zero_out_xy_rotation=True
            )
        else:
            retargeter_device = Se3RelRetargeter(
                bound_hand=OpenXRDevice.TrackingTarget.HAND_RIGHT, zero_out_xy_rotation=True
            )

        grip_retargeter = GripperRetargeter(bound_hand=OpenXRDevice.TrackingTarget.HAND_RIGHT)

        # Create hand tracking device with retargeter (in a list)
        teleop_interface = OpenXRDevice(
            env_cfg.xr,
            retargeters=[retargeter_device, grip_retargeter],
        )
        teleop_interface.add_callback("RESET", reset_recording_instance)
        teleop_interface.add_callback("START", start_teleoperation)
        teleop_interface.add_callback("STOP", stop_teleoperation)

        # Hand tracking needs explicit start gesture to activate
        teleoperation_active = False
    else:
        raise ValueError(
            f"Invalid device interface '{args_cli.teleop_device}'. Supported: 'keyboard', 'spacemouse', 'mello', 'gamepad',"
            " 'handtracking', 'handtracking_abs'."
        )

    # add teleoperation key for env reset (for all devices)
    teleop_interface.add_callback("R", reset_recording_instance)
    print(teleop_interface)

    # reset environment
    env.reset()
    teleop_interface.reset()

    # simulate environment
    while simulation_app.is_running():
        # run everything in inference mode
        with torch.inference_mode():
            # get device command
            teleop_data = teleop_interface.advance()

            # Only apply teleop commands when active

            if teleoperation_active:
                # NOTE(review): per-step debug print — consider removing or demoting to a logger for real sessions
                print(f"Teleoperation data: {teleop_data}")
                # compute actions based on environment
                actions = pre_process_actions(teleop_data, env.num_envs, env.device)
                print(f"Actions: {actions}")
                # apply actions
                env.step(actions)
            else:
                # keep rendering so the user can see the scene while paused
                env.sim.render()

            if should_reset_recording_instance:
                env.reset()
                should_reset_recording_instance = False

    # close the simulator
    env.close()


if __name__ == "__main__":
    # run the teleoperation loop until the simulator window is closed
    main()
    # close sim app
    simulation_app.close()
3 changes: 3 additions & 0 deletions scripts/zero_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ def main():
with torch.inference_mode():
# compute zero actions
actions = torch.zeros(env.action_space.shape, device=env.unwrapped.device)

actions[:, 6]= 0
print("[INFO] Actions:", actions)
# apply actions
env.step(actions)

Expand Down
Loading