142 lines
5.0 KiB
Python
Executable File
142 lines
5.0 KiB
Python
Executable File
#!/usr/bin/env python
|
|
# Copyright 2023, FZI Forschungszentrum Informatik
|
|
#
|
|
# Redistribution and use in source and binary forms, with or without
|
|
# modification, are permitted provided that the following conditions are met:
|
|
#
|
|
# * Redistributions of source code must retain the above copyright
|
|
# notice, this list of conditions and the following disclaimer.
|
|
#
|
|
# * Redistributions in binary form must reproduce the above copyright
|
|
# notice, this list of conditions and the following disclaimer in the
|
|
# documentation and/or other materials provided with the distribution.
|
|
#
|
|
# * Neither the name of the {copyright_holder} nor the names of its
|
|
# contributors may be used to endorse or promote products derived from
|
|
# this software without specific prior written permission.
|
|
#
|
|
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
|
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
|
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
|
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
|
|
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
|
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
|
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
|
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
|
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
|
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
|
# POSSIBILITY OF SUCH DAMAGE.
|
|
import os
|
|
import sys
|
|
import time
|
|
import unittest
|
|
|
|
import pytest
|
|
import rclpy
|
|
import rclpy.node
|
|
from std_msgs.msg import String as StringMsg
|
|
from ur_msgs.msg import IOStates
|
|
|
|
sys.path.append(os.path.dirname(__file__))
|
|
from test_common import ( # noqa: E402
|
|
ControllerManagerInterface,
|
|
DashboardInterface,
|
|
IoStatusInterface,
|
|
generate_driver_test_description,
|
|
)
|
|
|
|
ROBOT_IP = "192.168.56.101"
|
|
|
|
|
|
@pytest.mark.launch_test
|
|
def generate_test_description():
|
|
return generate_driver_test_description()
|
|
|
|
|
|
class URScriptInterfaceTest(unittest.TestCase):
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
# Initialize the ROS context
|
|
rclpy.init()
|
|
cls.node = rclpy.node.Node("urscript_interface_test")
|
|
cls.init_robot(cls)
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
# Shutdown the ROS context
|
|
cls.node.destroy_node()
|
|
rclpy.shutdown()
|
|
|
|
def init_robot(self):
|
|
self._dashboard_interface = DashboardInterface(self.node)
|
|
self._controller_manager_interface = ControllerManagerInterface(self.node)
|
|
self._io_status_controller_interface = IoStatusInterface(self.node)
|
|
|
|
self.urscript_pub = self.node.create_publisher(
|
|
StringMsg, "/urscript_interface/script_command", 1
|
|
)
|
|
|
|
def setUp(self):
|
|
self._dashboard_interface.start_robot()
|
|
time.sleep(1)
|
|
self.assertTrue(self._io_status_controller_interface.resend_robot_program().success)
|
|
|
|
self._controller_manager_interface.wait_for_controller("io_and_status_controller")
|
|
|
|
def test_set_io(self):
|
|
"""Test setting an IO using a direct program call."""
|
|
self.io_states_sub = self.node.create_subscription(
|
|
IOStates,
|
|
"/io_and_status_controller/io_states",
|
|
self.io_msg_cb,
|
|
rclpy.qos.qos_profile_system_default,
|
|
)
|
|
|
|
self.set_digout_checked(0, True)
|
|
time.sleep(1)
|
|
self.set_digout_checked(0, False)
|
|
|
|
self.io_msg = None
|
|
self.io_states_sub = self.node.create_subscription(
|
|
IOStates,
|
|
"/io_and_status_controller/io_states",
|
|
self.io_msg_cb,
|
|
rclpy.qos.qos_profile_system_default,
|
|
)
|
|
|
|
script_msg = StringMsg(
|
|
data="sec my_program():\n set_digital_out(0, False)\n set_digital_out(1,True)\nend"
|
|
)
|
|
self.urscript_pub.publish(script_msg)
|
|
self.check_pin_states([0, 1], [False, True])
|
|
|
|
time.sleep(1)
|
|
|
|
script_msg = StringMsg(
|
|
data="sec my_program():\n set_digital_out(0, True)\n set_digital_out(1,False)\nend"
|
|
)
|
|
self.urscript_pub.publish(script_msg)
|
|
self.check_pin_states([0, 1], [True, False])
|
|
|
|
def io_msg_cb(self, msg):
|
|
self.io_msg = msg
|
|
|
|
def set_digout_checked(self, pin, state):
|
|
self.io_msg = None
|
|
|
|
script_msg = StringMsg(data=f"set_digital_out({pin}, {state})")
|
|
self.urscript_pub.publish(script_msg)
|
|
|
|
self.check_pin_states([pin], [state])
|
|
|
|
def check_pin_states(self, pins, states):
|
|
pin_states = [not x for x in states]
|
|
end_time = time.time() + 50
|
|
while pin_states != states and time.time() < end_time:
|
|
rclpy.spin_once(self.node, timeout_sec=0.1)
|
|
if self.io_msg is not None:
|
|
for i, pin_id in enumerate(pins):
|
|
pin_states[i] = self.io_msg.digital_out_states[pin_id].state
|
|
self.assertIsNotNone(self.io_msg, "Did not receive an IO state in requested time.")
|
|
self.assertEqual(pin_states, states)
|