100 lines
4.0 KiB
Python
Executable File
100 lines
4.0 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
|
||
import rclpy
|
||
from rclpy.node import Node
|
||
from builtin_interfaces.msg import Duration
|
||
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
|
||
|
||
|
||
class UnifiedTrajectoryPublisher(Node):
|
||
def __init__(self, name):
|
||
super().__init__(name)
|
||
|
||
self.controller_name = "scaled_joint_trajectory_controller" # 或 "joint_trajectory_controller" could also exchange to oother controller
|
||
self.wait_sec_between_publish = 6
|
||
self.goal_name = ["pos1", "pos2"] # can also add pos3, 4 and so on
|
||
self.joints = [
|
||
"shoulder_pan_joint",
|
||
"shoulder_lift_joint",
|
||
"elbow_joint",
|
||
"wrist_1_joint",
|
||
"wrist_2_joint",
|
||
"wrist_3_joint"
|
||
]
|
||
self.check_starting_point = False # if we didn't check the starting poin,set it to False
|
||
|
||
# if we need to chekc the starting point limit, can set it here -> but not really need here (the setting here is 0, 90, 0, 90, 0, 0) which is the HOME position
|
||
self.starting_point_limits = {
|
||
"shoulder_pan_joint": [-0.1, 0.1], # this interval is -0.1 <= x <= 0.1 joint is able to work within this interval
|
||
"shoulder_lift_joint": [-1.6, -1.5],
|
||
"elbow_joint": [-0.1, 0.1],
|
||
"wrist_1_joint": [-1.6, -1.5],
|
||
"wrist_2_joint": [-0.1, 0.1],
|
||
"wrist_3_joint": [-0.1, 0.1],
|
||
}
|
||
|
||
# 宏義各個目標
|
||
self.declare_parameter("Joints1", [-1.57, -2.07, 0.0, -1.04, 0.0, 0.0])
|
||
self.declare_parameter("Joints2", [-1.57, -1.04, 0.0, -2.35, 0.0, 0.0])
|
||
|
||
|
||
self.goals_dict = {
|
||
"pos1": {"positions": self.get_parameter("Joints1").value},
|
||
"pos2": {"positions": self.get_parameter("Joints2").value},
|
||
# "pos3": {"positions": [0.0, -1.57, 0.0, 0.0, -0.785, 0.0]},
|
||
# "pos4": {"positions": [0.0, -1.57, 0.0, 0.0, 0.0, 0.0]},
|
||
}
|
||
|
||
# read the target and create JointTrajectoryPoint
|
||
self.goals = []
|
||
for goal in self.goal_name:
|
||
goal_data = self.goals_dict.get(goal, {})
|
||
positions = goal_data.get("positions", [])
|
||
if len(positions) == len(self.joints): # length of position should == how many joints (each joints should have one position)
|
||
|
||
point = JointTrajectoryPoint()
|
||
point.positions = positions
|
||
point.time_from_start = Duration(sec=self.wait_sec_between_publish, nanosec=0)
|
||
self.goals.append(point)
|
||
|
||
self.get_logger().info(f"Loaded goal {goal}: {positions}")
|
||
else:
|
||
|
||
self.get_logger().warn(f"Goal {goal} positions not defined properly.")
|
||
|
||
if len(self.goals) == 0:
|
||
self.get_logger().error("No valid goal found. Exiting...")
|
||
exit(1)
|
||
|
||
# set Publisher,chosse the correct topic that is going to publish
|
||
publish_topic = "/" + self.controller_name + "/" + "joint_trajectory"
|
||
self.get_logger().info(f"Publishing {len(self.goals)} goals on topic '{publish_topic}' every {self.wait_sec_between_publish} second")
|
||
self.publisher_ = self.create_publisher(JointTrajectory, publish_topic, 10)
|
||
self.timer = self.create_timer(self.wait_sec_between_publish, self.timer_callback)
|
||
self.current_goal_index = 0
|
||
|
||
def timer_callback(self):
|
||
traj = JointTrajectory()
|
||
traj.joint_names = self.joints
|
||
traj.points.append(self.goals[self.current_goal_index])
|
||
self.publisher_.publish(traj)
|
||
self.get_logger().info(f"Published goal: {self.goals[self.current_goal_index].positions}")
|
||
self.current_goal_index = (self.current_goal_index + 1) % len(self.goals)
|
||
|
||
def main(args=None):
|
||
|
||
rclpy.init(args=args)
|
||
node = UnifiedTrajectoryPublisher("ur3e_topic")
|
||
|
||
try:
|
||
rclpy.spin(node)
|
||
except KeyboardInterrupt:
|
||
node.get_logger().info("shutting down...")
|
||
|
||
node.destroy_node()
|
||
rclpy.shutdown()
|
||
|
||
if __name__ == "__main__":
|
||
|
||
main()
|