2025-05-28 00:25:11 +08:00

181 lines
6.8 KiB
Python

#!/usr/bin/env python3
# Copyright 2024, Universal Robots A/S
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# * Neither the name of the {copyright_holder} nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import rclpy
from builtin_interfaces.msg import Duration
from control_msgs.action import FollowJointTrajectory
from rclpy.action import ActionClient
from rclpy.node import Node
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from ur_msgs.srv import SetIO
from controller_manager_msgs.srv import SwitchController
from std_srvs.srv import Trigger
TIMEOUT_WAIT_SERVICE = 10
TIMEOUT_WAIT_SERVICE_INITIAL = 60
TIMEOUT_WAIT_ACTION = 10
ROBOT_JOINTS = [
"shoulder_pan_joint",
"shoulder_lift_joint",
"elbow_joint",
"wrist_1_joint",
"wrist_2_joint",
"wrist_3_joint",
]
# Helper functions
def waitForService(node, srv_name, srv_type, timeout=TIMEOUT_WAIT_SERVICE):
client = node.create_client(srv_type, srv_name)
if client.wait_for_service(timeout) is False:
raise Exception(f"Could not reach service '{srv_name}' within timeout of {timeout}")
node.get_logger().info(f"Successfully connected to service '{srv_name}'")
return client
def waitForAction(node, action_name, action_type, timeout=TIMEOUT_WAIT_ACTION):
client = ActionClient(node, action_type, action_name)
if client.wait_for_server(timeout) is False:
raise Exception(
f"Could not reach action server '{action_name}' within timeout of {timeout}"
)
node.get_logger().info(f"Successfully connected to action '{action_name}'")
return client
class Robot:
def __init__(self, node):
self.node = node
self.service_interfaces = {
"/io_and_status_controller/set_io": SetIO,
"/dashboard_client/play": Trigger,
"/controller_manager/switch_controller": SwitchController,
}
self.init_robot()
def init_robot(self):
self.service_clients = {
srv_name: waitForService(self.node, srv_name, srv_type)
for (srv_name, srv_type) in self.service_interfaces.items()
}
self.jtc_action_client = waitForAction(
self.node,
"/scaled_joint_trajectory_controller/follow_joint_trajectory",
FollowJointTrajectory,
)
self.passthrough_trajectory_action_client = waitForAction(
self.node,
"/passthrough_trajectory_controller/follow_joint_trajectory",
FollowJointTrajectory,
)
def set_io(self, pin, value):
"""Test to set an IO."""
set_io_req = SetIO.Request()
set_io_req.fun = 1
set_io_req.pin = pin
set_io_req.state = value
self.call_service("/io_and_status_controller/set_io", set_io_req)
def send_trajectory(self, waypts, time_vec, action_client):
"""Send robot trajectory."""
if len(waypts) != len(time_vec):
raise Exception("waypoints vector and time vec should be same length")
# Construct test trajectory
joint_trajectory = JointTrajectory()
joint_trajectory.joint_names = ROBOT_JOINTS
for i in range(len(waypts)):
point = JointTrajectoryPoint()
point.positions = waypts[i]
point.time_from_start = time_vec[i]
joint_trajectory.points.append(point)
# Sending trajectory goal
goal_response = self.call_action(
action_client, FollowJointTrajectory.Goal(trajectory=joint_trajectory)
)
if not goal_response.accepted:
raise Exception("trajectory was not accepted")
# Verify execution
result = self.get_result(action_client, goal_response)
return result.error_code == FollowJointTrajectory.Result.SUCCESSFUL
def call_service(self, srv_name, request):
future = self.service_clients[srv_name].call_async(request)
rclpy.spin_until_future_complete(self.node, future)
if future.result() is not None:
return future.result()
else:
raise Exception(f"Exception while calling service: {future.exception()}")
def call_action(self, ac_client, g):
future = ac_client.send_goal_async(g)
rclpy.spin_until_future_complete(self.node, future)
if future.result() is not None:
return future.result()
else:
raise Exception(f"Exception while calling action: {future.exception()}")
def get_result(self, ac_client, goal_response):
future_res = ac_client._get_result_async(goal_response)
rclpy.spin_until_future_complete(self.node, future_res)
if future_res.result() is not None:
return future_res.result().result
else:
raise Exception(f"Exception while calling action: {future_res.exception()}")
if __name__ == "__main__":
rclpy.init()
node = Node("robot_driver_test")
robot = Robot(node)
# The following list are arbitrary joint positions, change according to your own needs
waypts = [
[-1.6006, -1.7272, -2.2030, -0.8079, 1.5951, -0.0311],
[-1.2, -1.4, -1.9, -1.2, 1.5951, -0.0311],
[-1.6006, -1.7272, -2.2030, -0.8079, 1.5951, -0.0311],
]
time_vec = [Duration(sec=4, nanosec=0), Duration(sec=8, nanosec=0), Duration(sec=12, nanosec=0)]
# Execute trajectory on robot, make sure that the robot is booted and the control script is running
robot.send_trajectory(waypts, time_vec)
# Set digital output 1 to true
robot.set_io(1, 1.0)