#!/usr/bin/env python3 # Copyright (c) 2024 FZI Forschungszentrum Informatik # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # # * Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # # * Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # * Neither the name of the {copyright_holder} nor the names of its # contributors may be used to endorse or promote products derived from # this software without specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. # # Author: Felix Exner # This is an example of how to interface the robot without any additional ROS components. For # real-life applications, we do recommend to use something like MoveIt! import time import rclpy from rclpy.action import ActionClient from rclpy.node import Node from builtin_interfaces.msg import Duration from action_msgs.msg import GoalStatus from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint from control_msgs.action import FollowJointTrajectory from control_msgs.msg import JointTolerance TRAJECTORIES = { "traj0": [ { "positions": [0.785, -1.57, 0.785, 0.785, 0.785, 0.785], "velocities": [0.0, 0.0, 0.0, 0.0, 0.0, 0.0], "time_from_start": Duration(sec=6, nanosec=0), }, { "positions": [0.0, -1.57, 0.0, 0.0, 0.0, 0.0], "velocities": [0.0, 0.0, 0.0, 0.0, 0.0, 0.0], "time_from_start": Duration(sec=12, nanosec=0), }, ], "traj1": [ { "positions": [0.0, -1.57, 0.0, 0.0, 0.0, 0.0], "velocities": [0.0, 0.0, 0.0, 0.0, 0.0, 0.0], "time_from_start": Duration(sec=0, nanosec=0), }, { "positions": [0.0, -1.57, 0.0, 0.0, -0.785, 0.0], "velocities": [0.0, 0.0, 0.0, 0.0, 0.0, 0.0], "time_from_start": Duration(sec=5, nanosec=0), }, ], "traj2": [ { "positions": [0.0, -1.57, 0.0, 0.0, -0.785, 0.0], "velocities": [0.0, 0.0, 0.0, 0.0, 0.0, 0.0], "time_from_start": Duration(sec=0, nanosec=0), }, { "positions": [0.0, -1.57, 0.0, 0.0, 0.0, 0.0], "velocities": [0.0, 0.0, 0.0, 0.0, 0.0, 0.0], "time_from_start": Duration(sec=6, nanosec=0), }, ], # "traj0": [ # { # "positions": [-4.3, -3.0, 0.0, -1.57, 1.57, 0.67], # "velocities": [0.0, 0.0, 0.0, 0.0, 0.0, 0.0], # "time_from_start": Duration(sec=15, nanosec=0), # }, # { # "positions": [0.0, -1.57, 0.0, 0.0, 0.0, 0.0], # "velocities": [0.0, 0.0, 0.0, 0.0, 0.0, 0.0], # "time_from_start": Duration(sec=30, nanosec=0), # }, # ], } class JTCClient(Node): """Small test client for the jtc.""" def __init__(self, name): super().__init__(name) self.declare_parameter("controller_name", "scaled_joint_trajectory_controller") self.declare_parameter( "joints", [ "shoulder_pan_joint", "shoulder_lift_joint", "elbow_joint", "wrist_1_joint", "wrist_2_joint", "wrist_3_joint", ], ) controller_name = self.get_parameter("controller_name").value + "/follow_joint_trajectory" self.joints = self.get_parameter("joints").value if self.joints is None or len(self.joints) == 0: raise Exception('"joints" parameter is required') self._action_client = ActionClient(self, FollowJointTrajectory, controller_name) self.get_logger().info(f"Waiting for action server on {controller_name}") self._action_client.wait_for_server() self.parse_trajectories() self.i = 0 self._send_goal_future = None self._get_result_future = None # wait fot the previous loop succdessful. # self.rclpy.sleep(2) # this will execute immediately after you call this node - turn it off for now so that it will execute it while camera detect sth. #----------------------------------------------------------------------------------------------- # self.execute_next_trajectory() def parse_trajectories(self): self.goals = {} for traj_name in TRAJECTORIES: goal = JointTrajectory() goal.joint_names = self.joints for pt in TRAJECTORIES[traj_name]: point = JointTrajectoryPoint() point.positions = pt["positions"] point.velocities = pt["velocities"] point.time_from_start = pt["time_from_start"] goal.points.append(point) self.goals[traj_name] = goal def execute_next_trajectory(self): if self.i >= len(self.goals): self.get_logger().info("Done with all trajectories") raise SystemExit traj_name = list(self.goals)[self.i] self.i = self.i + 1 if traj_name: self.execute_trajectory(traj_name) def execute_trajectory(self, traj_name): self.get_logger().info(f"Executing trajectory {traj_name}") goal = FollowJointTrajectory.Goal() goal.trajectory = self.goals[traj_name] # ───── send goal ───────────────────────────────────────── self._send_goal_future = self._action_client.send_goal_async(goal) # === create future for returning ==================== result_future = rclpy.task.Future() def _on_goal_response(send_fut): """after receiving GoalHandle, wait for real result""" goal_handle = send_fut.result() # if canceled,put the exception into result_future if not goal_handle.accepted: result_future.set_exception(RuntimeError("Goal rejected :(")) return # let old result callback still working(if wanna keep it for log) self.goal_response_callback(send_fut) # wait for all the trajectory movement ends. get_result_fut = goal_handle.get_result_async() # give me the inner future to the one for returning def _on_result(res_fut): result_future.set_result(res_fut.result()) get_result_fut.add_done_callback(_on_result) # after giving goal wait for goal to response self._send_goal_future.add_done_callback(_on_goal_response) # -------★ return this future ★-------------------------- return result_future def goal_response_callback(self, future): goal_handle = future.result() if not goal_handle.accepted: self.get_logger().error("Goal rejected :(") raise RuntimeError("Goal rejected :(") self.get_logger().debug("Goal accepted :)") self._get_result_future = goal_handle.get_result_async() self._get_result_future.add_done_callback(self.get_result_callback) def get_result_callback(self, future): result = future.result().result status = future.result().status self.get_logger().info(f"Done with result: {self.status_to_str(status)}") # if this don't comment - it will automatically run the next movement until no traj anymore. #----------------------------------------------------------------------------------------------- # if status == GoalStatus.STATUS_SUCCEEDED: # # time.sleep(2) # self.execute_next_trajectory() # else: # if result.error_code != FollowJointTrajectory.Result.SUCCESSFUL: # self.get_logger().error( # f"Done with result: {self.error_code_to_str(result.error_code)}" # ) # raise RuntimeError("Executing trajectory failed. " + result.error_string) @staticmethod def error_code_to_str(error_code): if error_code == FollowJointTrajectory.Result.SUCCESSFUL: return "SUCCESSFUL" if error_code == FollowJointTrajectory.Result.INVALID_GOAL: return "INVALID_GOAL" if error_code == FollowJointTrajectory.Result.INVALID_JOINTS: return "INVALID_JOINTS" if error_code == FollowJointTrajectory.Result.OLD_HEADER_TIMESTAMP: return "OLD_HEADER_TIMESTAMP" if error_code == FollowJointTrajectory.Result.PATH_TOLERANCE_VIOLATED: return "PATH_TOLERANCE_VIOLATED" if error_code == FollowJointTrajectory.Result.GOAL_TOLERANCE_VIOLATED: return "GOAL_TOLERANCE_VIOLATED" @staticmethod def status_to_str(error_code): if error_code == GoalStatus.STATUS_UNKNOWN: return "UNKNOWN" if error_code == GoalStatus.STATUS_ACCEPTED: return "ACCEPTED" if error_code == GoalStatus.STATUS_EXECUTING: return "EXECUTING" if error_code == GoalStatus.STATUS_CANCELING: return "CANCELING" if error_code == GoalStatus.STATUS_SUCCEEDED: return "SUCCEEDED" if error_code == GoalStatus.STATUS_CANCELED: return "CANCELED" if error_code == GoalStatus.STATUS_ABORTED: return "ABORTED" def main(args=None): rclpy.init(args=args) jtc_client = JTCClient('ur3e_actions') try: rclpy.spin(jtc_client) except RuntimeError as err: jtc_client.get_logger().error(str(err)) except SystemExit: rclpy.logging.get_logger("jtc_client").info("Done") jtc_client.destroy_node() rclpy.shutdown() if __name__ == "__main__": main()