-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsimulator_ori.py
149 lines (117 loc) · 4.77 KB
/
simulator_ori.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
from __future__ import annotations
"""
Simulator built based on SAPIEN3.
"""
from typing import cast, Union, TYPE_CHECKING, Optional
import logging
import numpy as np
import sapien.core as sapien
import torch
from .entity import Entity, Composite
from .simulator_base import SimulatorBase, FrameLike, SimulatorConfig
if TYPE_CHECKING:
from .robot.robot_base import Robot
from .ros_plugins.module import ROSModule
class Simulator(SimulatorBase):
_loaded_eneity: set["Entity"]
robot: Optional["Robot"]
elements: "Composite"
def __init__(
self,
config: SimulatorConfig,
robot: Optional["Robot"],
elements: dict[str, Union[Entity, dict]],
add_ground: bool = True,
ros_module: Optional["ROSModule"] = None,
):
self.robot = robot
self._setup_elements(elements)
self.add_ground = add_ground and self._if_add_ground(elements)
super().__init__(config)
control_freq = self._control_freq = robot.control_freq if robot is not None else config.control_freq_without_robot
if self._sim_freq % control_freq != 0:
logging.warning(
f"sim_freq({self._sim_freq}) is not divisible by control_freq({control_freq}).",
)
self._sim_steps_per_control = self._sim_freq // control_freq
self.modules: list['ROSModule'] = []
if ros_module is not None:
self.modules.append(ros_module)
for m in self.modules:
m.set_sim(self)
@property
def control_freq(self):
return self._control_freq
@property
def control_timestep(self):
return 1.0 / self._control_freq
@property
def dt(self):
return self.control_timestep
def _if_add_ground(self, elements: dict[str, Union[Entity, dict]]):
for v in elements.values():
if hasattr(v, 'has_ground') and getattr(v, 'has_ground'):
return False
return True
def _setup_elements(self, elements: dict[str, Union[Entity, dict]]):
robot = self.robot
if robot is not None:
self.robot_cameras = robot.get_sensors()
elements = dict(robot=robot, **self.robot_cameras, **elements)
self.elements = Composite('', **elements)
def find(self, uid=''):
return self.elements.find(uid)
def is_loaded(self, entity: "Entity"):
return entity in self._loaded_eneity
def load_entity(self, entity: "Entity"):
if self.is_loaded(entity):
return
entity._load(self)
self._loaded_eneity.add(entity)
# load ground ..
def _load(self):
if self.add_ground:
self._add_ground(render=True)
self._loaded_eneity = set()
self._elem_cache = {}
self.elements.load(self)
# TODO: maybe we can just use self._scene.get_actors() to get all the actors. However, I don't know if the order will be the same.
self.actor_batches.append(self.elements.get_actors())
self.articulation_batches.append(self.elements.get_articulations())
self.camera_batches.append(self.elements.get_sapien_obj_type(sapien.render.RenderCameraComponent))
for m in self.modules:
m.load()
def step(self, action: Union[None, np.ndarray, dict], print_contacts_for_debug: bool=False):
self._viewer_has_scene_updated = False
if action is None: # simulation without action
pass
elif isinstance(action, np.ndarray) or isinstance(action, torch.Tensor):
assert self.robot is not None
self.robot.set_action(action)
else:
raise TypeError(type(action))
#TODO: ROS_MODULE.before_control_step
for m in self.modules:
m.before_control_step()
for _ in range(self._sim_steps_per_control):
if self.robot is not None:
self.robot.before_simulation_step()
self._engine.step_scenes(self._scene_list)
if print_contacts_for_debug:
print(self._scene.get_contacts())
for m in self.modules:
m.after_control_step()
# ---------------------------------------------------------------------------- #
# Advanced: utilities for ROS2 and motion planning. I am not sure if we should
# put them here.
# ---------------------------------------------------------------------------- #
def gen_scene_pcd(self, num_points: int = int(1e5), exclude=()):
"""Generate scene point cloud for motion planning, excluding the robot"""
pcds = []
sim = self
for k, v in sim.elements.items():
if k != 'robot':
out = v.get_pcds(num_points, exclude)
if out is not None:
pcds.append(out)
return np.concatenate(pcds)