Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vision + scoring state machine #143

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
11 changes: 11 additions & 0 deletions components/gripper.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,17 @@ def get_current_piece(self) -> GamePiece:
return GamePiece.NONE
return self.holding

@feedback
def get_current_piece_as_int(self) -> int:
piece = self.get_current_piece()
if piece == GamePiece.NONE:
return 0
if piece == GamePiece.CONE:
return 1
if piece == GamePiece.CUBE:
return 2
return -1

@feedback
def cube_present(self) -> bool:
return self.get_full_open() and self.cube_break_beam_broken()
Expand Down
176 changes: 176 additions & 0 deletions components/score_tracker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
import numpy as np
import numpy.typing as npt
import wpilib
import magicbot
from ntcore import NetworkTableInstance
from utilities.game import GamePiece


class ScoreTracker:
CUBE_MASK = np.array(
[
[0, 1, 0, 0, 1, 0, 0, 1, 0],
[0, 1, 0, 0, 1, 0, 0, 1, 0],
[1, 1, 1, 1, 1, 1, 1, 1, 1],
],
dtype=bool,
)
CONE_MASK = np.array(
[
[1, 0, 1, 1, 0, 1, 1, 0, 1],
[1, 0, 1, 1, 0, 1, 1, 0, 1],
[1, 1, 1, 1, 1, 1, 1, 1, 1],
],
dtype=bool,
)

CONF_EXP_FILTER_ALPHA = 0.8

CONF_THRESHOLD = 0.2

def __init__(self) -> None:
# -1.0 - no piece for certain, 1.0 - a piece for certain, 0.0 - unsure
self.confidences_red: np.ndarray = np.zeros((3, 9), dtype=float)
self.confidences_blue: np.ndarray = np.zeros((3, 9), dtype=float)
self.state_red = np.zeros((3, 9), dtype=bool)
self.state_blue = np.zeros((3, 9), dtype=bool)
self.did_states_change = magicbot.will_reset_to(False)
self.inst = NetworkTableInstance.getDefault()
nt = self.inst.getTable("left_cam")
self.nodes = nt.getEntry("nodes")

def execute(self) -> None:
if not self.nodes.exists():
print("skipping")
return
_data = self.nodes.getStringArray([])
data = self.nt_data_to_node_data(_data)
for node in data:
side = (
wpilib.DriverStation.Alliance.kBlue
if node[0] >= 27
else wpilib.DriverStation.Alliance.kRed
)
col = node[0] % 9
row = (node[0] % 27) // 9
self.add_vision_data(
side=side,
pos=np.array([row, col]),
confidence=(1.0 if node[1] else -0.5),
)

def nt_data_to_node_data(self, data: list[str]) -> list[tuple[int, bool]]:
nodes: list[tuple[int, bool]] = []
for node in data:
as_array = str(node)
a = (int(f"{as_array[0]}{as_array[1]}"), as_array[2] == "1")
nodes.append(a)
return nodes

def add_vision_data(
self, side: wpilib.DriverStation.Alliance, pos: npt.ArrayLike, confidence: float
) -> None:
confidences = (
self.confidences_red if side == side.kRed else self.confidences_blue
)
confidences[pos] = confidences[
pos
] * ScoreTracker.CONF_EXP_FILTER_ALPHA + confidence * (
1.0 - ScoreTracker.CONF_EXP_FILTER_ALPHA
)
if abs(confidences[pos]) > ScoreTracker.CONF_THRESHOLD:
self.did_states_change = True
state = self.state_red if side == side.kRed else self.state_blue
state[pos] = confidence > 0.0

@staticmethod
def count_links(r: npt.NDArray[np.bool_]) -> int:
i = 0
n = 0
length = len(r)
while i < length - 2:
if r[i] and r[i + 1] and r[i + 2]:
n += 1
i += 3
continue
i += 1
return n

@staticmethod
def evaluate_state(a: npt.NDArray[np.bool_]) -> int:
return (
sum(ScoreTracker.count_links(r) for r in a) * 5
+ a[0].sum() * 5
+ a[1].sum() * 3
+ a[2].sum() * 2
)

@staticmethod
def run_lengths_mod3(state: npt.NDArray[np.bool_]) -> npt.NDArray[np.int_]:
"""
Returns an array where corresponding in shape to the input, where
every value is replaced by the length of the longest uninterrupted
run of true values containing it, modulo 3
"""
run_lengths = np.zeros_like(state, dtype=int)
for y in range(3):
x = 0
while x < 9:
if not state[y, x]:
x += 1
continue
acc = 0
for xn in range(x, 9):
if not state[y, xn]:
break
acc += 1
run_lengths[y, x : x + acc] = acc % 3
x += acc
return run_lengths

@staticmethod
def get_in_row(arr: npt.NDArray, x: int, y: int, def_val):
if x < 0 or x > 8:
return def_val
else:
return arr[y, x]

@staticmethod
def get_best_moves(
state: npt.NDArray[np.bool_],
type_to_test: GamePiece,
link_preparation_score: float = 2.5,
) -> npt.NDArray:
vals = np.zeros_like(state, dtype=float)
run_lengths = ScoreTracker.run_lengths_mod3(state)
for y in range(3):
for x in range(9):
if (
state[y, x]
or (
type_to_test == GamePiece.CUBE
and not ScoreTracker.CUBE_MASK[y, x]
)
or (
type_to_test == GamePiece.CONE
and not ScoreTracker.CONE_MASK[y, x]
)
):
continue
val = [5.0, 3.0, 2.0][y]
# Check link completion
if (
ScoreTracker.get_in_row(run_lengths, x - 1, y, 0)
+ ScoreTracker.get_in_row(run_lengths, x + 1, y, 0)
>= 2
):
val += 5.0
# Otherwise, check link preparation (state where a link can be completed after 1 move)
else:
for o in [-2, -1, 1, 2]:
if ScoreTracker.get_in_row(run_lengths, x + o, y, 0) == 1:
val += link_preparation_score
break
vals[y, x] = val
m = vals.max()
return np.argwhere(vals == m)
93 changes: 73 additions & 20 deletions controllers/score_game_piece.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,17 @@
from controllers.movement import Movement
from controllers.recover import RecoverController

from magicbot import state, timed_state, StateMachine
from magicbot import state, StateMachine, feedback, timed_state
from enum import Enum, auto
from utilities.game import Node, get_closest_node, get_score_location, Rows

from wpimath.geometry import Translation2d
from utilities.game import (
Node,
get_closest_node,
get_score_location,
Rows,
is_red,
get_closest_node_in_allowed,
)
from components.score_tracker import ScoreTracker


class NodePickStratergy(Enum):
Expand All @@ -25,11 +31,14 @@ class ScoreGamePieceController(StateMachine):

movement: Movement
recover: RecoverController

score_tracker: ScoreTracker

HARD_UP_SPEED = 0.3
ARM_PRE_TIME = 1.5

def __init__(self) -> None:
self.node_stratergy = NodePickStratergy.CLOSEST
self.node_stratergy = NodePickStratergy.BEST
self.override_node = Node(Rows.HIGH, 0)
self.prefered_row = Rows.HIGH
self.target_node = Node(Rows.HIGH, 0)
Expand Down Expand Up @@ -69,32 +78,76 @@ def open_flapper(self) -> None:
@timed_state(duration=0.5, must_finish=True)
def dropping(self) -> None:
self.gripper.open()
if self.gripper.get_full_open():
self.done()

def done(self) -> None:
super().done()
self.movement.inputs_lock = False
self.recover.engage()

def score_best(self) -> None:
self.node_stratergy = NodePickStratergy.BEST

def score_closest_high(self) -> None:
self.target_node = self._get_closest(Rows.HIGH)
self.engage()
self.node_stratergy = NodePickStratergy.CLOSEST
self.prefer_high()

def score_closest_mid(self) -> None:
self.target_node = self._get_closest(Rows.MID)
self.engage()
self.node_stratergy = NodePickStratergy.CLOSEST
self.prefer_high()

def _get_closest(self, row: Rows) -> Node:
def pick_node(self) -> Node:
cur_pos = self.movement.chassis.get_pose().translation()
cur_vel = self.movement.chassis.get_velocity()
lookahead_time = 1.0
effective_pos = cur_pos + Translation2d(
cur_vel.vx * lookahead_time, cur_vel.vy * lookahead_time
)
return get_closest_node(effective_pos, self.gripper.get_current_piece(), row)
if self.node_stratergy is NodePickStratergy.CLOSEST:
return get_closest_node(
cur_pos, self.gripper.get_current_piece(), self.prefered_row, set()
)
elif self.node_stratergy is NodePickStratergy.OVERRIDE:
return self.override_node
elif self.node_stratergy is NodePickStratergy.BEST:
state = (
self.score_tracker.state_blue
if is_red()
else self.score_tracker.state_red
)
best = self.score_tracker.get_best_moves(state, self.gripper.holding)
nodes: list[Node] = []
for i in range(len(best)):
as_tuple = tuple(best[i])
node = Node(Rows(int(as_tuple[0])), as_tuple[1])
nodes.append(node)

return get_closest_node_in_allowed(
cur_pos, self.gripper.get_current_piece(), nodes
)

@feedback
def state_red(self) -> list[bool]:
state: list[bool] = []
for i in self.score_tracker.state_blue.tolist():
for j in i:
state.append(j)
return state

@feedback
def state_blue(self) -> list[bool]:
state: list[bool] = []
for i in self.score_tracker.state_blue.tolist():
for j in i:
state.append(j)
return state

@feedback
def pick_node_as_int(self) -> int:
# node = self.pick_node()
node = Node(Rows.HIGH, 0)
return (node.row.value - 1) * 3 + node.col

def prefer_high(self) -> None:
self.prefered_row = Rows.HIGH

def score_best(self) -> None:
# placeholder
self.score_closest_high()
def prefer_mid(self) -> None:
self.prefered_row = Rows.MID

def score_without_moving(self, node: Node) -> None:
self.target_node = node
Expand Down
3 changes: 3 additions & 0 deletions robot.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from components.arm import Arm
from components.gripper import Gripper
from components.leds import StatusLights, DisplayType, LedColors
from components.score_tracker import ScoreTracker
from utilities.scalers import rescale_js
from utilities.game import is_red

Expand All @@ -40,6 +41,7 @@ class MyRobot(magicbot.MagicRobot):
intake: Intake
status_lights: StatusLights
gripper: Gripper
score_tracker: ScoreTracker

port_localizer: VisualLocalizer
starboard_localizer: VisualLocalizer
Expand Down Expand Up @@ -258,6 +260,7 @@ def testPeriodic(self) -> None:
self.port_localizer.execute()
self.starboard_localizer.execute()
self.status_lights.execute()
self.score_tracker.execute()

def cancel_controllers(self):
self.acquire_cone.done()
Expand Down
18 changes: 17 additions & 1 deletion utilities/game.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ def get_valid_piece(self) -> GamePiece:
else:
return GamePiece.CONE

def get_id(self) -> int:
return (self.row.value - 1) * 3 + self.col


def get_node_location(node: Node) -> Translation3d:
if is_red():
Expand All @@ -123,8 +126,12 @@ def get_score_location(node: Node) -> tuple[Pose2d, Rotation2d]:
return goal, approach


def get_closest_node(pos: Translation2d, piece: GamePiece, row: Rows) -> Node:
def get_closest_node(
pos: Translation2d, piece: GamePiece, row: Rows, impossible: set[int]
) -> Node:
def get_node_dist(node: Node) -> float:
if node.get_id() in impossible:
return 999999
return get_score_location(node)[0].translation().distance(pos)

if piece == GamePiece.CONE:
Expand All @@ -137,6 +144,15 @@ def get_node_dist(node: Node) -> float:
return min(nodes, key=get_node_dist)


def get_closest_node_in_allowed(
pos: Translation2d, piece: GamePiece, allowed: list[Node]
) -> Node:
def get_node_dist(node: Node) -> float:
return get_score_location(node)[0].translation().distance(pos)

return min(allowed, key=get_node_dist)


# tag in blue loading bay, on red side of field 16=x
tag_4 = apriltag_layout.getTagPose(4)
assert tag_4 is not None
Expand Down