Skip to content

Commit

Permalink
Merge pull request #102 from f1tenth/101-decoupled-action-spaces
Browse files Browse the repository at this point in the history
101 decoupled action spaces
  • Loading branch information
hzheng40 authored Dec 27, 2023
2 parents a428945 + dd744b9 commit 0f3b630
Show file tree
Hide file tree
Showing 9 changed files with 164 additions and 61 deletions.
2 changes: 1 addition & 1 deletion examples/video_recording.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def main():
"num_agents": 1,
"timestep": 0.01,
"integrator": "rk4",
"control_input": "speed",
"control_input": ["speed", "steering_angle"],
"model": "st",
"observation_config": {"type": "kinematic_state"},
"params": {"mu": 1.0},
Expand Down
2 changes: 1 addition & 1 deletion examples/waypoint_follow.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ def main():
"num_agents": num_agents,
"timestep": 0.01,
"integrator": "rk4",
"control_input": "speed",
"control_input": ["speed", "steering_angle"],
"model": "st",
"observation_config": {"type": "kinematic_state"},
"params": {"mu": 1.0},
Expand Down
177 changes: 138 additions & 39 deletions gym/f110_gym/envs/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@
from enum import Enum
from typing import Any, Dict, Tuple

import warnings
import gymnasium as gym
import numpy as np
from f110_gym.envs.dynamic_models import pid
from f110_gym.envs.dynamic_models import pid_steer, pid_accl


class CarActionEnum(Enum):
class LongitudinalActionEnum(Enum):
Accl = 1
Speed = 2

Expand All @@ -19,73 +20,171 @@ def from_string(action: str):
return SpeedAction
else:
raise ValueError(f"Unknown action type {action}")


class CarAction:

class LongitudinalAction:
def __init__(self) -> None:
self._action_type = None
self._type = None

self.lower_limit = None
self.upper_limit = None

@abstractmethod
def act(self, action: Any, **kwargs) -> Tuple[float, float]:
raise NotImplementedError("act method not implemented")
def act(self, longitudinal_action: Any, **kwargs) -> float:
raise NotImplementedError("longitudinal act method not implemented")

@property
def type(self) -> str:
return self._action_type
return self._type

@property
def space(self) -> gym.Space:
return NotImplementedError(
f"space method not implemented for action type {self.type}"
)
return gym.spaces.Box(low=self.lower_limit, high=self.upper_limit, dtype=np.float32)

class AcclAction(LongitudinalAction):
def __init__(self, params: Dict) -> None:
super().__init__()
self._type = "accl"
self.lower_limit, self.upper_limit = -params["a_max"], params["a_max"]

def act(self, action: Tuple[float, float], state, params) -> float:
return action

class AcclAction(CarAction):
class SpeedAction(LongitudinalAction):
def __init__(self, params: Dict) -> None:
super().__init__()
self._action_type = "accl"
self._type = "speed"
self.lower_limit, self.upper_limit = params["v_min"], params["v_max"]

self.steering_low, self.steering_high = params["sv_min"], params["sv_max"]
self.acc_low, self.acc_high = -params["a_max"], params["a_max"]
def act(
self, action: Tuple[float, float], state: np.ndarray, params: Dict
) -> float:
accl = pid_accl(
action,
state[3],
params["a_max"],
params["v_max"],
params["v_min"],
)

def act(self, action: Tuple[float, float], state, params) -> Tuple[float, float]:
return action
return accl

@property
def space(self) -> gym.Space:
low = np.array([self.steering_low, self.acc_low]).astype(np.float32)
high = np.array([self.steering_high, self.acc_high]).astype(np.float32)
class SteerAction:
def __init__(self) -> None:
self._type = None

return gym.spaces.Box(low=low, high=high, shape=(2,), dtype=np.float32)
self.lower_limit = None
self.upper_limit = None

@abstractmethod
def act(self, steer_action: Any, **kwargs) -> float:
raise NotImplementedError("steer act method not implemented")

class SpeedAction(CarAction):
@property
def type(self) -> str:
return self._type

@property
def space(self) -> gym.Space:
return gym.spaces.Box(low=self.lower_limit, high=self.upper_limit, dtype=np.float32)

class SteeringAngleAction(SteerAction):
def __init__(self, params: Dict) -> None:
super().__init__()
self._action_type = "speed"

self.steering_low, self.steering_high = params["s_min"], params["s_max"]
self.velocity_low, self.velocity_high = params["v_min"], params["v_max"]
self._type = "steering_angle"
self.lower_limit, self.upper_limit = params["s_min"], params["s_max"]

def act(
self, action: Tuple[float, float], state: np.ndarray, params: Dict
) -> Tuple[float, float]:
accl, sv = pid(
action[0],
action[1],
state[3],
) -> float:
sv = pid_steer(
action,
state[2],
params["sv_max"],
params["a_max"],
params["v_max"],
params["v_min"],
)
return accl, sv
return sv

class SteeringSpeedAction(SteerAction):
def __init__(self, params: Dict) -> None:
super().__init__()
self._type = "steering_speed"
self.lower_limit, self.upper_limit = params["sv_min"], params["sv_max"]

def act(
self, action: Tuple[float, float], state: np.ndarray, params: Dict
) -> float:
return action

class SteerActionEnum(Enum):
Steering_Angle = 1
Steering_Speed = 2

@staticmethod
def from_string(action: str):
if action == "steering_angle":
return SteeringAngleAction
elif action == "steering_speed":
return SteeringSpeedAction
else:
raise ValueError(f"Unknown action type {action}")

class CarAction:
def __init__(self, control_mode : list[str, str], params: Dict) -> None:
long_act_type_fn = None
steer_act_type_fn = None
if type(control_mode) == str: # only one control mode specified
try:
long_act_type_fn = LongitudinalActionEnum.from_string(control_mode)
except ValueError:
try:
steer_act_type_fn = SteerActionEnum.from_string(control_mode)
except ValueError:
raise ValueError(f"Unknown control mode {control_mode}")
if control_mode == "steering_speed":
warnings.warn(
f'Only one control mode specified, using {control_mode} for steering and defaulting to acceleration for longitudinal control'
)
long_act_type_fn = LongitudinalActionEnum.from_string("accl")
else:
warnings.warn(
f'Only one control mode specified, using {control_mode} for steering and defaulting to speed for longitudinal control'
)
long_act_type_fn = LongitudinalActionEnum.from_string("speed")

else:
if control_mode == "accl":
warnings.warn(
f'Only one control mode specified, using {control_mode} for longitudinal control and defaulting to steering speed for steering'
)
steer_act_type_fn = SteerActionEnum.from_string("steering_speed")
else:
warnings.warn(
f'Only one control mode specified, using {control_mode} for longitudinal control and defaulting to steering angle for steering'
)
steer_act_type_fn = SteerActionEnum.from_string("steering_angle")

elif type(control_mode) == list:
long_act_type_fn = LongitudinalActionEnum.from_string(control_mode[0])
steer_act_type_fn = SteerActionEnum.from_string(control_mode[1])
else:
raise ValueError(f"Unknown control mode {control_mode}")

self._longitudinal_action : LongitudinalAction = long_act_type_fn(params)
self._steer_action : SteerAction = steer_act_type_fn(params)

@abstractmethod
def act(self, action: Any, **kwargs) -> Tuple[float, float]:
longitudinal_action = self._longitudinal_action.act(action[0], **kwargs)
steer_action = self._steer_action.act(action[1], **kwargs)
return longitudinal_action, steer_action

@property
def type(self) -> Tuple[str, str]:
return (self._steer_action.type, self._longitudinal_action.type)

@property
def space(self) -> gym.Space:
low = np.array([self.steering_low, self.velocity_low]).astype(np.float32)
high = np.array([self.steering_high, self.velocity_high]).astype(np.float32)
low = np.array([self._steer_action.lower_limit, self._longitudinal_action.lower_limit]).astype(np.float32)
high = np.array([self._steer_action.upper_limit, self._longitudinal_action.upper_limit]).astype(np.float32)

return gym.spaces.Box(low=low, high=high, shape=(2,), dtype=np.float32)

Expand Down
23 changes: 14 additions & 9 deletions gym/f110_gym/envs/dynamic_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,19 @@ def vehicle_dynamics_st(


@njit(cache=True)
def pid(speed, steer, current_speed, current_steer, max_sv, max_a, max_v, min_v):
def pid_steer(steer, current_steer, max_sv):
# steering
steer_diff = steer - current_steer
if np.fabs(steer_diff) > 1e-4:
sv = (steer_diff / np.fabs(steer_diff)) * max_sv
else:
sv = 0.0

return sv


@njit(cache=True)
def pid_accl(speed, current_speed, max_a, max_v, min_v):
"""
Basic controller for speed/steer -> accl./steer vel.
Expand All @@ -413,13 +425,6 @@ def pid(speed, steer, current_speed, current_steer, max_sv, max_a, max_v, min_v)
accl (float): desired input acceleration
sv (float): desired input steering velocity
"""
# steering
steer_diff = steer - current_steer
if np.fabs(steer_diff) > 1e-4:
sv = (steer_diff / np.fabs(steer_diff)) * max_sv
else:
sv = 0.0

# accl
vel_diff = speed - current_speed
# currently forward
Expand All @@ -443,7 +448,7 @@ def pid(speed, steer, current_speed, current_steer, max_sv, max_a, max_v, min_v)
kp = 2.0 * max_a / (-min_v)
accl = kp * vel_diff

return accl, sv
return accl


def func_KS(
Expand Down
11 changes: 5 additions & 6 deletions gym/f110_gym/envs/f110_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
# gym imports
import gymnasium as gym

from f110_gym.envs.action import CarActionEnum, from_single_to_multi_action_space
from f110_gym.envs.action import (CarAction,
from_single_to_multi_action_space)
from f110_gym.envs.integrator import IntegratorType
from f110_gym.envs.rendering import make_renderer

Expand Down Expand Up @@ -102,8 +103,7 @@ def __init__(self, config: dict = None, render_mode=None, **kwargs):
self.integrator = IntegratorType.from_string(self.config["integrator"])
self.model = DynamicModel.from_string(self.config["model"])
self.observation_config = self.config["observation_config"]
action_type_fn = CarActionEnum.from_string(self.config["control_input"])
self.action_type = action_type_fn(params=self.params)
self.action_type = CarAction(self.config["control_input"], params=self.params)

# radius to consider done
self.start_thresh = 0.5 # 10cm
Expand Down Expand Up @@ -225,7 +225,7 @@ def default_config(cls) -> dict:
"ego_idx": 0,
"integrator": "rk4",
"model": "st",
"control_input": "speed",
"control_input": ["speed", "steering_angle"],
"observation_config": {"type": "original"},
"reset_config": {"type": "grid_static"},
}
Expand All @@ -240,8 +240,7 @@ def configure(self, config: dict) -> None:

if hasattr(self, "action_space"):
# if some parameters changed, recompute action space
action_type_fn = CarActionEnum.from_string(self.config["control_input"])
self.action_type = action_type_fn(params=self.params)
self.action_type = CarAction(self.config["control_input"], params=self.params)
self.action_space = from_single_to_multi_action_space(
self.action_type.space, self.num_agents
)
Expand Down
2 changes: 1 addition & 1 deletion gym/f110_gym/test/benchmark_renderer.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def _make_env(config={}, render_mode=None) -> F110Env:
"num_agents": 1,
"timestep": 0.01,
"integrator": "rk4",
"control_input": "speed",
"control_input": ["speed", "steering_angle"],
"model": "st",
"observation_config": {"type": "kinematic_state"},
"params": {"mu": 1.0},
Expand Down
2 changes: 1 addition & 1 deletion gym/f110_gym/test/test_renderer.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def _make_env(config={}, render_mode=None) -> F110Env:
"num_agents": 1,
"timestep": 0.01,
"integrator": "rk4",
"control_input": "speed",
"control_input": ["speed", "steering_angle"],
"model": "st",
"observation_config": {"type": "kinematic_state"},
"params": {"mu": 1.0},
Expand Down
4 changes: 2 additions & 2 deletions tests/test_f110_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def _make_env(self, config={}):
"num_agents": 1,
"timestep": 0.01,
"integrator": "rk4",
"control_input": "speed",
"control_input": ["speed", "steering_angle"],
"params": {"mu": 1.0},
}
conf = deep_update(conf, config)
Expand Down Expand Up @@ -125,7 +125,7 @@ def test_acceleration_action_space(self):
"""
Test that the acceleration action space is correctly configured.
"""
base_env = self._make_env(config={"control_input": "accl"})
base_env = self._make_env(config={"control_input": ["accl", "steering_speed"]})
params = base_env.sim.params
action_space_low = base_env.action_space.low
action_space_high = base_env.action_space.high
Expand Down
2 changes: 1 addition & 1 deletion tests/test_observation.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def _make_env(config={}) -> F110Env:
"num_agents": 1,
"timestep": 0.01,
"integrator": "rk4",
"control_input": "speed",
"control_input": ["speed", "steering_angle"],
"params": {"mu": 1.0},
}
conf = deep_update(conf, config)
Expand Down

0 comments on commit 0f3b630

Please sign in to comment.