diff --git a/examples/run_in_empty_track.py b/examples/run_in_empty_track.py new file mode 100644 index 00000000..3c924e3e --- /dev/null +++ b/examples/run_in_empty_track.py @@ -0,0 +1,59 @@ +import numpy as np + +from waypoint_follow import PurePursuitPlanner +from f110_gym.envs.track import Track +import gymnasium as gym + + +def main(): + """ + Demonstrate the creation of an empty map with a custom reference line. + This is useful for testing and debugging control algorithms on standard maneuvers. + """ + # create sinusoidal reference line with custom velocity profile + xs = np.linspace(0, 100, 200) + ys = np.sin(xs / 2.0) * 5.0 + velxs = 4.0 * (1 + (np.abs(np.cos(xs / 2.0)))) + + # create track from custom reference line + track = Track.from_refline(x=xs, y=ys, velx=velxs) + + # env and planner + env = gym.make( + "f110_gym:f110-v0", + config={ + "map": track, + "num_agents": 1, + "observation_config": {"type": "kinematic_state"}, + }, + render_mode="human", + ) + planner = PurePursuitPlanner(track=track, wb=0.17145 + 0.15875) + + # rendering callbacks + env.add_render_callback(track.raceline.render_waypoints) + env.add_render_callback(planner.render_lookahead_point) + + # simulation + obs, info = env.reset() + done = False + env.render() + + while not done: + speed, steer = planner.plan( + obs["agent_0"]["pose_x"], + obs["agent_0"]["pose_y"], + obs["agent_0"]["pose_theta"], + lookahead_distance=0.8, + vgain=1.0, + ) + action = np.array([[steer, speed]]) + obs, timestep, terminated, truncated, infos = env.step(action) + done = terminated or truncated + env.render() + + env.close() + + +if __name__ == "__main__": + main() diff --git a/examples/waypoint_follow.py b/examples/waypoint_follow.py index d37add9a..0056cb47 100644 --- a/examples/waypoint_follow.py +++ b/examples/waypoint_follow.py @@ -196,13 +196,6 @@ def load_waypoints(self, conf): conf.wpt_path, delimiter=conf.wpt_delim, skiprows=conf.wpt_rowskip ).astype(np.float32) - def render_waypoints(self, e): - """ - Callback to render waypoints. - """ - points = self.waypoints[:, :2] - e.render_closed_lines(points, color=(128, 0, 0), size=1) - def render_lookahead_point(self, e): """ Callback to render the lookahead point. @@ -310,7 +303,7 @@ def main(): "model": "st", "observation_config": {"type": "kinematic_state"}, "params": {"mu": 1.0}, - "reset_config": {"type": "random_static"}, + "reset_config": {"type": "rl_random_static"}, }, render_mode="human", ) @@ -318,20 +311,11 @@ def main(): planner = PurePursuitPlanner(track=track, wb=0.17145 + 0.15875) - poses = np.array( - [ - [ - track.raceline.xs[0], - track.raceline.ys[0], - track.raceline.yaws[0], - ] - ] - ) - env.unwrapped.add_render_callback(planner.render_waypoints) + env.unwrapped.add_render_callback(track.raceline.render_waypoints) env.unwrapped.add_render_callback(planner.render_local_plan) env.unwrapped.add_render_callback(planner.render_lookahead_point) - obs, info = env.reset(options={"poses": poses}) + obs, info = env.reset() done = False env.render() diff --git a/gym/f110_gym/envs/base_classes.py b/gym/f110_gym/envs/base_classes.py index c9754d50..b1e28483 100644 --- a/gym/f110_gym/envs/base_classes.py +++ b/gym/f110_gym/envs/base_classes.py @@ -33,6 +33,7 @@ from f110_gym.envs.collision_models import collision_multiple, get_vertices from f110_gym.envs.integrator import EulerIntegrator, IntegratorType from f110_gym.envs.laser_models import ScanSimulator2D, check_ttc_jit, ray_cast +from f110_gym.envs.track import Track class RaceCar(object): @@ -177,14 +178,14 @@ def update_params(self, params): """ self.params = params - def set_map(self, map_name: str): + def set_map(self, map: str | Track): """ Sets the map for scan simulator Args: - map_name (str): name of the map + map (str | Track): name of the map, or Track object """ - RaceCar.scan_simulator.set_map(map_name) + RaceCar.scan_simulator.set_map(map) def reset(self, pose): """ @@ -430,18 +431,18 @@ def __init__( num_beams = self.agents[0].scan_simulator.num_beams self.agent_scans = np.empty((self.num_agents, num_beams)) - def set_map(self, map_name): + def set_map(self, map: str | Track): """ Sets the map of the environment and sets the map for scan simulator of each agent Args: - map_name (str): name of the map + map (str | Track): name of the map, or Track object Returns: None """ for agent in self.agents: - agent.set_map(map_name) + agent.set_map(map) def update_params(self, params, agent_idx=-1): """ diff --git a/gym/f110_gym/envs/f110_env.py b/gym/f110_gym/envs/f110_env.py index 5cb1ad41..ba3458f9 100644 --- a/gym/f110_gym/envs/f110_env.py +++ b/gym/f110_gym/envs/f110_env.py @@ -95,7 +95,7 @@ def __init__(self, config: dict = None, render_mode=None, **kwargs): self.configure(config) self.seed = self.config["seed"] - self.map_name = self.config["map"] + self.map = self.config["map"] self.params = self.config["params"] self.num_agents = self.config["num_agents"] self.timestep = self.config["timestep"] @@ -143,10 +143,14 @@ def __init__(self, config: dict = None, render_mode=None, **kwargs): model=self.model, action_type=self.action_type, ) - self.sim.set_map(self.map_name) - self.track = Track.from_track_name( - self.map_name - ) # load track in gym env for convenience + self.sim.set_map(self.map) + + if isinstance(self.map, Track): + self.track = self.map + else: + self.track = Track.from_track_name( + self.map + ) # load track in gym env for convenience # observations self.agent_ids = [f"agent_{i}" for i in range(self.num_agents)] @@ -226,8 +230,8 @@ def default_config(cls) -> dict: "integrator": "rk4", "model": "st", "control_input": ["speed", "steering_angle"], - "observation_config": {"type": "original"}, - "reset_config": {"type": "grid_static"}, + "observation_config": {"type": None}, + "reset_config": {"type": None}, } def configure(self, config: dict) -> None: diff --git a/gym/f110_gym/envs/integrator.py b/gym/f110_gym/envs/integrator.py index f708aa82..bfc796be 100644 --- a/gym/f110_gym/envs/integrator.py +++ b/gym/f110_gym/envs/integrator.py @@ -1,4 +1,3 @@ -import warnings from abc import abstractmethod from enum import Enum diff --git a/gym/f110_gym/envs/laser_models.py b/gym/f110_gym/envs/laser_models.py index 52aa296a..d23a3bee 100644 --- a/gym/f110_gym/envs/laser_models.py +++ b/gym/f110_gym/envs/laser_models.py @@ -25,7 +25,7 @@ Prototype of Utility functions and classes for simulating 2D LIDAR scans Author: Hongrui Zheng """ - +from __future__ import annotations import unittest import numpy as np @@ -448,6 +448,9 @@ def __init__(self, num_beams, fov, eps=0.0001, theta_dis=2000, max_range=30.0): self.map_height = None self.map_width = None self.map_resolution = None + self.track = None + self.map_img = None + self.origin = None self.dt = None # precomputing corresponding cosines and sines of the angle array @@ -455,17 +458,20 @@ def __init__(self, num_beams, fov, eps=0.0001, theta_dis=2000, max_range=30.0): self.sines = np.sin(theta_arr) self.cosines = np.cos(theta_arr) - def set_map(self, map_name: str): + def set_map(self, map: str | Track): """ Set the bitmap of the scan simulator by path Args: - map_name (str): name of the racetrack in the map dir, e.g. "Levine" + map (str | Track): path to the map file, or Track object Returns: flag (bool): if image reading and loading is successful """ - self.track = Track.from_track_name(map_name) + if isinstance(map, str): + self.track = Track.from_track_name(map) + elif isinstance(map, Track): + self.track = map # load map image self.map_img = self.track.occupancy_map diff --git a/gym/f110_gym/envs/observation.py b/gym/f110_gym/envs/observation.py index 855a5838..263f8260 100644 --- a/gym/f110_gym/envs/observation.py +++ b/gym/f110_gym/envs/observation.py @@ -1,6 +1,4 @@ -""" -Author: Luigi Berducci -""" +from __future__ import annotations from abc import abstractmethod from typing import List @@ -265,7 +263,9 @@ def observe(self): return obs -def observation_factory(env, type: str, **kwargs) -> Observation: +def observation_factory(env, type: str | None, **kwargs) -> Observation: + type = type or "original" + if type == "original": return OriginalObservation(env) elif type == "features": diff --git a/gym/f110_gym/envs/rendering/__init__.py b/gym/f110_gym/envs/rendering/__init__.py index cbc3ba3b..d66d2a25 100644 --- a/gym/f110_gym/envs/rendering/__init__.py +++ b/gym/f110_gym/envs/rendering/__init__.py @@ -1,6 +1,5 @@ -from __future__ import annotations import pathlib -from typing import List, Tuple, Any +from typing import Any, Optional from f110_gym.envs.rendering.renderer import RenderSpec, EnvRenderer from f110_gym.envs.track import Track @@ -10,9 +9,25 @@ def make_renderer( params: dict[str, Any], track: Track, agent_ids: list[str], - render_mode: str = None, - render_fps: int = 100, -) -> Tuple[EnvRenderer, RenderSpec]: + render_mode: Optional[str] = None, + render_fps: Optional[int] = 100, +) -> tuple[EnvRenderer, RenderSpec]: + """ + Return an instance of the renderer and the rendering specification. + + Parameters + ---------- + params : dict + dictionary of renderer parameters + track : Track + track object + agent_ids : list + list of agent ids to render + render_mode : str, optional + rendering mode, by default None + render_fps : int, optional + rendering frames per second, by default 100 + """ from f110_gym.envs.rendering.rendering_pygame import PygameEnvRenderer cfg_file = pathlib.Path(__file__).parent.absolute() / "rendering.yaml" diff --git a/gym/f110_gym/envs/rendering/objects.py b/gym/f110_gym/envs/rendering/objects.py index 002a1c71..26ad3d0a 100644 --- a/gym/f110_gym/envs/rendering/objects.py +++ b/gym/f110_gym/envs/rendering/objects.py @@ -8,13 +8,40 @@ class TextObject: + """ + Class to display text on the screen at a given position. + + Attributes + ---------- + font : pygame.font.Font + font object + position : str | tuple + position of the text on the screen + text : pygame.Surface + text surface to be displayed + """ + def __init__( self, - window_shape: tuple[int, int] = (1000, 1000), + window_shape: tuple[int, int], + position: str | tuple, relative_font_size: int = 32, font_name: str = "Arial", - position: str | tuple = "bottom_right", - ): + ) -> None: + """ + Initialize text object. + + Parameters + ---------- + window_shape : tuple + shape of the window (width, height) in pixels + position : str | tuple + position of the text on the screen + relative_font_size : int, optional + font size relative to the window shape, by default 32 + font_name : str, optional + font name, by default "Arial" + """ font_size = int(relative_font_size * window_shape[0] / 1000) self.font = pygame.font.SysFont(font_name, font_size) self.position = position @@ -22,16 +49,34 @@ def __init__( self.text = self.font.render("", True, (125, 125, 125)) def _position_resolver( - self, position: str | tuple, display: pygame.Surface + self, position: str | tuple[int, int], display: pygame.Surface ) -> tuple[int, int]: """ This function takes strings like "bottom center" and converts them into a location for the text to be displayed. - if position is tuple, then passthrough. + If position is tuple, then passthrough. + + Parameters + ---------- + position : str | tuple + position of the text on the screen + display : pygame.Surface + display surface + + Returns + ------- + tuple + position of the text on the screen + + Raises + ------ + ValueError + if position is not a tuple or a string + NotImplementedError + if position is a string but not implemented """ - if isinstance(position, tuple): - return position - - if isinstance(position, str): + if isinstance(position, tuple) and len(position) == 2: + return int(position[0]), int(position[1]) + elif isinstance(position, str): position = position.lower() if position == "bottom_right": display_width, display_height = ( @@ -56,7 +101,7 @@ def _position_resolver( ) text_width, text_height = self.text.get_width(), self.text.get_height() bottom_center = ( - (display_width - text_width) / 2, + (display_width - text_width) // 2, display_height - text_height, ) return bottom_center @@ -71,12 +116,26 @@ def _position_resolver( elif position == "top_center": display_width = display.get_width() text_width = self.text.get_width() - top_center = ((display_width - text_width) / 2, 0) + top_center = ((display_width - text_width) // 2, 0) return top_center else: raise NotImplementedError(f"Position {position} not implemented.") + else: + raise ValueError( + f"Position expected to be a tuple[int, int] or a string. Got {position}." + ) - def render(self, text: str, display: pygame.Surface): + def render(self, text: str, display: pygame.Surface) -> None: + """ + Render text on the screen. + + Parameters + ---------- + text : str + text to be displayed + display : pygame.Surface + display surface + """ self.text = self.font.render(text, True, (125, 125, 125)) position_tuple = self._position_resolver(self.position, display) display.blit(self.text, position_tuple) diff --git a/gym/f110_gym/envs/rendering/renderer.py b/gym/f110_gym/envs/rendering/renderer.py index d20a1b93..98305c82 100644 --- a/gym/f110_gym/envs/rendering/renderer.py +++ b/gym/f110_gym/envs/rendering/renderer.py @@ -1,9 +1,10 @@ from __future__ import annotations import pathlib -from abc import abstractmethod +from abc import abstractmethod, ABC from dataclasses import dataclass -from typing import Union, Any +from typing import Optional, Any +import numpy as np import yaml @@ -14,19 +15,39 @@ class RenderSpec: focus_on: str car_tickness: int show_wheels: bool - show_info: bool = True - vehicle_palette: list[str] = None + show_info: Optional[bool] = True + vehicle_palette: Optional[list[str]] = None def __init__( self, window_size: int = 800, focus_on: str = None, - zoom_in_factor: float = 1.5, + zoom_in_factor: float = 1.0, car_tickness: int = 1, show_wheels: bool = False, show_info: bool = True, vehicle_palette: list[str] = None, - ): + ) -> None: + """ + Initialize rendering specification. + + Parameters + ---------- + window_size : int, optional + size of the square window, by default 800 + focus_on : str, optional + focus on a specific vehicle, by default None + zoom_in_factor : float, optional + zoom in factor, by default 1.0 (no zoom) + car_tickness : int, optional + thickness of the car in pixels, by default 1 + show_wheels : bool, optional + toggle rendering of line segments for wheels, by default False + show_info : bool, optional + toggle rendering of text instructions, by default True + vehicle_palette : list, optional + list of colors for rendering vehicles according to their id, by default None + """ self.window_size = window_size self.focus_on = focus_on self.zoom_in_factor = zoom_in_factor @@ -36,7 +57,20 @@ def __init__( self.vehicle_palette = vehicle_palette or ["#984ea3"] @staticmethod - def from_yaml(yaml_file: Union[str, pathlib.Path]): + def from_yaml(yaml_file: str | pathlib.Path): + """ + Load rendering specification from a yaml file. + + Parameters + ---------- + yaml_file : str | pathlib.Path + path to the yaml file + + Returns + ------- + RenderSpec + rendering specification object + """ with open(yaml_file, "r") as yaml_stream: try: config = yaml.safe_load(yaml_stream) @@ -45,35 +79,70 @@ def from_yaml(yaml_file: Union[str, pathlib.Path]): return RenderSpec(**config) -class EnvRenderer: - render_callbacks = [] +class EnvRenderer(ABC): + """ + Abstract class for rendering the environment. + """ @abstractmethod - def update(self, state): + def update(self, state: Any) -> None: """ Update the state to be rendered. This is called at every rendering call. + + Parameters + ---------- + state : Any + state to be rendered, e.g. a list of vehicle states """ raise NotImplementedError() - def add_renderer_callback(self, callback_fn: callable): + @abstractmethod + def render(self): """ - Add a callback function to be called at every rendering call. - This is called at the end of `update`. + Render the current state in a frame. """ - self.render_callbacks.append(callback_fn) + raise NotImplementedError() @abstractmethod - def render_map(self): + def render_lines( + self, + points: list | np.ndarray, + color: Optional[tuple[int, int, int]] = (0, 0, 255), + size: Optional[int] = 1, + ): """ - Render the current state in a frame. + Render a sequence of lines segments. + + Parameters + ---------- + points : list | np.ndarray + list of points to render + color : tuple[int, int, int], optional + color as rgb tuple, by default blue (0, 0, 255) + size : int, optional + size of the line, by default 1 """ raise NotImplementedError() @abstractmethod - def render(self): + def render_closed_lines( + self, + points: list | np.ndarray, + color: Optional[tuple[int, int, int]] = (0, 0, 255), + size: Optional[int] = 1, + ): """ - Render the current state in a frame. + Render a closed loop of lines (draw a line between the last and the first point). + + Parameters + ---------- + points : list | np.ndarray + list of points to render + color : tuple[int, int, int], optional + color as rgb tuple, by default blue (0, 0, 255) + size : int, optional + size of the line, by default 1 """ raise NotImplementedError() diff --git a/gym/f110_gym/envs/rendering/rendering_pygame.py b/gym/f110_gym/envs/rendering/rendering_pygame.py index eb0ccb85..e1b57777 100644 --- a/gym/f110_gym/envs/rendering/rendering_pygame.py +++ b/gym/f110_gym/envs/rendering/rendering_pygame.py @@ -1,15 +1,12 @@ from __future__ import annotations - import logging import math -import pathlib -from typing import Union, List, Tuple, Any +from typing import Any, Callable, Optional import cv2 import numpy as np import pygame -import yaml -from PIL import Image, ImageColor +from PIL import ImageColor from f110_gym.envs.rendering.objects import ( Map, @@ -24,6 +21,10 @@ class PygameEnvRenderer(EnvRenderer): + """ + Renderer of the environment using Pygame. + """ + def __init__( self, params: dict[str, Any], @@ -33,11 +34,30 @@ def __init__( render_mode: str, render_fps: int, ): + """ + Initialize the Pygame renderer. + + Parameters + ---------- + params : dict + dictionary of simulation parameters (including vehicle dimensions, etc.) + track : Track + track object + agent_ids : list + list of agent ids to render + render_spec : RenderSpec + rendering specification + render_mode : str + rendering mode in ["human", "human_fast", "rgb_array"] + render_fps : int + number of frames per second + """ super().__init__() - self.params = params # simulation params - self.agent_ids = agent_ids # list of agent ids + self.params = params + self.agent_ids = agent_ids self.cars = None + self.sim_time = None self.window = None self.canvas = None @@ -63,15 +83,11 @@ def __init__( self.window.fill((255, 255, 255)) # white background self.canvas = pygame.Surface((width, height)) + self.map_canvas = None - # load map metadata - map_filepath = pathlib.Path(track.filepath) - map_yaml = map_filepath.with_suffix(".yaml") - with open(map_yaml, "r") as yaml_stream: - try: - self.map_metadata = yaml.safe_load(yaml_stream) - except yaml.YAMLError as ex: - print(ex) + # map metadata + self.map_origin = track.spec.origin + self.map_resolution = track.spec.resolution # fps and time renderer self.clock = pygame.time.Clock() @@ -89,10 +105,7 @@ def __init__( ) # load map image - original_img = map_filepath.parent / self.map_metadata["image"] - original_img = np.array( - Image.open(original_img).transpose(Image.FLIP_TOP_BOTTOM) - ).astype(np.float64) + original_img = track.occupancy_map self.map_renderers = { "map": Map(map_img=original_img, zoom_level=0.4), @@ -121,14 +134,14 @@ def __init__( self.follow_agent_flag: bool = False self.agent_to_follow: int = None - def update(self, state): + def update(self, state: dict) -> None: """ Update the simulation state to be rendered. - Args: + Parameters + ---------- state: simulation state as dictionary """ - # initialize cars if self.cars is None: self.cars = [ Car( @@ -136,8 +149,8 @@ def update(self, state): car_width=self.params["width"], color=self.car_colors[ic], render_spec=self.render_spec, - map_origin=self.map_metadata["origin"], - resolution=self.map_metadata["resolution"], + map_origin=self.map_origin[:2], + resolution=self.map_resolution, ppu=self.ppus[self.active_map_renderer], ) for ic in range(len(self.agent_ids)) @@ -151,10 +164,28 @@ def update(self, state): # update time self.sim_time = state["sim_time"] - def add_renderer_callback(self, callback_fn: callable): + def add_renderer_callback(self, callback_fn: Callable[[EnvRenderer], None]) -> None: + """ + Add a custom callback for visualization. + All the callbacks are called at every rendering step, after having rendered the map and the cars. + + Parameters + ---------- + callback_fn : Callable[[EnvRenderer], None] + callback function to be called at every rendering step + """ self.callbacks.append(callback_fn) - def render(self): + def render(self) -> Optional[np.ndarray]: + """ + Render the current state in a frame. + It renders in the order: map, cars, callbacks, info text. + + Returns + ------- + Optional[np.ndarray] + if render_mode is "rgb_array", returns the rendered frame as an array + """ self.event_handling() self.canvas.fill((255, 255, 255)) # white background @@ -176,11 +207,8 @@ def render(self): screen_rect = self.canvas.get_rect() if self.follow_agent_flag: - origin = self.map_metadata["origin"] - resolution = ( - self.map_metadata["resolution"] - * self.ppus[self.active_map_renderer] - ) + origin = self.map_origin + resolution = self.map_resolution * self.ppus[self.active_map_renderer] ego_x, ego_y = self.cars[self.agent_to_follow].pose[:2] cx = (ego_x - origin[0]) / resolution cy = (ego_y - origin[1]) / resolution @@ -231,7 +259,7 @@ def render(self): ) return frame - def event_handling(self): + def event_handling(self) -> None: """ Handle interaction events to change point-of-view. @@ -282,21 +310,25 @@ def event_handling(self): def render_points( self, - points: Union[List, np.ndarray], - color: Tuple[int, int, int] = (0, 0, 255), - size: int = 1, - ): + points: list | np.ndarray, + color: Optional[tuple[int, int, int]] = (0, 0, 255), + size: Optional[int] = 1, + ) -> None: """ Render a sequence of xy points on screen. - Args: - points: sequence of xy points (N, 2) - color: rgb color of the points - size: size of the points in pixels + Parameters + ---------- + points : list | np.ndarray + list of points to render + color : Optional[tuple[int, int, int]], optional + color as rgb tuple, by default blue (0, 0, 255) + size : Optional[int], optional + size of the points in pixels, by default 1 """ - origin = self.map_metadata["origin"] + origin = self.map_origin ppu = self.ppus[self.active_map_renderer] - resolution = self.map_metadata["resolution"] * ppu + resolution = self.map_resolution * ppu points = ((points - origin[:2]) / resolution).astype(int) size = math.ceil(size / ppu) @@ -305,21 +337,25 @@ def render_points( def render_lines( self, - points: Union[List, np.ndarray], - color: Tuple[int, int, int] = (0, 0, 255), - size: int = 1, - ): + points: list | np.ndarray, + color: Optional[tuple[int, int, int]] = (0, 0, 255), + size: Optional[int] = 1, + ) -> None: """ Render a sequence of lines segments. - Args: - points: sequence of xy points (N, 2) - color: rgb color of the points - size: size of the points in pixels + Parameters + ---------- + points : list | np.ndarray + list of points to render + color : Optional[tuple[int, int, int]], optional + color as rgb tuple, by default blue (0, 0, 255) + size : Optional[int], optional + size of the line, by default 1 """ - origin = self.map_metadata["origin"] + origin = self.map_origin ppu = self.ppus[self.active_map_renderer] - resolution = self.map_metadata["resolution"] * ppu + resolution = self.map_resolution * ppu points = ((points - origin[:2]) / resolution).astype(int) size = math.ceil(size / ppu) @@ -329,21 +365,25 @@ def render_lines( def render_closed_lines( self, - points: Union[List, np.ndarray], - color: Tuple[int, int, int] = (0, 0, 255), - size: int = 1, - ): + points: list | np.ndarray, + color: Optional[tuple[int, int, int]] = (0, 0, 255), + size: Optional[int] = 1, + ) -> None: """ - Render a sequence of lines segments. - - Args: - points: sequence of xy points (N, 2) - color: rgb color of the points - size: size of the points in pixels + Render a sequence of lines segments forming a closed loop (draw a line between the last and the first point). + + Parameters + ---------- + points : list | np.ndarray + list of 2d points to render + color : Optional[tuple[int, int, int]], optional + color as rgb tuple, by default blue (0, 0, 255) + size : Optional[int], optional + size of the line, by default 1 """ - origin = self.map_metadata["origin"] + origin = self.map_origin ppu = self.ppus[self.active_map_renderer] - resolution = self.map_metadata["resolution"] * ppu + resolution = self.map_resolution * ppu points = ((points - origin[:2]) / resolution).astype(int) size = math.ceil(size / ppu) @@ -351,6 +391,9 @@ def render_closed_lines( self.map_canvas, color, closed=True, points=points, width=size ) - def close(self): - if self.render_mode == "human" or self.render_mode == "human_fast": + def close(self) -> None: + """ + Close the rendering environment. + """ + if self.render_mode in ["human", "human_fast"]: pygame.quit() diff --git a/gym/f110_gym/envs/reset/__init__.py b/gym/f110_gym/envs/reset/__init__.py index c6e05ba8..d5f56ea1 100644 --- a/gym/f110_gym/envs/reset/__init__.py +++ b/gym/f110_gym/envs/reset/__init__.py @@ -1,20 +1,21 @@ +from __future__ import annotations from f110_gym.envs.reset.masked_reset import GridResetFn, AllTrackResetFn from f110_gym.envs.reset.reset_fn import ResetFn from f110_gym.envs.track import Track -def make_reset_fn(type: str, track: Track, num_agents: int, **kwargs) -> ResetFn: - if type == "grid_static": - return GridResetFn(track=track, num_agents=num_agents, shuffle=False, **kwargs) - elif type == "grid_random": - return GridResetFn(track=track, num_agents=num_agents, shuffle=True, **kwargs) - elif type == "random_static": - return AllTrackResetFn( - track=track, num_agents=num_agents, shuffle=False, **kwargs - ) - elif type == "random_random": - return AllTrackResetFn( - track=track, num_agents=num_agents, shuffle=True, **kwargs - ) - else: - raise ValueError(f"invalid reset type {type}") +def make_reset_fn(type: str | None, track: Track, num_agents: int, **kwargs) -> ResetFn: + type = type or "rl_grid_static" + + try: + refline_token, reset_token, shuffle_token = type.split("_") + + refline = {"cl": track.centerline, "rl": track.raceline}[refline_token] + reset_fn = {"grid": GridResetFn, "random": AllTrackResetFn}[reset_token] + shuffle = {"static": False, "random": True}[shuffle_token] + options = {"cl": {"move_laterally": True}, "rl": {"move_laterally": False}}[refline_token] + + except Exception as ex: + raise ValueError(f"Invalid reset function type: {type}. Expected format: __") from ex + + return reset_fn(reference_line=refline, num_agents=num_agents, shuffle=shuffle, **options, **kwargs) diff --git a/gym/f110_gym/envs/reset/masked_reset.py b/gym/f110_gym/envs/reset/masked_reset.py index cac879cd..1f4e6983 100644 --- a/gym/f110_gym/envs/reset/masked_reset.py +++ b/gym/f110_gym/envs/reset/masked_reset.py @@ -4,7 +4,7 @@ from f110_gym.envs.reset.reset_fn import ResetFn from f110_gym.envs.reset.utils import sample_around_waypoint -from f110_gym.envs.track import Track +from f110_gym.envs.track import Track, Raceline class MaskedResetFn(ResetFn): @@ -14,23 +14,24 @@ def get_mask(self) -> np.ndarray: def __init__( self, - track: Track, + reference_line: Raceline, num_agents: int, move_laterally: bool, min_dist: float, max_dist: float, ): - self.track = track + self.reference_line = reference_line self.n_agents = num_agents self.min_dist = min_dist self.max_dist = max_dist self.move_laterally = move_laterally self.mask = self.get_mask() + self.reference_line = reference_line def sample(self) -> np.ndarray: waypoint_id = np.random.choice(np.where(self.mask)[0]) poses = sample_around_waypoint( - track=self.track, + reference_line=self.reference_line, waypoint_id=waypoint_id, n_agents=self.n_agents, min_dist=self.min_dist, @@ -43,9 +44,10 @@ def sample(self) -> np.ndarray: class GridResetFn(MaskedResetFn): def __init__( self, - track: Track, + reference_line: Raceline, num_agents: int, move_laterally: bool = True, + use_centerline: bool = True, shuffle: bool = True, start_width: float = 1.0, min_dist: float = 1.5, @@ -55,7 +57,7 @@ def __init__( self.shuffle = shuffle super().__init__( - track=track, + reference_line=reference_line, num_agents=num_agents, move_laterally=move_laterally, min_dist=min_dist, @@ -64,10 +66,10 @@ def __init__( def get_mask(self) -> np.ndarray: # approximate the nr waypoints in the starting line - step_size = self.track.centerline.length / self.track.centerline.n + step_size = self.reference_line.length / self.reference_line.n n_wps = int(self.start_width / step_size) - mask = np.zeros(self.track.centerline.n) + mask = np.zeros(self.reference_line.n) mask[:n_wps] = 1 return mask.astype(bool) @@ -83,7 +85,7 @@ def sample(self) -> np.ndarray: class AllTrackResetFn(MaskedResetFn): def __init__( self, - track: Track, + reference_line: Raceline, num_agents: int, move_laterally: bool = True, shuffle: bool = True, @@ -91,7 +93,7 @@ def __init__( max_dist: float = 2.5, ): super().__init__( - track=track, + reference_line=reference_line, num_agents=num_agents, move_laterally=move_laterally, min_dist=min_dist, @@ -100,7 +102,7 @@ def __init__( self.shuffle = shuffle def get_mask(self) -> np.ndarray: - return np.ones(self.track.centerline.n).astype(bool) + return np.ones(self.reference_line.n).astype(bool) def sample(self) -> np.ndarray: poses = super().sample() diff --git a/gym/f110_gym/envs/reset/utils.py b/gym/f110_gym/envs/reset/utils.py index 40ce9a73..c677572f 100644 --- a/gym/f110_gym/envs/reset/utils.py +++ b/gym/f110_gym/envs/reset/utils.py @@ -1,12 +1,10 @@ -from __future__ import annotations - import numpy as np -from f110_gym.envs.track import Track +from f110_gym.envs.track import Raceline def sample_around_waypoint( - track: Track, + reference_line: Raceline, waypoint_id: int, n_agents: int, min_dist: float, @@ -26,22 +24,22 @@ def sample_around_waypoint( - move_laterally: if True, the agents are sampled on the left/right of the track centerline """ current_wp_id = waypoint_id - n_waypoints = track.centerline.n + n_waypoints = reference_line.n poses = [] rnd_sign = ( - np.random.choice([-1, 1]) if move_laterally else 1 + np.random.choice([-1.0, 1.0]) if move_laterally else 0.0 ) # random sign to sample lateral position (left/right) for i in range(n_agents): # compute pose from current wp_id wp = [ - track.centerline.xs[current_wp_id], - track.centerline.ys[current_wp_id], + reference_line.xs[current_wp_id], + reference_line.ys[current_wp_id], ] next_wp_id = (current_wp_id + 1) % n_waypoints next_wp = [ - track.centerline.xs[next_wp_id], - track.centerline.ys[next_wp_id], + reference_line.xs[next_wp_id], + reference_line.ys[next_wp_id], ] theta = np.arctan2(next_wp[1] - wp[1], next_wp[0] - wp[0]) @@ -65,8 +63,8 @@ def sample_around_waypoint( if pnt_id > n_waypoints - 1: pnt_id = 0 # increment distance - x_diff = track.centerline.xs[pnt_id] - track.centerline.xs[pnt_id - 1] - y_diff = track.centerline.ys[pnt_id] - track.centerline.ys[pnt_id - 1] + x_diff = reference_line.xs[pnt_id] - reference_line.xs[pnt_id - 1] + y_diff = reference_line.ys[pnt_id] - reference_line.ys[pnt_id - 1] dist = dist + np.linalg.norm( [y_diff, x_diff] ) # approx distance by summing linear segments diff --git a/gym/f110_gym/envs/track.py b/gym/f110_gym/envs/track.py deleted file mode 100644 index ec3edc5c..00000000 --- a/gym/f110_gym/envs/track.py +++ /dev/null @@ -1,271 +0,0 @@ -import pathlib -import tarfile -from dataclasses import dataclass -from typing import Tuple -import tempfile - -import numpy as np -import requests -import yaml -import warnings -from f110_gym.envs.cubic_spline import CubicSpline2D -from PIL import Image -from PIL.Image import Transpose -from yamldataclassconfig.config import YamlDataClassConfig - - -class Raceline: - n: int - - ss: np.ndarray # cumulative distance along the raceline - xs: np.ndarray # x-coordinates of the raceline - ys: np.ndarray # y-coordinates of the raceline - yaws: np.ndarray # yaw angle of the raceline - ks: np.ndarray # curvature of the raceline - vxs: np.ndarray # velocity along the raceline - axs: np.ndarray # acceleration along the raceline - - length: float - - def __init__( - self, - xs: np.ndarray, - ys: np.ndarray, - velxs: np.ndarray, - ss: np.ndarray = None, - psis: np.ndarray = None, - kappas: np.ndarray = None, - accxs: np.ndarray = None, - spline: CubicSpline2D = None, - ): - assert xs.shape == ys.shape == velxs.shape, "inconsistent shapes for x, y, vel" - - self.n = xs.shape[0] - self.ss = ss - self.xs = xs - self.ys = ys - self.yaws = psis - self.ks = kappas - self.vxs = velxs - self.axs = accxs - - # approximate track length by linear-interpolation of x,y waypoints - # note: we could use 'ss' but sometimes it is normalized to [0,1], so we recompute it here - self.length = float(np.sum(np.sqrt(np.diff(xs) ** 2 + np.diff(ys) ** 2))) - - # compute spline - self.spline = spline if spline is not None else CubicSpline2D(xs, ys) - - @staticmethod - def from_centerline_file( - filepath: pathlib.Path, delimiter: str = ",", fixed_speed: float = 1.0 - ): - assert filepath.exists(), f"input filepath does not exist ({filepath})" - waypoints = np.loadtxt(filepath, delimiter=delimiter) - assert waypoints.shape[1] == 4, "expected waypoints as [x, y, w_left, w_right]" - - # fit cubic spline to waypoints - xx, yy = waypoints[:, 0], waypoints[:, 1] - # close loop - xx = np.append(xx, xx[0]) - yy = np.append(yy, yy[0]) - spline = CubicSpline2D(xx, yy) - ds = 0.1 - - ss, xs, ys, yaws, ks = [], [], [], [], [] - - for i_s in np.arange(0, spline.s[-1], ds): - x, y = spline.calc_position(i_s) - yaw = spline.calc_yaw(i_s) - k = spline.calc_curvature(i_s) - - xs.append(x) - ys.append(y) - yaws.append(yaw) - ks.append(k) - ss.append(i_s) - - return Raceline( - ss=np.array(ss).astype(np.float32), - xs=np.array(xs).astype(np.float32), - ys=np.array(ys).astype(np.float32), - psis=np.array(yaws).astype(np.float32), - kappas=np.array(ks).astype(np.float32), - velxs=np.ones_like(ss).astype( - np.float32 - ), # centerline does not have a speed profile, keep it constant at 1.0 m/s - accxs=np.zeros_like(ss).astype(np.float32), # constant acceleration - ) - - @staticmethod - def from_raceline_file(filepath: pathlib.Path, delimiter: str = ";"): - assert filepath.exists(), f"input filepath does not exist ({filepath})" - waypoints = np.loadtxt(filepath, delimiter=delimiter).astype(np.float32) - assert ( - waypoints.shape[1] == 7 - ), "expected waypoints as [s, x, y, psi, k, vx, ax]" - return Raceline( - ss=waypoints[:, 0], - xs=waypoints[:, 1], - ys=waypoints[:, 2], - psis=waypoints[:, 3], - kappas=waypoints[:, 4], - velxs=waypoints[:, 5], - accxs=waypoints[:, 6], - ) - - -@dataclass -class TrackSpec(YamlDataClassConfig): - name: str - image: str - resolution: float - origin: Tuple[float, float, float] - negate: int - occupied_thresh: float - free_thresh: float - - -def find_track_dir(track_name): - # we assume there are no blank space in the track name. however, to take into account eventual blank spaces in - # the map dirpath, we loop over all possible maps and check if there is a matching with the current track - map_dir = pathlib.Path(__file__).parent.parent.parent.parent / "maps" - - if not (map_dir / track_name).exists(): - print("Downloading Files for: " + track_name) - tracks_url = "http://api.f1tenth.org/" + track_name + ".tar.xz" - tracks_r = requests.get(url=tracks_url, allow_redirects=True) - if tracks_r.status_code == 404: - raise FileNotFoundError(f"No maps exists for {track_name}.") - - tempdir = tempfile.gettempdir() + "/" - - with open(tempdir + track_name + ".tar.xz", "wb") as f: - f.write(tracks_r.content) - - # extract - print("Extracting Files for: " + track_name) - tracks_file = tarfile.open(tempdir + track_name + ".tar.xz") - tracks_file.extractall(map_dir) - tracks_file.close() - - for base_dir in [map_dir]: - if not base_dir.exists(): - continue - - for dir in base_dir.iterdir(): - if track_name == str(dir.stem).replace(" ", ""): - return dir - - raise FileNotFoundError(f"no mapdir matching {track_name} in {[map_dir]}") - - -@dataclass -class Track: - spec: TrackSpec - filepath: str - ext: str - occupancy_map: np.ndarray - centerline: Raceline - raceline: Raceline - - def __init__( - self, - spec: TrackSpec, - filepath: str, - ext: str, - occupancy_map: np.ndarray, - edt: np.ndarray, - centerline: Raceline = None, - raceline: Raceline = None, - ): - self.spec = spec - self.filepath = filepath - self.ext = ext - self.occupancy_map = occupancy_map - self.centerline = centerline - self.raceline = raceline - - @staticmethod - def load_spec(track: str, filespec: str): - """ - Load track specification from yaml file. - - Args: - - """ - with open(filespec, "r") as yaml_stream: - map_metadata = yaml.safe_load(yaml_stream) - track_spec = TrackSpec(name=track, **map_metadata) - return track_spec - - @staticmethod - def from_track_name(track: str): - try: - track_dir = find_track_dir(track) - track_spec = Track.load_spec( - track=track, filespec=str(track_dir / f"{track_dir.stem}_map.yaml") - ) - - # load occupancy grid - map_filename = pathlib.Path(track_spec.image) - image = Image.open(track_dir / str(map_filename)).transpose( - Transpose.FLIP_TOP_BOTTOM - ) - occupancy_map = np.array(image).astype(np.float32) - occupancy_map[occupancy_map <= 128] = 0.0 - occupancy_map[occupancy_map > 128] = 255.0 - - # if exists, load edt - if (track_dir / f"{track}_map.npy").exists(): - edt = np.load(track_dir / f"{track}_map.npy") - else: - edt = None - warnings.warn( - f"Track Distance Transform file at {track_dir / f'{track}_map.npy'} not found, will be created before initialization." - ) - - # if exists, load centerline - if (track_dir / f"{track}_centerline.csv").exists(): - centerline = Raceline.from_centerline_file( - track_dir / f"{track}_centerline.csv" - ) - else: - centerline = None - warnings.warn( - f"Track Centerline file at {track_dir / f'{track}_centerline.csv'} not found, setting None." - ) - - # if exists, load raceline - if (track_dir / f"{track}_raceline.csv").exists(): - raceline = Raceline.from_raceline_file( - track_dir / f"{track}_raceline.csv" - ) - else: - raceline = centerline - if centerline is None: - warnings.warn( - f"Track Raceline file at {track_dir / f'{track}_raceline.csv'} not found, setting None." - ) - else: - warnings.warn( - f"Track Raceline file at {track_dir / f'{track}_raceline.csv'} not found, using Centerline." - ) - - return Track( - spec=track_spec, - filepath=str((track_dir / map_filename.stem).absolute()), - ext=map_filename.suffix, - occupancy_map=occupancy_map, - edt=edt, - centerline=centerline, - raceline=raceline, - ) - except Exception as ex: - print(ex) - raise FileNotFoundError(f"could not load track {track}") from ex - - -if __name__ == "__main__": - track = Track.from_track_name("Example") - print("[Result] map loaded successfully") diff --git a/gym/f110_gym/envs/track/__init__.py b/gym/f110_gym/envs/track/__init__.py new file mode 100644 index 00000000..74938f9c --- /dev/null +++ b/gym/f110_gym/envs/track/__init__.py @@ -0,0 +1,3 @@ +from .raceline import Raceline +from .track import Track, TrackSpec +from .utils import find_track_dir diff --git a/gym/f110_gym/envs/cubic_spline.py b/gym/f110_gym/envs/track/cubic_spline.py similarity index 51% rename from gym/f110_gym/envs/cubic_spline.py rename to gym/f110_gym/envs/track/cubic_spline.py index 8b1a0fab..b148c269 100644 --- a/gym/f110_gym/envs/cubic_spline.py +++ b/gym/f110_gym/envs/track/cubic_spline.py @@ -2,6 +2,7 @@ Code from Cubic spline planner Author: Atsushi Sakai(@Atsushi_twi) """ +from __future__ import annotations import bisect import math @@ -11,16 +12,43 @@ class CubicSpline1D: """ 1D Cubic Spline class - Parameters + + Attributes ---------- x : list x coordinates for data points. This x coordinates must be sorted in ascending order. y : list y coordinates for data points + a : list + coefficient a + b : list + coefficient b + c : list + coefficient c + d : list + coefficient d + nx : int + dimension of x """ - def __init__(self, x, y): + def __init__(self, x: np.ndarray, y: np.ndarray): + """ + Returns a 1D cubic spline object. + + Parameters + ---------- + x : list + x coordinates for data points. This x coordinates must be + sorted in ascending order. + y : list + y coordinates for data points + + Raises + ------ + ValueError + if x is not sorted in ascending order + """ h = np.diff(x) if np.any(h < 0): raise ValueError("x coordinates must be sorted in ascending order") @@ -30,12 +58,12 @@ def __init__(self, x, y): self.y = y self.nx = len(x) # dimension of x - # calc coefficient a + # Calc coefficient a self.a = [iy for iy in y] - # calc coefficient c - A = self.__calc_A(h) - B = self.__calc_B(h, self.a) + # Calc coefficient c + A = self.__calc_A(h=h) + B = self.__calc_B(h=h, a=self.a) self.c = np.linalg.solve(A, B) # calc spline coefficient b and d @@ -47,14 +75,20 @@ def __init__(self, x, y): self.d.append(d) self.b.append(b) - def calc_position(self, x): + def calc_position(self, x: float) -> float | None: """ Calc `y` position for given `x`. - if `x` is outside the data point's `x` range, return None. + If `x` is outside the data point's `x` range, return None. + + Parameters + ---------- + x : float + position for which to calculate y. + Returns ------- y : float - y position for given x. + position along the spline for given x. """ if x < self.x[0]: return None @@ -64,15 +98,21 @@ def calc_position(self, x): i = self.__search_index(x) dx = x - self.x[i] position = ( - self.a[i] + self.b[i] * dx + self.c[i] * dx**2.0 + self.d[i] * dx**3.0 + self.a[i] + self.b[i] * dx + self.c[i] * dx ** 2.0 + self.d[i] * dx ** 3.0 ) return position - def calc_first_derivative(self, x): + def calc_first_derivative(self, x: float) -> float | None: """ Calc first derivative at given x. - if x is outside the input x, return None + If x is outside the input x, return None + + Parameters + ---------- + x : float + position for which to calculate the first derivative. + Returns ------- dy : float @@ -86,13 +126,19 @@ def calc_first_derivative(self, x): i = self.__search_index(x) dx = x - self.x[i] - dy = self.b[i] + 2.0 * self.c[i] * dx + 3.0 * self.d[i] * dx**2.0 + dy = self.b[i] + 2.0 * self.c[i] * dx + 3.0 * self.d[i] * dx ** 2.0 return dy - def calc_second_derivative(self, x): + def calc_second_derivative(self, x: float) -> float | None: """ Calc second derivative at given x. - if x is outside the input x, return None + If x is outside the input x, return None + + Parameters + ---------- + x : float + position for which to calculate the second derivative. + Returns ------- ddy : float @@ -109,15 +155,35 @@ def calc_second_derivative(self, x): ddy = 2.0 * self.c[i] + 6.0 * self.d[i] * dx return ddy - def __search_index(self, x): + def __search_index(self, x: float) -> int: """ - search data segment index + Search data segment index. + + Parameters + ---------- + x : float + position for which to find the segment index. + + Returns + ------- + index : int + index of the segment. """ return bisect.bisect(self.x[:-1], x) - 1 - def __calc_A(self, h): + def __calc_A(self, h: np.ndarray) -> np.ndarray: """ - calc matrix A for spline coefficient c + Calc matrix A for spline coefficient c. + + Parameters + ---------- + h : np.ndarray + difference of x coordinates. + + Returns + ------- + A : np.ndarray + matrix A. """ A = np.zeros((self.nx, self.nx)) A[0, 0] = 1.0 @@ -132,9 +198,21 @@ def __calc_A(self, h): A[self.nx - 1, self.nx - 1] = 1.0 return A - def __calc_B(self, h, a): + def __calc_B(self, h: np.ndarray, a: np.ndarray) -> np.ndarray: """ - calc matrix B for spline coefficient c + Calc matrix B for spline coefficient c. + + Parameters + ---------- + h : np.ndarray + difference of x coordinates. + a : np.ndarray + y coordinates for data points. + + Returns + ------- + B : np.ndarray + matrix B. """ B = np.zeros(self.nx) for i in range(self.nx - 2): @@ -146,41 +224,71 @@ def __calc_B(self, h, a): class CubicSpline2D: """ - Cubic CubicSpline2D class - Parameters + Cubic CubicSpline2D class. + + Attributes ---------- - x : list - x coordinates for data points. - y : list - y coordinates for data points. + s : list + cumulative distance along the data points. + sx : CubicSpline1D + cubic spline for x coordinates. + sy : CubicSpline1D + cubic spline for y coordinates. """ - def __init__(self, x, y): + def __init__(self, x: np.ndarray, y: np.ndarray): + """ + Returns a 2D cubic spline object. + + Parameters + ---------- + x : list + x coordinates for data points. + y : list + y coordinates for data points. + """ self.s = self.__calc_s(x, y) - self.sx = CubicSpline1D(self.s, x) - self.sy = CubicSpline1D(self.s, y) + self.sx = CubicSpline1D(x=self.s, y=x) + self.sy = CubicSpline1D(x=self.s, y=y) + + def __calc_s(self, x: np.ndarray, y: np.ndarray) -> np.ndarray: + """ + Calc cumulative distance. - def __calc_s(self, x, y): + Parameters + ---------- + x : list + x coordinates for data points. + y : list + y coordinates for data points. + + Returns + ------- + s : np.ndarray + cumulative distance along the data points. + """ dx = np.diff(x) dy = np.diff(y) self.ds = np.hypot(dx, dy) s = [0] s.extend(np.cumsum(self.ds)) - return s + return np.array(s) - def calc_position(self, s): + def calc_position(self, s: float) -> tuple[float | None, float | None]: """ - calc position + Calc position at the given s. + Parameters ---------- s : float distance from the start point. if `s` is outside the data point's range, return None. + Returns ------- - x : float + x : float | None x position for given s. - y : float + y : float | None y position for given s. """ x = self.sx.calc_position(s) @@ -188,14 +296,16 @@ def calc_position(self, s): return x, y - def calc_curvature(self, s): + def calc_curvature(self, s: float) -> float | None: """ - calc curvature + Calc curvature at the given s. + Parameters ---------- s : float distance from the start point. if `s` is outside the data point's range, return None. + Returns ------- k : float @@ -205,17 +315,18 @@ def calc_curvature(self, s): ddx = self.sx.calc_second_derivative(s) dy = self.sy.calc_first_derivative(s) ddy = self.sy.calc_second_derivative(s) - k = (ddy * dx - ddx * dy) / ((dx**2 + dy**2) ** (3 / 2)) + k = (ddy * dx - ddx * dy) / ((dx ** 2 + dy ** 2) ** (3 / 2)) return k - def calc_yaw(self, s): + def calc_yaw(self, s: float) -> float | None: """ - calc yaw + Calc yaw angle at the given s. + Parameters ---------- s : float - distance from the start point. if `s` is outside the data point's - range, return None. + distance from the start point. If `s` is outside the data point's range, return None. + Returns ------- yaw : float diff --git a/gym/f110_gym/envs/track/raceline.py b/gym/f110_gym/envs/track/raceline.py new file mode 100644 index 00000000..e1420b24 --- /dev/null +++ b/gym/f110_gym/envs/track/raceline.py @@ -0,0 +1,169 @@ +from __future__ import annotations +import pathlib +from typing import Optional + +import numpy as np + +from f110_gym.envs.rendering import EnvRenderer +from f110_gym.envs.track.cubic_spline import CubicSpline2D + + +class Raceline: + """ + Raceline object. + + Attributes + ---------- + n : int + number of waypoints + ss : np.ndarray + arclength along the raceline + xs : np.ndarray + x-coordinates of the waypoints + ys : np.ndarray + y-coordinates of the waypoints + yaws : np.ndarray + yaw angles of the waypoints + ks : np.ndarray + curvature of the waypoints + vxs : np.ndarray + velocity along the raceline + axs : np.ndarray + acceleration along the raceline + length : float + length of the raceline + spline : CubicSpline2D + spline object through the waypoints + """ + + def __init__( + self, + xs: np.ndarray, + ys: np.ndarray, + velxs: np.ndarray, + ss: Optional[np.ndarray] = None, + psis: Optional[np.ndarray] = None, + kappas: Optional[np.ndarray] = None, + accxs: Optional[np.ndarray] = None, + spline: Optional[CubicSpline2D] = None, + ): + assert xs.shape == ys.shape == velxs.shape, "inconsistent shapes for x, y, vel" + + self.n = xs.shape[0] + self.ss = ss + self.xs = xs + self.ys = ys + self.yaws = psis + self.ks = kappas + self.vxs = velxs + self.axs = accxs + + # approximate track length by linear-interpolation of x,y waypoints + # note: we could use 'ss' but sometimes it is normalized to [0,1], so we recompute it here + self.length = float(np.sum(np.sqrt(np.diff(xs) ** 2 + np.diff(ys) ** 2))) + + # compute spline through waypoints if not provided + self.spline = spline or CubicSpline2D(x=xs, y=ys) + + @staticmethod + def from_centerline_file( + filepath: pathlib.Path, + delimiter: Optional[str] = ",", + fixed_speed: Optional[float] = 1.0, + ): + """ + Load raceline from a centerline file. + + Parameters + ---------- + filepath : pathlib.Path + path to the centerline file + delimiter : str, optional + delimiter used in the file, by default "," + fixed_speed : float, optional + fixed speed along the raceline, by default 1.0 + + Returns + ------- + Raceline + raceline object + """ + assert filepath.exists(), f"input filepath does not exist ({filepath})" + waypoints = np.loadtxt(filepath, delimiter=delimiter) + assert waypoints.shape[1] == 4, "expected waypoints as [x, y, w_left, w_right]" + + # fit cubic spline to waypoints + xx, yy = waypoints[:, 0], waypoints[:, 1] + # close loop + xx = np.append(xx, xx[0]) + yy = np.append(yy, yy[0]) + spline = CubicSpline2D(x=xx, y=yy) + ds = 0.1 + + ss, xs, ys, yaws, ks = [], [], [], [], [] + + for i_s in np.arange(0, spline.s[-1], ds): + x, y = spline.calc_position(i_s) + yaw = spline.calc_yaw(i_s) + k = spline.calc_curvature(i_s) + + xs.append(x) + ys.append(y) + yaws.append(yaw) + ks.append(k) + ss.append(i_s) + + return Raceline( + ss=np.array(ss).astype(np.float32), + xs=np.array(xs).astype(np.float32), + ys=np.array(ys).astype(np.float32), + psis=np.array(yaws).astype(np.float32), + kappas=np.array(ks).astype(np.float32), + velxs=np.ones_like(ss).astype(np.float32) * fixed_speed, # constant speed + accxs=np.zeros_like(ss).astype(np.float32), # constant acceleration + spline=spline, + ) + + @staticmethod + def from_raceline_file(filepath: pathlib.Path, delimiter: str = ";"): + """ + Load raceline from a raceline file. + + Parameters + ---------- + filepath : pathlib.Path + path to the raceline file + delimiter : str, optional + delimiter used in the file, by default ";" + + Returns + ------- + Raceline + raceline object + """ + assert filepath.exists(), f"input filepath does not exist ({filepath})" + waypoints = np.loadtxt(filepath, delimiter=delimiter).astype(np.float32) + assert ( + waypoints.shape[1] == 7 + ), "expected waypoints as [s, x, y, psi, k, vx, ax]" + return Raceline( + ss=waypoints[:, 0], + xs=waypoints[:, 1], + ys=waypoints[:, 2], + psis=waypoints[:, 3], + kappas=waypoints[:, 4], + velxs=waypoints[:, 5], + accxs=waypoints[:, 6], + ) + + def render_waypoints(self, e: EnvRenderer) -> None: + """ + Callback to render waypoints. + + Parameters + ---------- + e : EnvRenderer + Environment renderer object. + """ + points = np.stack([self.xs, self.ys], axis=1) + e.render_closed_lines(points, color=(0, 128, 0), size=1) diff --git a/gym/f110_gym/envs/track/track.py b/gym/f110_gym/envs/track/track.py new file mode 100644 index 00000000..9163b67a --- /dev/null +++ b/gym/f110_gym/envs/track/track.py @@ -0,0 +1,239 @@ +from __future__ import annotations +import pathlib +from dataclasses import dataclass +from typing import Tuple, Optional + +import numpy as np +import yaml +from PIL import Image +from PIL.Image import Transpose +from yamldataclassconfig.config import YamlDataClassConfig + +from f110_gym.envs.track import Raceline +from f110_gym.envs.track.cubic_spline import CubicSpline2D +from f110_gym.envs.track.utils import find_track_dir + + +@dataclass +class TrackSpec(YamlDataClassConfig): + name: str + image: str + resolution: float + origin: Tuple[float, float, float] + negate: int + occupied_thresh: float + free_thresh: float + + +@dataclass +class Track: + spec: TrackSpec + filepath: str + ext: str + occupancy_map: np.ndarray + centerline: Raceline + raceline: Raceline + + def __init__( + self, + spec: TrackSpec, + filepath: str, + ext: str, + occupancy_map: np.ndarray, + centerline: Optional[Raceline] = None, + raceline: Optional[Raceline] = None, + ): + """ + Initialize track object. + + Parameters + ---------- + spec : TrackSpec + track specification + filepath : str + path to the track image + ext : str + file extension of the track image file + occupancy_map : np.ndarray + occupancy grid map + centerline : Raceline, optional + centerline of the track, by default None + raceline : Raceline, optional + raceline of the track, by default None + """ + self.spec = spec + self.filepath = filepath + self.ext = ext + self.occupancy_map = occupancy_map + self.centerline = centerline + self.raceline = raceline + + @staticmethod + def load_spec(track: str, filespec: str) -> TrackSpec: + """ + Load track specification from yaml file. + + Parameters + ---------- + track : str + name of the track + filespec : str + path to the yaml file + + Returns + ------- + TrackSpec + track specification + """ + with open(filespec, "r") as yaml_stream: + map_metadata = yaml.safe_load(yaml_stream) + track_spec = TrackSpec(name=track, **map_metadata) + return track_spec + + @staticmethod + def from_track_name(track: str): + """ + Load track from track name. + + Parameters + ---------- + track : str + name of the track + + Returns + ------- + Track + track object + + Raises + ------ + FileNotFoundError + if the track cannot be loaded + """ + try: + track_dir = find_track_dir(track) + track_spec = Track.load_spec( + track=track, filespec=str(track_dir / f"{track_dir.stem}_map.yaml") + ) + + # load occupancy grid + map_filename = pathlib.Path(track_spec.image) + image = Image.open(track_dir / str(map_filename)).transpose( + Transpose.FLIP_TOP_BOTTOM + ) + occupancy_map = np.array(image).astype(np.float32) + occupancy_map[occupancy_map <= 128] = 0.0 + occupancy_map[occupancy_map > 128] = 255.0 + + # if exists, load centerline + if (track_dir / f"{track}_centerline.csv").exists(): + centerline = Raceline.from_centerline_file( + track_dir / f"{track}_centerline.csv" + ) + else: + centerline = None + + # if exists, load raceline + if (track_dir / f"{track}_raceline.csv").exists(): + raceline = Raceline.from_raceline_file( + track_dir / f"{track}_raceline.csv" + ) + else: + raceline = centerline + + return Track( + spec=track_spec, + filepath=str((track_dir / map_filename.stem).absolute()), + ext=map_filename.suffix, + occupancy_map=occupancy_map, + centerline=centerline, + raceline=raceline, + ) + except Exception as ex: + print(ex) + raise FileNotFoundError(f"It could not load track {track}") from ex + + @staticmethod + def from_refline(x: np.ndarray, y: np.ndarray, velx: np.ndarray,): + """ + Create an empty track reference line. + + Parameters + ---------- + x : np.ndarray + x-coordinates of the waypoints + y : np.ndarray + y-coordinates of the waypoints + velx : np.ndarray + velocities at the waypoints + + Returns + ------- + Track + track object + """ + ds = 0.1 + resolution = 0.05 + margin_perc = 0.1 + + spline = CubicSpline2D(x=x, y=y) + ss, xs, ys, yaws, ks, vxs = [], [], [], [], [], [] + for i_s in np.arange(0, spline.s[-1], ds): + xi, yi = spline.calc_position(i_s) + yaw = spline.calc_yaw(i_s) + k = spline.calc_curvature(i_s) + + # find closest waypoint + closest = np.argmin(np.hypot(x - xi, y - yi)) + v = velx[closest] + + xs.append(xi) + ys.append(yi) + yaws.append(yaw) + ks.append(k) + ss.append(i_s) + vxs.append(v) + + refline = Raceline( + ss=np.array(ss).astype(np.float32), + xs=np.array(xs).astype(np.float32), + ys=np.array(ys).astype(np.float32), + psis=np.array(yaws).astype(np.float32), + kappas=np.array(ks).astype(np.float32), + velxs=np.array(vxs).astype(np.float32), + accxs=np.zeros_like(ss).astype(np.float32), + spline=spline, + ) + + min_x, max_x = np.min(xs), np.max(xs) + min_y, max_y = np.min(ys), np.max(ys) + x_range = max_x - min_x + y_range = max_y - min_y + occupancy_map = 255.0 * np.ones( + ( + int((1 + 2 * margin_perc) * x_range / resolution), + int((1 + 2 * margin_perc) * y_range / resolution), + ), + dtype=np.float32, + ) + # origin is the bottom left corner + origin = (min_x - margin_perc * x_range, min_y - margin_perc * y_range, 0.0) + + track_spec = TrackSpec( + name=None, + image=None, + resolution=resolution, + origin=origin, + negate=False, + occupied_thresh=0.65, + free_thresh=0.196, + ) + + return Track( + spec=track_spec, + filepath=None, + ext=None, + occupancy_map=occupancy_map, + raceline=refline, + centerline=refline, + ) diff --git a/gym/f110_gym/envs/track/utils.py b/gym/f110_gym/envs/track/utils.py new file mode 100644 index 00000000..e3e88a09 --- /dev/null +++ b/gym/f110_gym/envs/track/utils.py @@ -0,0 +1,51 @@ +import pathlib +import tarfile +import tempfile + +import requests + + +def find_track_dir(track_name: str) -> pathlib.Path: + """ + Find the directory of the track map corresponding to the given track name. + + Parameters + ---------- + track_name : str + name of the track + + Returns + ------- + pathlib.Path + path to the track map directory + + Raises + ------ + FileNotFoundError + if no map directory matching the track name is found + """ + map_dir = pathlib.Path(__file__).parent.parent.parent.parent / "maps" + + if not (map_dir / track_name).exists(): + print("Downloading Files for: " + track_name) + tracks_url = "http://api.f1tenth.org/" + track_name + ".tar.xz" + tracks_r = requests.get(url=tracks_url, allow_redirects=True) + if tracks_r.status_code == 404: + raise FileNotFoundError(f"No maps exists for {track_name}.") + + tempdir = tempfile.gettempdir() + "/" + + with open(tempdir + track_name + ".tar.xz", "wb") as f: + f.write(tracks_r.content) + + print("Extracting Files for: " + track_name) + tracks_file = tarfile.open(tempdir + track_name + ".tar.xz") + tracks_file.extractall(map_dir) + tracks_file.close() + + # search for map in the map directory + for subdir in map_dir.iterdir(): + if track_name == str(subdir.stem).replace(" ", ""): + return subdir + + raise FileNotFoundError(f"no mapdir matching {track_name} in {[map_dir]}") diff --git a/gym/f110_gym/envs/utils.py b/gym/f110_gym/envs/utils.py index 2717354c..4bed0ca9 100644 --- a/gym/f110_gym/envs/utils.py +++ b/gym/f110_gym/envs/utils.py @@ -1,4 +1,3 @@ -# types from typing import Any, Dict, TypeVar KeyType = TypeVar("KeyType") diff --git a/gym/f110_gym/test/benchmark_renderer.py b/gym/f110_gym/test/benchmark_renderer.py deleted file mode 100644 index 4cfc4277..00000000 --- a/gym/f110_gym/test/benchmark_renderer.py +++ /dev/null @@ -1,222 +0,0 @@ -import numpy as np - -from f110_gym.envs import F110Env -from f110_gym.envs.utils import deep_update - - -def pretty_print(dict: dict, col_width=15): - keys = list(dict.keys()) - columns = ["key"] + [str(k) for k in dict[keys[0]]] - - # opening line - for _ in columns: - print("|" + "-" * col_width, end="") - print("|") - # header - for col in columns: - padding = max(0, col_width - len(col)) - print("|" + col[:col_width] + " " * padding, end="") - print("|") - # separator line - for _ in columns: - print("|" + "-" * col_width, end="") - print("|") - - # table - for key in keys: - padding = max(0, col_width - len(str(key))) - print("|" + str(key)[:col_width] + " " * padding, end="") - for col in columns[1:]: - padding = max(0, col_width - len(str(dict[key][col]))) - print("|" + str(dict[key][col])[:col_width] + " " * padding, end="") - print("|") - - # footer - for col in columns: - print("|" + "-" * col_width, end="") - print("|") - - -class BenchmarkRenderer: - @staticmethod - def _make_env(config={}, render_mode=None) -> F110Env: - import gymnasium as gym - import f110_gym - - base_config = { - "map": "Spielberg", - "num_agents": 1, - "timestep": 0.01, - "integrator": "rk4", - "control_input": ["speed", "steering_angle"], - "model": "st", - "observation_config": {"type": "kinematic_state"}, - "params": {"mu": 1.0}, - } - config = deep_update(base_config, config) - - env = gym.make( - "f110_gym:f110-v0", - config=config, - render_mode=render_mode, - ) - - return env - - def benchmark_single_agent_rendering(self): - import time - - sim_time = 15.0 # seconds - results = {} - - for render_mode in [None, "human", "human_fast", "rgb_array", "rgb_array_list"]: - env = self._make_env(render_mode=render_mode) - env.reset() - frame = env.render() - - print( - f"Running simulation of {sim_time}s for render mode: {render_mode}..." - ) - - max_steps = int(sim_time / env.timestep) - t0 = time.time() - for _ in range(max_steps): - action = env.action_space.sample() - env.step(action) - frame = env.render() - tf = time.time() - env.close() - - results[render_mode] = { - "sim_time": sim_time, - "elapsed_time": tf - t0, - "fps": max_steps / (tf - t0), - } - - pretty_print(results) - - def benchmark_n_agents_human_rendering(self): - """ - This is meant to benchmark the human rendering mode, for increasing nr of agents. - """ - import time - - sim_time = 15.0 # seconds - render_mode = "human" - - results = {} - - for num_agents in [1, 2, 3, 4, 5, 10]: - env = self._make_env( - config={"num_agents": num_agents}, render_mode=render_mode - ) - env.reset() - frame = env.render() - - print( - f"Running simulation of {num_agents} agents for render mode: {render_mode}..." - ) - - max_steps = int(sim_time / env.timestep) - t0 = time.time() - for _ in range(max_steps): - action = env.action_space.sample() - env.step(action) - frame = env.render() - tf = time.time() - env.close() - - results[num_agents] = { - "sim_time": sim_time, - "elapsed_time": tf - t0, - "fps": max_steps / (tf - t0), - } - - pretty_print(results) - - def benchmark_callbacks_human_rendering(self): - import time - - sim_time = 15.0 # seconds - render_mode = "human" - - results = {} - - class GoStraightPlanner: - def __init__(self, env, agent_id: str = "agent_0"): - self.waypoints = np.stack( - [env.track.raceline.xs, env.track.raceline.ys] - ).T - self.pos = None - self.agent_id = agent_id - - def plan(self, obs): - state = obs[self.agent_id] - self.pos = np.array([state["pose_x"], state["pose_y"]]) - return np.array([0.0, 2.5]) - - def render_waypoints(self, e): - e.render_closed_lines(points=self.waypoints, size=1) - - def render_position(self, e): - if self.pos is not None: - points = self.pos[None] - e.render_points(points, size=1) - - for render_config in [[False, False], [True, False], [True, True]]: - env = self._make_env(render_mode=render_mode) - planner = GoStraightPlanner(env) - - show_path, show_point = render_config - config_str = f"show_path={show_path}, show_point={show_point}" - - if show_path: - env.add_render_callback(callback_func=planner.render_waypoints) - - if show_point: - env.add_render_callback(callback_func=planner.render_position) - - rnd_idx = np.random.randint(0, len(env.track.raceline.xs)) - obs, _ = env.reset( - options={ - "poses": np.array( - [ - [ - env.track.raceline.xs[rnd_idx], - env.track.raceline.ys[rnd_idx], - env.track.raceline.yaws[rnd_idx], - ] - ] - ) - } - ) - frame = env.render() - - print( - f"Running simulation of {config_str} for render mode: {render_mode}..." - ) - - max_steps = int(sim_time / env.timestep) - t0 = time.time() - for _ in range(max_steps): - action = planner.plan(obs=obs) - obs, _, _, _, _ = env.step(np.array([action])) - frame = env.render() - tf = time.time() - env.close() - - results[config_str] = { - "sim_time": sim_time, - "elapsed_time": tf - t0, - "fps": max_steps / (tf - t0), - } - - pretty_print(results) - - -if __name__ == "__main__": - benchmark = BenchmarkRenderer() - - benchmark.benchmark_single_agent_rendering() - benchmark.benchmark_n_agents_human_rendering() - benchmark.benchmark_callbacks_human_rendering() diff --git a/gym/f110_gym/test/test_scan_sim.py b/gym/f110_gym/test/test_scan_sim.py deleted file mode 100644 index e69de29b..00000000 diff --git a/tests/test_f110_env.py b/tests/test_f110_env.py index 545b9ea3..0a2f5e9b 100644 --- a/tests/test_f110_env.py +++ b/tests/test_f110_env.py @@ -187,8 +187,8 @@ def test_manual_reset_options_in_asynch_vec_env(self): "num_agents": num_agents, "observation_config": {"type": "kinematic_state"}, } - vec_env = gym.vector.make( - "f110_gym:f110-v0", asynchronous=True, config=config, num_envs=num_envs + vec_env = gym.make_vec( + "f110_gym:f110-v0", vectorization_mode="async", config=config, num_envs=num_envs ) rnd_poses = np.random.random((2, 3)) @@ -217,10 +217,10 @@ def test_auto_reset_options_in_synch_vec_env(self): config = { "num_agents": num_agents, "observation_config": {"type": "kinematic_state"}, - "reset_config": {"type": "random_random"}, + "reset_config": {"type": "rl_random_random"}, } - vec_env = gym.vector.make( - "f110_gym:f110-v0", asynchronous=False, config=config, num_envs=num_envs + vec_env = gym.make_vec( + "f110_gym:f110-v0", vectorization_mode="sync", config=config, num_envs=num_envs, ) obss, infos = vec_env.reset() diff --git a/gym/f110_gym/test/test_renderer.py b/tests/test_renderer.py similarity index 95% rename from gym/f110_gym/test/test_renderer.py rename to tests/test_renderer.py index b0eda58a..80f98600 100644 --- a/gym/f110_gym/test/test_renderer.py +++ b/tests/test_renderer.py @@ -24,11 +24,7 @@ def _make_env(config={}, render_mode=None) -> F110Env: } config = deep_update(base_config, config) - env = gym.make( - "f110_gym:f110-v0", - config=config, - render_mode=render_mode, - ) + env = gym.make("f110_gym:f110-v0", config=config, render_mode=render_mode,) return env diff --git a/tests/test_scan_sim.py b/tests/test_scan_sim.py index 3dc859ed..d4d37756 100644 --- a/tests/test_scan_sim.py +++ b/tests/test_scan_sim.py @@ -50,7 +50,7 @@ def _test_map_scan(self, map_name: str, debug=False): scan_rng = np.random.default_rng(seed=12345) scan_sim = ScanSimulator2D(self.num_beams, self.fov) new_scan = np.empty((self.num_test, self.num_beams)) - scan_sim.set_map(map_name=map_name) + scan_sim.set_map(map=map_name) # scan gen loop for i in range(self.num_test): test_pose = self.test_poses[i] @@ -83,7 +83,7 @@ def test_fps(self): # scan fps should be greater than 500 scan_rng = np.random.default_rng(seed=12345) scan_sim = ScanSimulator2D(self.num_beams, self.fov) - scan_sim.set_map(map_name="Spielberg") + scan_sim.set_map(map="Spielberg") import time