Skip to content

Commit

Permalink
1. 去除用户的PositionUpdate访问权限, FuncAgent直接通过set_position_x系列接口更新位置信息. 2.…
Browse files Browse the repository at this point in the history
… init_position_aoi接口标记为deprecated, 用set_position_aoi取代. 3. 新增set_position_lane接口. 4. 新增set_position_poi接口
  • Loading branch information
PinkGranite committed Apr 17, 2024
1 parent 8f47848 commit 0717c7e
Show file tree
Hide file tree
Showing 5 changed files with 232 additions and 85 deletions.
2 changes: 1 addition & 1 deletion pycityagent/ac/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@
from .hub_actions import *
from .sim_actions import *

__all__ = [ActionController, Action, HubAction, SimAction, SendUserMessage, SendStreetview, SendPop, PositionUpdate, ShowPath, ShowPosition, SetSchedule, SendAgentMessage]
__all__ = [ActionController, Action, HubAction, SimAction, SendUserMessage, SendStreetview, SendPop, ShowPath, ShowPosition, SetSchedule, SendAgentMessage]
87 changes: 81 additions & 6 deletions pycityagent/agent_func.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
"""FuncAgent: 功能性智能体及其定义"""

from typing import Union
from pycityagent.urbanllm import UrbanLLM
from .urbanllm import UrbanLLM
from .agent import Agent, AgentType
from .image.image import Image
from .ac.hub_actions import PositionUpdate
from .utils import *

class FuncAgent(Agent):
"""
Expand Down Expand Up @@ -63,23 +65,96 @@ def __init__(
- x (double)
- y (double)
- z (double)
- direction (double): 方向角
- direction (double): 朝向-方向角
"""

async def init_position_aoi(self, aoi_id:int):
self._posUpdate = PositionUpdate(self)

async def set_position_aoi(self, aoi_id:int):
"""
- 将agent的位置初始化到指定aoi
- 根据指定aoi设置aoi_position, longlat_position以及xy_position
- 将agent的位置设定到指定aoi
Args:
- aoi_id (int): AOI id
"""
if aoi_id in self._simulator.map.aois:
aoi = self._simulator.map.aois[aoi_id]
self.motion['position'] = {}
self.motion['position']['aoi_position'] = {'aoi_id': aoi_id}
self.motion['position']['longlat_position'] = {'longitude': aoi['shapely_lnglat'].centroid.coords[0][0], 'latitude': aoi['shapely_lnglat'].centroid.coords[0][1]}
x, y = self._simulator.map.lnglat2xy(lng=self.motion['position']['longlat_position']['longitude'],
lat=self.motion['position']['longlat_position']['latitude'])
self.motion['position']['xy_position'] = {'x': x, 'y': y}
pos = PositionUpdate(self)
await pos.Forward(longlat=[self.motion['position']['longlat_position']['longitude'], self.motion['position']['longlat_position']['latitude']])
await self._posUpdate.Forward(longlat=[self.motion['position']['longlat_position']['longitude'], self.motion['position']['longlat_position']['latitude']])
else:
print("Error: wrong aoi id")

async def set_position_lane(self, lane_id:int, s:float=0.0, direction:Union[float, str]='front'):
"""
- 将agent的位置设定到指定lane
Args:
- lane_id (int): Lane id
- s (float): distance from the start point of the lane, default None, if None, then set to the start point
- direction (Union[float, str]): agent方向角, 默认值为'front'
- float: 直接设置为给定方向角(atan2计算得到)
- str: 可选项['front', 'back'], 指定agent的行走方向
- 对于driving lane而言, 只能朝一个方向通行, 只能是'front'
- 对于walking lane而言, 可以朝两个方向通行, 可以是'front'或'back'
"""
if lane_id in self._simulator.map.lanes:
lane = self._simulator.map.lanes[lane_id]
if s > lane['length']:
print("Error: 's' too large")
self.motion['position'] = {}
self.motion['position']['lane_position'] = {'lane_id': lane_id, 's': s}
nodes = lane['center_line']['nodes']
x, y = get_xy_in_lane(nodes, s)
longlat = self._simulator.map.xy2lnglat(x=x, y=y)
self.motion['position']['longlat_position'] = {
'longitude': longlat[0],
'latitude': longlat[1]
}
self.motion['position']['xy_position'] = {
'x': x,
'y': y
}
if isinstance(direction, float):
self.motion['direction'] = direction
else:
# 计算方向角
direction_ = get_direction_by_s(nodes, s, direction)
self.motion['direction'] = direction_
await self._posUpdate.Forward(longlat=[self.motion['position']['longlat_position']['longitude'], self.motion['position']['longlat_position']['latitude']])
else:
print("Error: wrong lane id")

async def set_position_poi(self, poi_id:int):
"""
- 将agent的位置设定到指定poi
Args:
- poi_id (int): Poi id
"""
if poi_id in self._simulator.map.pois:
poi = self._simulator.map.pois[poi_id]
x = poi['position']['x']
y = poi['position']['y']
longlat = self._simulator.map.xy2lnglat(x=x, y=y)
aoi_id = poi['aoi_id']
self.motion['position'] = {}
self.motion['position']['aoi_position'] = {'aoi_id': aoi_id}
self.motion['position']['longlat_position'] = {
'longitude': longlat[0],
'latitude': longlat[1]
}
self.motion['position']['xy_position'] = {
'x': x,
'y': y
}
await self._posUpdate.Forward(longlat=[self.motion['position']['longlat_position']['longitude'], self.motion['position']['longlat_position']['latitude']])
else:
print("Error: wrong poi id")

def Bind(self):
"""
Expand Down
78 changes: 1 addition & 77 deletions pycityagent/brain/sence.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,83 +13,7 @@
)
from .brainfc import BrainFunction
from .static import POI_TYPE_DICT, LEVEL_ONE_PRE

def point_on_line_given_distance(start_node, end_node, distance):
"""
Given two points (start_point and end_point) defining a line, and a distance s to travel along the line,
return the coordinates of the point reached after traveling s units along the line, starting from start_point.
Args:
start_point (tuple): Tuple of (x, y) representing the starting point on the line.
end_point (tuple): Tuple of (x, y) representing the ending point on the line.
distance (float): Distance to travel along the line, starting from start_point.
Returns:
tuple: Tuple of (x, y) representing the new point reached after traveling s units along the line.
"""

x1, y1 = start_node['x'], start_node['y']
x2, y2 = end_node['x'], end_node['y']

# Calculate the slope m and the y-intercept b of the line
if x1 == x2:
# Vertical line, distance is only along the y-axis
return (x1, y1 + distance if distance >= 0 else y1 - abs(distance))
else:
m = (y2 - y1) / (x2 - x1)
b = y1 - m * x1

# Calculate the direction vector (dx, dy) along the line
dx = (x2 - x1) / math.sqrt((x2 - x1)**2 + (y2 - y1)**2)
dy = (y2 - y1) / math.sqrt((x2 - x1)**2 + (y2 - y1)**2)

# Scale the direction vector by the given distance
scaled_dx = dx * distance
scaled_dy = dy * distance

# Calculate the new point's coordinates
x = x1 + scaled_dx
y = y1 + scaled_dy

return [x, y]

def get_xy_in_lane(nodes, distance, direction:str='front'):
temp_sum = 0
remain_s = 0
if direction == 'front':
# 顺道路方向前进
if distance == 0:
return [nodes[0]['x'], nodes[0]['y']]
key_index = 0 # first node
for i in range(1, len(nodes)):
x1, y1 = nodes[i-1]['x'], nodes[i-1]['y']
x2, y2 = nodes[i]['x'], nodes[i]['y']
temp_sum += math.sqrt((x2 - x1)**2 + (y2-y1)**2)
if temp_sum > distance:
remain_s = distance - (temp_sum - math.sqrt((x2 - x1)**2 + (y2-y1)**2))
break;
key_index += 1
if remain_s < 0.5:
return [nodes[key_index]['x'], nodes[key_index]['y']]
longlat = point_on_line_given_distance(nodes[key_index], nodes[key_index+1], remain_s)
return longlat
else:
# 逆道路方向前进
if distance == 0:
return [nodes[-1]['x'], nodes[-1]['y']]
key_index = len(nodes)-1 # last node
for i in range(len(nodes)-1, 0, -1):
x1, y1 = nodes[i]['x'], nodes[i]['y']
x2, y2 = nodes[i-1]['x'], nodes[i-1]['y']
temp_sum += math.sqrt((x2 - x1)**2 + (y2-y1)**2)
if temp_sum > distance:
remain_s = distance - (temp_sum - math.sqrt((x2 - x1)**2 + (y2-y1)**2))
break;
key_index -= 1
if remain_s < 0.5:
return [nodes[key_index]['x'], nodes[key_index]['y']]
longlat = point_on_line_given_distance(nodes[key_index], nodes[key_index-1], remain_s)
return longlat
from ..utils import point_on_line_given_distance, get_xy_in_lane

class SencePlug:
"""
Expand Down
2 changes: 1 addition & 1 deletion pycityagent/urbanllm/urbanllm.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def text_request(self, dialog:list[dict], temperature:float=1, max_tokens:int=No
if response.status_code == HTTPStatus.OK:
return response.output.choices[0]['message']['content']
else:
return "Error: {}".format(response.status_code)
return "Error: {}, {}".format(response.status_code, response.message)
else:
print("ERROR: Wrong Config")
return "wrong config"
Expand Down
148 changes: 148 additions & 0 deletions pycityagent/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
import math

def get_angle(x, y):
return math.atan2(y, x)*180/math.pi

def point_on_line_given_distance(start_node, end_node, distance):
"""
Given two points (start_point and end_point) defining a line, and a distance s to travel along the line,
return the coordinates of the point reached after traveling s units along the line, starting from start_point.
Args:
start_point (tuple): Tuple of (x, y) representing the starting point on the line.
end_point (tuple): Tuple of (x, y) representing the ending point on the line.
distance (float): Distance to travel along the line, starting from start_point.
Returns:
tuple: Tuple of (x, y) representing the new point reached after traveling s units along the line.
"""

x1, y1 = start_node['x'], start_node['y']
x2, y2 = end_node['x'], end_node['y']

# Calculate the slope m and the y-intercept b of the line
if x1 == x2:
# Vertical line, distance is only along the y-axis
return (x1, y1 + distance if distance >= 0 else y1 - abs(distance))
else:
m = (y2 - y1) / (x2 - x1)
b = y1 - m * x1

# Calculate the direction vector (dx, dy) along the line
dx = (x2 - x1) / math.sqrt((x2 - x1)**2 + (y2 - y1)**2)
dy = (y2 - y1) / math.sqrt((x2 - x1)**2 + (y2 - y1)**2)

# Scale the direction vector by the given distance
scaled_dx = dx * distance
scaled_dy = dy * distance

# Calculate the new point's coordinates
x = x1 + scaled_dx
y = y1 + scaled_dy

return [x, y]

def get_xy_in_lane(nodes, distance, direction:str='front'):
temp_sum = 0
remain_s = 0
if direction == 'front':
# 顺道路方向前进
if distance == 0:
return [nodes[0]['x'], nodes[0]['y']]
key_index = 0 # first node
for i in range(1, len(nodes)):
x1, y1 = nodes[i-1]['x'], nodes[i-1]['y']
x2, y2 = nodes[i]['x'], nodes[i]['y']
temp_sum += math.sqrt((x2 - x1)**2 + (y2-y1)**2)
if temp_sum > distance:
remain_s = distance - (temp_sum - math.sqrt((x2 - x1)**2 + (y2-y1)**2))
break;
key_index += 1
if remain_s < 0.5:
return [nodes[key_index]['x'], nodes[key_index]['y']]
longlat = point_on_line_given_distance(nodes[key_index], nodes[key_index+1], remain_s)
return longlat
else:
# 逆道路方向前进
if distance == 0:
return [nodes[-1]['x'], nodes[-1]['y']]
key_index = len(nodes)-1 # last node
for i in range(len(nodes)-1, 0, -1):
x1, y1 = nodes[i]['x'], nodes[i]['y']
x2, y2 = nodes[i-1]['x'], nodes[i-1]['y']
temp_sum += math.sqrt((x2 - x1)**2 + (y2-y1)**2)
if temp_sum > distance:
remain_s = distance - (temp_sum - math.sqrt((x2 - x1)**2 + (y2-y1)**2))
break;
key_index -= 1
if remain_s < 0.5:
return [nodes[key_index]['x'], nodes[key_index]['y']]
longlat = point_on_line_given_distance(nodes[key_index], nodes[key_index-1], remain_s)
return longlat

def get_direction_by_s(nodes, distance, direction:str='front'):
temp_sum = 0
if direction == 'front':
# 顺道路方向前进
if distance == 0:
return [nodes[0]['x'], nodes[0]['y']]
key_index = 0 # first node
for i in range(1, len(nodes)):
x1, y1 = nodes[i-1]['x'], nodes[i-1]['y']
x2, y2 = nodes[i]['x'], nodes[i]['y']
temp_sum += math.sqrt((x2 - x1)**2 + (y2-y1)**2)
if temp_sum > distance:
break;
key_index += 1
if key_index == len(nodes)-1:
# 端点
x = nodes[key_index]['x']-nodes[key_index-1]['x']
y = nodes[key_index]['y']-nodes[key_index-1]['y']
return get_angle(x, y)
else:
# 中间点
x = nodes[key_index+1]['x'] - nodes[key_index]['x']
y = nodes[key_index+1]['y'] - nodes[key_index]['y']
return get_angle(x, y)
elif direction == 'back':
# 逆道路方向前进
if distance == 0:
return [nodes[-1]['x'], nodes[-1]['y']]
key_index = len(nodes)-1 # last node
for i in range(len(nodes)-1, 0, -1):
x1, y1 = nodes[i]['x'], nodes[i]['y']
x2, y2 = nodes[i-1]['x'], nodes[i-1]['y']
temp_sum += math.sqrt((x2 - x1)**2 + (y2-y1)**2)
if temp_sum > distance:
break;
key_index -= 1
if key_index == 0:
x = nodes[key_index]['x'] - nodes[key_index+1]['x']
y = nodes[key_index]['y'] - nodes[key_index+1]['y']
return get_angle(x, y)
else:
x = nodes[key_index-1]['x'] - nodes[key_index]['x']
y = nodes[key_index-1]['y'] - nodes[key_index]['y']
return get_angle(x, y)
else:
print("Warning: wroing direction, 'front' instead")
if distance == 0:
return [nodes[0]['x'], nodes[0]['y']]
key_index = 0 # first node
for i in range(1, len(nodes)):
x1, y1 = nodes[i-1]['x'], nodes[i-1]['y']
x2, y2 = nodes[i]['x'], nodes[i]['y']
temp_sum += math.sqrt((x2 - x1)**2 + (y2-y1)**2)
if temp_sum > distance:
break;
key_index += 1
if key_index == len(nodes)-1:
# 端点
x = nodes[key_index]['x']-nodes[key_index-1]['x']
y = nodes[key_index]['y']-nodes[key_index-1]['y']
return get_angle(x, y)
else:
# 中间点
x = nodes[key_index+1]['x'] - nodes[key_index]['x']
y = nodes[key_index+1]['y'] - nodes[key_index]['y']
return get_angle(x, y)

0 comments on commit 0717c7e

Please sign in to comment.