Skip to content

Commit

Permalink
Remove camera test from sim
Browse files Browse the repository at this point in the history
  • Loading branch information
rafal-gorecki committed Dec 10, 2024
1 parent d48291f commit 3cc31b1
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 15 deletions.
5 changes: 2 additions & 3 deletions rosbot_bringup/test/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,9 @@ def wait_for_all_events(events, timeout):
start_time = time.time()
while time.time() - start_time < timeout:
if all(event.is_set() for event in events):
return True, [] # All events have been set
time.sleep(0.1) # Short interval between checks
return True, []
time.sleep(0.1)

# Identify which events were not set
not_set_events = [i for i, event in enumerate(events) if not event.is_set()]
return False, not_set_events

Expand Down
53 changes: 41 additions & 12 deletions rosbot_gazebo/test/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import time
from threading import Event

import rclpy
Expand Down Expand Up @@ -261,21 +262,49 @@ def yaw_speed_test(node: SimulationTestNode, v_x=0.0, v_y=0.0, v_yaw=0.0, robot_
), f"{robot_name} does not rotate properly. Check ekf_filter_node! Twist: {node.ekf_twist}"


def sensors_readings_test(node: SimulationTestNode, robot_name="ROSbot"):
flag = node.scan_event.wait(timeout=20.0)
assert flag, f"{robot_name}'s lidar does not work properly!"
def wait_for_all_events(events, timeout):
start_time = time.time()
while time.time() - start_time < timeout:
if all(event.is_set() for event in events):
return True, []
time.sleep(0.1)

not_set_events = [i for i, event in enumerate(events) if not event.is_set()]
return False, not_set_events

for i in range(len(node.RANGE_SENSORS_TOPICS)):
flag = node.ranges_events[i].wait(timeout=20.0)
assert (
flag
), f"{robot_name}'s range sensor {node.RANGE_SENSORS_TOPICS[i]} does not work properly!"

flag = node.camera_color_event.wait(timeout=20.0)
assert flag, f"{robot_name}'s camera color image does not work properly!"
def sensors_readings_test(node: SimulationTestNode, robot_name="ROSbot"):
events = [
node.scan_event,
node.ranges_events[0],
node.ranges_events[1],
node.ranges_events[2],
node.ranges_events[3],
# FIXME: Camera topics are not available in the current simulation
# node.camera_color_event,
# node.camera_points_event,
]

event_names = [
"Scan",
"Range FL",
"Range FR",
"Range RL",
"Range RR",
"Camera Color",
"Camera Points",
]

msgs_received_flag, not_set_indices = wait_for_all_events(events, timeout=20.0)

if not msgs_received_flag:
not_set_event_names = [event_names[i] for i in not_set_indices]
missing_events = ", ".join(not_set_event_names)
raise AssertionError(
f"{robot_name}: Not all expected messages were received. Missing: {missing_events}."
)

flag = node.camera_points_event.wait(timeout=20.0)
assert flag, f"{robot_name}'s camera point cloud does not work properly!"
print(f"{robot_name}: All messages received successfully.")


def diff_test(node: SimulationTestNode, robot_name="ROSbot"):
Expand Down

0 comments on commit 3cc31b1

Please sign in to comment.