ur_robot_driver/scripts/ur3e_actions.py
2025-05-28 00:25:11 +08:00

279 lines
11 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# Copyright (c) 2024 FZI Forschungszentrum Informatik
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# * Neither the name of the {copyright_holder} nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Author: Felix Exner
# This is an example of how to interface the robot without any additional ROS components. For
# real-life applications, we do recommend to use something like MoveIt!
import time
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from builtin_interfaces.msg import Duration
from action_msgs.msg import GoalStatus
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from control_msgs.action import FollowJointTrajectory
from control_msgs.msg import JointTolerance
TRAJECTORIES = {
"traj0": [
{
"positions": [0.785, -1.57, 0.785, 0.785, 0.785, 0.785],
"velocities": [0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
"time_from_start": Duration(sec=6, nanosec=0),
},
{
"positions": [0.0, -1.57, 0.0, 0.0, 0.0, 0.0],
"velocities": [0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
"time_from_start": Duration(sec=12, nanosec=0),
},
],
"traj1": [
{
"positions": [0.0, -1.57, 0.0, 0.0, 0.0, 0.0],
"velocities": [0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
"time_from_start": Duration(sec=0, nanosec=0),
},
{
"positions": [0.0, -1.57, 0.0, 0.0, -0.785, 0.0],
"velocities": [0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
"time_from_start": Duration(sec=5, nanosec=0),
},
],
"traj2": [
{
"positions": [0.0, -1.57, 0.0, 0.0, -0.785, 0.0],
"velocities": [0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
"time_from_start": Duration(sec=0, nanosec=0),
},
{
"positions": [0.0, -1.57, 0.0, 0.0, 0.0, 0.0],
"velocities": [0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
"time_from_start": Duration(sec=6, nanosec=0),
},
],
# "traj0": [
# {
# "positions": [-4.3, -3.0, 0.0, -1.57, 1.57, 0.67],
# "velocities": [0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
# "time_from_start": Duration(sec=15, nanosec=0),
# },
# {
# "positions": [0.0, -1.57, 0.0, 0.0, 0.0, 0.0],
# "velocities": [0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
# "time_from_start": Duration(sec=30, nanosec=0),
# },
# ],
}
class JTCClient(Node):
"""Small test client for the jtc."""
def __init__(self, name):
super().__init__(name)
self.declare_parameter("controller_name", "scaled_joint_trajectory_controller")
self.declare_parameter(
"joints",
[
"shoulder_pan_joint",
"shoulder_lift_joint",
"elbow_joint",
"wrist_1_joint",
"wrist_2_joint",
"wrist_3_joint",
],
)
controller_name = self.get_parameter("controller_name").value + "/follow_joint_trajectory"
self.joints = self.get_parameter("joints").value
if self.joints is None or len(self.joints) == 0:
raise Exception('"joints" parameter is required')
self._action_client = ActionClient(self, FollowJointTrajectory, controller_name)
self.get_logger().info(f"Waiting for action server on {controller_name}")
self._action_client.wait_for_server()
self.parse_trajectories()
self.i = 0
self._send_goal_future = None
self._get_result_future = None
# wait fot the previous loop succdessful.
# self.rclpy.sleep(2)
# this will execute immediately after you call this node - turn it off for now so that it will execute it while camera detect sth.
#-----------------------------------------------------------------------------------------------
# self.execute_next_trajectory()
def parse_trajectories(self):
self.goals = {}
for traj_name in TRAJECTORIES:
goal = JointTrajectory()
goal.joint_names = self.joints
for pt in TRAJECTORIES[traj_name]:
point = JointTrajectoryPoint()
point.positions = pt["positions"]
point.velocities = pt["velocities"]
point.time_from_start = pt["time_from_start"]
goal.points.append(point)
self.goals[traj_name] = goal
def execute_next_trajectory(self):
if self.i >= len(self.goals):
self.get_logger().info("Done with all trajectories")
raise SystemExit
traj_name = list(self.goals)[self.i]
self.i = self.i + 1
if traj_name:
self.execute_trajectory(traj_name)
def execute_trajectory(self, traj_name):
self.get_logger().info(f"Executing trajectory {traj_name}")
goal = FollowJointTrajectory.Goal()
goal.trajectory = self.goals[traj_name]
# ───── send goal ─────────────────────────────────────────
self._send_goal_future = self._action_client.send_goal_async(goal)
# === create future for returning ====================
result_future = rclpy.task.Future()
def _on_goal_response(send_fut):
"""after receiving GoalHandle, wait for real result"""
goal_handle = send_fut.result()
# if canceledput the exception into result_future
if not goal_handle.accepted:
result_future.set_exception(RuntimeError("Goal rejected :("))
return
# let old result callback still workingif wanna keep it for log
self.goal_response_callback(send_fut)
# wait for all the trajectory movement ends.
get_result_fut = goal_handle.get_result_async()
# give me the inner future to the one for returning
def _on_result(res_fut):
result_future.set_result(res_fut.result())
get_result_fut.add_done_callback(_on_result)
# after giving goal wait for goal to response
self._send_goal_future.add_done_callback(_on_goal_response)
# -------★ return this future ★--------------------------
return result_future
def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().error("Goal rejected :(")
raise RuntimeError("Goal rejected :(")
self.get_logger().debug("Goal accepted :)")
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)
def get_result_callback(self, future):
result = future.result().result
status = future.result().status
self.get_logger().info(f"Done with result: {self.status_to_str(status)}")
# if this don't comment - it will automatically run the next movement until no traj anymore.
#-----------------------------------------------------------------------------------------------
# if status == GoalStatus.STATUS_SUCCEEDED:
# # time.sleep(2)
# self.execute_next_trajectory()
# else:
# if result.error_code != FollowJointTrajectory.Result.SUCCESSFUL:
# self.get_logger().error(
# f"Done with result: {self.error_code_to_str(result.error_code)}"
# )
# raise RuntimeError("Executing trajectory failed. " + result.error_string)
@staticmethod
def error_code_to_str(error_code):
if error_code == FollowJointTrajectory.Result.SUCCESSFUL:
return "SUCCESSFUL"
if error_code == FollowJointTrajectory.Result.INVALID_GOAL:
return "INVALID_GOAL"
if error_code == FollowJointTrajectory.Result.INVALID_JOINTS:
return "INVALID_JOINTS"
if error_code == FollowJointTrajectory.Result.OLD_HEADER_TIMESTAMP:
return "OLD_HEADER_TIMESTAMP"
if error_code == FollowJointTrajectory.Result.PATH_TOLERANCE_VIOLATED:
return "PATH_TOLERANCE_VIOLATED"
if error_code == FollowJointTrajectory.Result.GOAL_TOLERANCE_VIOLATED:
return "GOAL_TOLERANCE_VIOLATED"
@staticmethod
def status_to_str(error_code):
if error_code == GoalStatus.STATUS_UNKNOWN:
return "UNKNOWN"
if error_code == GoalStatus.STATUS_ACCEPTED:
return "ACCEPTED"
if error_code == GoalStatus.STATUS_EXECUTING:
return "EXECUTING"
if error_code == GoalStatus.STATUS_CANCELING:
return "CANCELING"
if error_code == GoalStatus.STATUS_SUCCEEDED:
return "SUCCEEDED"
if error_code == GoalStatus.STATUS_CANCELED:
return "CANCELED"
if error_code == GoalStatus.STATUS_ABORTED:
return "ABORTED"
def main(args=None):
rclpy.init(args=args)
jtc_client = JTCClient('ur3e_actions')
try:
rclpy.spin(jtc_client)
except RuntimeError as err:
jtc_client.get_logger().error(str(err))
except SystemExit:
rclpy.logging.get_logger("jtc_client").info("Done")
jtc_client.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()