Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
229 changes: 229 additions & 0 deletions GEMstack/onboard/planning/creep_planning.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
# File: GEMstack/onboard/planning/creep_planning.py
from typing import List, Tuple
import math
from ..component import Component
from ...state import AllState, VehicleState, EntityRelation, EntityRelationEnum, Path, Trajectory, Route, \
ObjectFrameEnum
from ...utils import serialization
from ...mathutils import transforms
import numpy as np
from .parking_motion_planning import longitudinal_plan, longitudinal_brake


DEBUG = False # Set to False to disable debug output


'''
# OTHER CREEP CONTROL CODE

def inverse_speed_function(distance_to_object):

# max_creep_speed = 1.0 # Maximum speed when creeping (m/s)
# min_creep_speed = 0.2 # Minimum speed to maintain (m/s)
# safe_distance = 5.0 # Distance at which to start slowing down (m)

# # Almost stop when very close (≤0.25m)
# if distance_to_object <= 0.25:
# return 0.0 # Practically zero speed

# # Slow creep when close (≤1.5m)
# if distance_to_object <= 3.0:
# return min_creep_speed
# speed = min_creep_speed + (max_creep_speed - min_creep_speed) * (distance_to_object / safe_distance)
# return min(speed, max_creep_speed)

def pid_speed_control(distance_to_object, target_distance, current_speed, prev_error, integral, dt,
kp=0.5, ki=0.1, kd=0.05, min_speed=0.2, max_speed=1.0):
if distance_to_object <= 0.25:
return 0.0, 0.0, 0.0
error = distance_to_object - target_distance
integral += error * dt
integral = max(-5.0, min(integral, 5.0)) # Prevent integral windup
derivative = (error - prev_error) / dt if dt > 0 else 0
p_term = kp * error
i_term = ki * integral
d_term = kd * derivative
speed_adjustment = p_term + i_term + d_term
base_speed = 0.5 # Base creep speed
target_speed = base_speed + speed_adjustment
target_speed = max(min_speed, min(target_speed, max_speed))
return target_speed, error, integral


'''



class CreepTrajectoryPlanner(Component):
"""Follows the given route. Brakes if the ego–vehicle must yield
(e.g. to a pedestrian) or if the end of the route is near; otherwise,
it accelerates (or cruises) toward a desired speed.
"""

def __init__(self):
self.route_progress = None
self.t_last = None
self.acceleration = 5
self.desired_speed = 2.0
self.deceleration = 2.0
self.emergency_brake = 8.0
# Parameters for end-of-route linear deceleration
self.end_stop_distance = 12.5 # Distance in meters to start linear deceleration
# Did 15, 10, 7.5, 17.5, 20, 12.5
def state_inputs(self):
return ['all']

def state_outputs(self) -> List[str]:
return ['trajectory']

def rate(self):
return 10.0

def check_end_of_route(self, route, current_parameter):
"""Check if vehicle is approaching the end of the route and calculate
appropriate linear deceleration parameters.

Args:
route: The complete route
current_parameter: Current position along the route

Returns:
(bool, float): Tuple containing:
- Whether the vehicle should start decelerating
- Adjusted speed if deceleration needed, otherwise desired_speed
"""
route_length = route.length()

distance_remaining = route_length - current_parameter

if DEBUG:
print(f"[DEBUG] check_end_of_route: Route length = {route_length}, "
f"Current position = {current_parameter}, "
f"Distance remaining = {distance_remaining}")

if distance_remaining <= self.end_stop_distance:
if distance_remaining > 0.1: # Avoid division by very small numbers
required_decel = (self.desired_speed ** 2) / (2 * distance_remaining)

linear_factor = distance_remaining / self.end_stop_distance
adjusted_speed = self.desired_speed * linear_factor
if DEBUG:
print(f"[DEBUG] End deceleration active: {distance_remaining:.2f}m remaining, "
f"required deceleration = {required_decel:.2f} m/s², "
f"speed adjusted to {adjusted_speed:.2f} m/s")

return True, adjusted_speed
else:
return True, 0.0

return False, self.desired_speed

def update(self, state: AllState):
vehicle = state.vehicle # type: VehicleState
route = state.route # type: Route
t = state.t

if DEBUG:
print("[DEBUG] CreepTrajectoryPlanner.update: t =", t)

if self.t_last is None:
self.t_last = t
dt = t - self.t_last
if DEBUG:
print("[DEBUG] CreepTrajectoryPlanner.update: dt =", dt)

curr_x = vehicle.pose.x
curr_y = vehicle.pose.y
curr_v = vehicle.v
if DEBUG:
print(f"[DEBUG] CreepTrajectoryPlanner.update: Vehicle position = ({curr_x}, {curr_y}), speed = {curr_v}, ")

# Determine progress along the route.
if self.route_progress is None:
self.route_progress = 0.0
_, closest_parameter = route.closest_point_local(
[curr_x, curr_y],
(self.route_progress - 5.0, self.route_progress + 5.0)
)
if DEBUG:
print("[DEBUG] CreepTrajectoryPlanner.update: Closest parameter on route =", closest_parameter)
self.route_progress = closest_parameter

# Check if approaching end of route and get adjusted speed
approaching_end, target_speed = self.check_end_of_route(route, closest_parameter)
if DEBUG and approaching_end:
print("[DEBUG] CreepTrajectoryPlanner.update: Vehicle is approaching end of route")
print(f"[DEBUG] CreepTrajectoryPlanner.update: Target speed = {target_speed}")

# Extract a 10 m segment of the route for planning lookahead.
route_with_lookahead = route.trim(closest_parameter, closest_parameter + 10.0)
if DEBUG:
print("[DEBUG] CreepTrajectoryPlanner.update: Route Lookahead =", route_with_lookahead)

print("[DEBUG] state", state.relations)
# Check whether any yield relations (e.g. due to pedestrians) require braking.
stay_braking = False
pointSet = set()
for i in range(len(route_with_lookahead.points)):
if tuple(route_with_lookahead.points[i]) in pointSet:
stay_braking = True
break
pointSet.add(tuple(route_with_lookahead.points[i]))

should_brake = any(
r.type == EntityRelationEnum.STOPPING_AT and r.obj1 == ''
for r in state.relations
)
should_decelerate = any(
r.type == EntityRelationEnum.YIELDING and r.obj1 == ''
for r in state.relations
) if should_brake == False else False

# If approaching end of route, override the target speed
if approaching_end:
should_accelerate = (not should_brake and not should_decelerate and curr_v < target_speed)
else:
should_accelerate = (not should_brake and not should_decelerate and curr_v < self.desired_speed)

if DEBUG:
print("[DEBUG] CreepTrajectoryPlanner.update: stay_braking =", stay_braking)
print("[DEBUG] CreepTrajectoryPlanner.update: should_brake =", should_brake)
print("[DEBUG] CreepTrajectoryPlanner.update: should_accelerate =", should_accelerate)
print("[DEBUG] CreepTrajectoryPlanner.update: should_decelerate =", should_decelerate)
print("[DEBUG] CreepTrajectoryPlanner.update: target_speed =", target_speed if approaching_end else self.desired_speed)

if stay_braking:
traj = longitudinal_brake(route_with_lookahead, 0.0, 0.0, 0.0)
if DEBUG:
print("[DEBUG] CreepTrajectoryPlanner.update: Using longitudinal_brake (stay braking).")
elif should_brake:
traj = longitudinal_brake(route_with_lookahead, self.emergency_brake, curr_v)
if DEBUG:
print("[DEBUG] CreepTrajectoryPlanner.update: Using longitudinal_brake.")
elif should_decelerate:
traj = longitudinal_brake(route_with_lookahead, self.deceleration, curr_v)
if DEBUG:
print("[DEBUG] CreepTrajectoryPlanner.update: Using longitudinal_brake.")
elif approaching_end:
# Use linear deceleration to stop at end of route
traj = longitudinal_plan(route_with_lookahead, self.acceleration,
self.deceleration, target_speed, curr_v)
if DEBUG:
print("[DEBUG] CreepTrajectoryPlanner.update: Using longitudinal_plan with end-of-route deceleration.")
elif should_accelerate:
traj = longitudinal_plan(route_with_lookahead, self.acceleration,
self.deceleration, self.desired_speed, curr_v)
if DEBUG:
print("[DEBUG] CreepTrajectoryPlanner.update: Using longitudinal_plan (accelerate).")
else:
# Maintain current speed if not accelerating or braking.
traj = longitudinal_plan(route_with_lookahead, 0.0, self.deceleration, self.desired_speed, curr_v)
if DEBUG:
print(
"[DEBUG] CreepTrajectoryPlanner.update: Maintaining current speed with longitudinal_plan (0 accel).")

self.t_last = t
if DEBUG:
print(f'[DEBUG] Current Velocity: {curr_v}, Target Speed: {target_speed if approaching_end else self.desired_speed}')
print("[DEBUG] CreepTrajectoryPlanner.update: Returning trajectory with", len(traj.points), "points.")
return traj
Loading